import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# Split sizes count whole sleep sessions (unique sleep_ids), not rows
TEST_SIZE = 365 * 3 // 5
VALIDATION_SIZE = 365 * 4 // 5
BATCH_SIZE = 64
INPUT_TIME_STEP = 10  # input window length, in minutes (one time step per minute)
INPUT_FEATURES_SIZE = 7
SLEEP_STAGES = 4  # one-hot encoded in the last four columns
MAX_EPOCHS = 20
SLEEP_DATA_PATH = ".data/sleep_data_simple.csv"
def training_test_split_by_unique_index(data, index: str, test_size: int = 10):
    """Hold out `test_size` whole groups (e.g. sleep sessions) by unique `index` value."""
    test_ids = np.random.choice(data[index].unique(), size=test_size, replace=False)
    return data[~data[index].isin(test_ids)], data[data[index].isin(test_ids)]
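
# A quick sanity check of the split semantics; the toy frame and its column
# names below are illustrative assumptions, not the real dataset. Every row of
# a held-out sleep_id lands entirely in the test frame, so a session is never
# split across training and testing.
_toy = pd.DataFrame({"sleep_id": [0, 0, 1, 1, 2, 2], "hr": [60, 62, 55, 58, 70, 71]})
_train, _test = training_test_split_by_unique_index(_toy, "sleep_id", test_size=1)
assert set(_train["sleep_id"]).isdisjoint(set(_test["sleep_id"]))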
# Adapted from https://www.tensorflow.org/tutorials/structured_data/time_series
class WindowGenerator():
    def __init__(self, data, index: str = "sleep_id",
                 input_width: int = INPUT_TIME_STEP,
                 validation_size: int = VALIDATION_SIZE,
                 test_size: int = TEST_SIZE,
                 input_feature_slice: slice = slice(1, None),
                 label_feature_slice: slice = slice(-SLEEP_STAGES, None),
                 generate_data_now: bool = True):
        # Partition by whole sessions so no session leaks across splits
        self.training, self.testing = training_test_split_by_unique_index(data, index, test_size)
        self.training, self.validation = training_test_split_by_unique_index(self.training, index, validation_size)
        # Window parameters: predict one label one step past the input window
        self.input_width = input_width
        self.label_width = 1
        self.shift = 1
        self.total_window_size = self.input_width + self.shift
        self.input_slice = slice(0, input_width)
        self.input_indices = np.arange(self.total_window_size)[self.input_slice]
        self.label_start = self.total_window_size - self.label_width
        self.labels_slice = slice(self.label_start, None)
        self.label_indices = np.arange(self.total_window_size)[self.labels_slice]
        # Column slices: everything after the id column is an input feature;
        # the one-hot sleep stages sit in the last SLEEP_STAGES columns
        self.input_feature_slice = input_feature_slice
        self.label_feature_slice = label_feature_slice
        self.sample_ds = self.make_dataset(data[data[index] == 0], index)
        if generate_data_now:
            self.training_ds = self.make_dataset(self.training, index)
            self.validation_ds = self.make_dataset(self.validation, index)
            self.testing_ds = self.make_dataset(self.testing, index)

    def __repr__(self):
        return "WindowGenerator:\n\t" + '\n\t'.join([
            f'Total window size: {self.total_window_size}',
            f'Input indices: {self.input_indices}',
            f'Label indices: {self.label_indices}',
        ])
    def split_window(self, features):
        inputs = features[:, self.input_slice, self.input_feature_slice]
        # Squeeze only the time axis (label_width == 1), never the batch axis
        labels = tf.squeeze(features[:, self.labels_slice, self.label_feature_slice], axis=1)
        inputs.set_shape([None, self.input_width, None])
        return inputs, labels
    def make_dataset(self, data, index_group: str = "sleep_id", sort_by: str = "minutes_since_begin"):
        # Window each session separately so no window spans two sessions,
        # then concatenate the per-session datasets
        ds_all = None
        for i_group in data[index_group].unique():
            subset_data = np.array(
                data[data[index_group] == i_group].sort_values(by=[sort_by]),
                dtype=np.float32)
            ds = tf.keras.utils.timeseries_dataset_from_array(
                data=subset_data,
                targets=None,
                sequence_length=self.total_window_size,
                sequence_stride=1,
                shuffle=False,
                batch_size=BATCH_SIZE)
            ds_all = ds if ds_all is None else ds_all.concatenate(ds)
        return ds_all.map(self.split_window)
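
# A minimal sketch of what the generator yields, on synthetic data so it runs
# standalone; the column names and tiny split sizes here are illustrative
# assumptions, not the real dataset. Each batch pairs a
# (batch, INPUT_TIME_STEP, n_features) input window with a
# (batch, SLEEP_STAGES) one-hot label.
_rng = np.random.default_rng(0)
_rows = 30
_demo = pd.DataFrame({
    "sleep_id": np.repeat([0, 1, 2], _rows),
    "minutes_since_begin": np.tile(np.arange(_rows), 3),
})
for _f in range(INPUT_FEATURES_SIZE):
    _demo[f"feature_{_f}"] = _rng.normal(size=3 * _rows)
_stage = _rng.integers(0, SLEEP_STAGES, size=3 * _rows)
for _s in range(SLEEP_STAGES):
    _demo[f"stage_{_s}"] = (_stage == _s).astype(np.float32)
_demo_wg = WindowGenerator(_demo, validation_size=1, test_size=1)
for _inputs, _labels in _demo_wg.sample_ds.take(1):
    print(_inputs.shape, _labels.shape)  # (20, 10, 12) and (20, 4)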
# Adapted from https://www.tensorflow.org/tutorials/structured_data/time_series#linear_model
def compile_and_fit(model, window: WindowGenerator, loss=None, optimizer=None,
                    metrics=None, early_stop: bool = True, patience: int = 5,
                    baseline=None, epochs: int = MAX_EPOCHS):
    # Build loss/optimizer/metrics per call rather than as defaults evaluated
    # once at import time; sharing one optimizer across models carries its
    # slot variables along and breaks training.
    if loss is None:
        loss = tf.keras.losses.CategoricalCrossentropy(from_logits=True)
    if optimizer is None:
        optimizer = tf.keras.optimizers.Adam()
    if metrics is None:
        metrics = [
            tf.keras.metrics.CategoricalCrossentropy(from_logits=True),
            tf.keras.metrics.CategoricalAccuracy(),
            tf.keras.metrics.CategoricalHinge(),
        ]
    callbacks = []
    if early_stop:
        callbacks.append(tf.keras.callbacks.EarlyStopping(
            monitor='val_loss',
            patience=patience,
            baseline=baseline,
            mode='min'))
    model.compile(loss=loss, optimizer=optimizer, metrics=metrics)
    return model.fit(window.training_ds, epochs=epochs,
                     validation_data=window.validation_ds, callbacks=callbacks)
def save_history(history, file_name: str = "history.csv"):
    # Expects the metrics dict from `model.fit(...).history`
    pd.DataFrame.from_dict(history).to_csv(file_name, index=False)

# Load the per-minute sleep records and build the windowed datasets
sleep_data = pd.read_csv(SLEEP_DATA_PATH)
wg = WindowGenerator(sleep_data)
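
# A minimal end-to-end sketch, not the post's final model: the flat Dense
# classifier below is an illustrative assumption to show how the helpers fit
# together. It flattens each input window and emits SLEEP_STAGES logits,
# which is why the loss and metrics above use from_logits=True.
model = keras.Sequential([
    layers.Flatten(),
    layers.Dense(64, activation="relu"),
    layers.Dense(SLEEP_STAGES),
])
history = compile_and_fit(model, wg)
save_history(history.history, "dense_history.csv")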