"""Windowed tf.data pipelines and a training helper for sleep-stage data."""
import tensorflow as tf
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers
import pandas as pd

TEST_SIZE = 365*3//5
VALIDATION_SIZE = 365*4//5
BATCH_SIZE = 64
INPUT_TIME_STEP = 10 # in minutes
INPUT_FEATURES_SIZE = 7
SLEEP_STAGES = 4
MAX_EPOCHS = 20
SLEEP_DATA_PATH = ".data/sleep_data_simple.csv"


def training_test_split_by_unique_index(data, index: str, test_size: int = 10):
    """Split ``data`` so that every row sharing an ``index`` value stays together.

    ``test_size`` distinct values of ``data[index]`` are drawn at random
    (without replacement); rows carrying one of those values form the test
    partition and all remaining rows form the training partition.

    Args:
        data: Frame holding the column ``index``.
        index: Name of the grouping column.
        test_size: Number of distinct group ids to hold out.

    Returns:
        A ``(training, testing)`` tuple of frames.

    Raises:
        ValueError: Raised by ``np.random.choice`` when ``test_size`` exceeds
            the number of distinct ids.
    """
    test_ids = np.random.choice(data[index].unique(), size=test_size, replace=False)
    # Build the membership mask once and reuse it for both partitions.
    is_test = data[index].isin(test_ids)
    return data[~is_test], data[is_test]


# Adapted from https://www.tensorflow.org/tutorials/structured_data/time_series
class WindowGenerator():
    """Partition grouped time-series rows and expose them as windowed datasets.

    Each window holds ``input_width`` consecutive rows used as model input,
    followed by a single row (``shift`` = ``label_width`` = 1) used as label.
    """

    def __init__(self, data,
                 index: str = "sleep_id",
                 input_width: int = INPUT_TIME_STEP,
                 validation_size: int = VALIDATION_SIZE,
                 test_size: int = TEST_SIZE,
                 input_feature_slice: slice = slice(1, 100),
                 label_feature_slice: slice = slice(-4, 100),
                 generate_data_now: bool = True):
        """Split ``data`` by group id and precompute the window geometry.

        Args:
            data: Frame with one row per time step; must contain ``index``.
            index: Column identifying the group each row belongs to.
            input_width: Number of time steps fed to the model per window.
            validation_size: Number of group ids held out for validation
                (drawn from what remains after the test split).
            test_size: Number of group ids held out for testing.
            input_feature_slice: Column slice selecting the input features.
            label_feature_slice: Column slice selecting the label features.
            generate_data_now: When true, build the training, validation and
                testing datasets immediately.
        """
        # Partition data
        self.training, self.testing = training_test_split_by_unique_index(
            data, index, test_size)
        self.training, self.validation = training_test_split_by_unique_index(
            self.training, index, validation_size)

        # Window parameters
        self.input_width = input_width
        self.label_width = 1
        self.shift = 1
        self.total_window_size = self.input_width + self.shift

        self.input_slice = slice(0, input_width)
        self.input_indices = np.arange(self.total_window_size)[self.input_slice]

        self.label_start = self.total_window_size - self.label_width
        self.labels_slice = slice(self.label_start, None)
        self.label_indices = np.arange(self.total_window_size)[self.labels_slice]

        self.input_feature_slice = input_feature_slice
        self.label_feature_slice = label_feature_slice

        # Sample dataset: group 0 as before; when no row carries id 0, fall
        # back to the first group present instead of failing on empty data.
        sample_rows = data[data[index] == 0]
        if sample_rows.empty:
            sample_rows = data[data[index] == data[index].iloc[0]]
        # Forward ``index`` so a non-default grouping column is honoured.
        self.sample_ds = self.make_dataset(sample_rows, index)

        if generate_data_now:
            self.training_ds = self.make_dataset(self.training, index)
            self.validation_ds = self.make_dataset(self.validation, index)
            self.testing_ds = self.make_dataset(self.testing, index)

    def __repr__(self):
        """Return a multi-line summary of the window geometry."""
        return "WindowGenerator:\n\t" + '\n\t'.join([
            f'Total window size: {self.total_window_size}',
            f'Input indices: {self.input_indices}',
            f'Label indices: {self.label_indices}',
        ])

    def split_window(self, features):
        """Split a batch of full windows into ``(inputs, labels)``.

        Args:
            features: Tensor indexed as ``[batch, time, feature]``.

        Returns:
            ``inputs`` covering the first ``input_width`` time steps and
            ``labels`` taken from the final time step, with the time axis
            removed.
        """
        inputs = features[:, self.input_slice, self.input_feature_slice]
        labels = features[:, self.labels_slice, self.label_feature_slice]
        # Drop only the length-1 time axis. A bare squeeze would also remove
        # the batch axis whenever a batch happens to hold a single window.
        labels = tf.squeeze(labels, axis=1)
        # Slicing loses static shape information; restore the known time axis.
        inputs.set_shape([None, self.input_width, None])
        return inputs, labels

    def make_dataset(self, data,
                     index_group: str = "sleep_id",
                     sort_by: str = "minutes_since_begin",
                     batch_size: int = BATCH_SIZE):
        """Build a batched dataset of ``(inputs, labels)`` windows.

        Windows are generated separately for every group so that no window
        spans two groups; the per-group datasets are then concatenated in
        order of first appearance of each group id.

        Args:
            data: Frame whose columns can all be cast to ``float32``.
            index_group: Column identifying the group of each row. Rows with
                a missing group id are skipped.
            sort_by: Column used to order the rows inside a group.
            batch_size: Number of windows per batch.

        Returns:
            A dataset yielding the output of :meth:`split_window`.

        Raises:
            ValueError: If ``data`` contains no groups.
        """
        windows = None
        # One pass over the frame instead of re-filtering it for every group.
        for _, group_rows in data.groupby(index_group, sort=False):
            ordered = group_rows.sort_values(by=[sort_by])
            values = np.array(ordered, dtype=np.float32)
            group_ds = tf.keras.utils.timeseries_dataset_from_array(
                data=values,
                targets=None,
                sequence_length=self.total_window_size,
                sequence_stride=1,
                shuffle=False,
                batch_size=batch_size,
            )
            windows = group_ds if windows is None else windows.concatenate(group_ds)

        if windows is None:
            raise ValueError(
                f"make_dataset received no rows grouped by {index_group!r}")
        return windows.map(self.split_window)


# Adapted from https://www.tensorflow.org/tutorials/structured_data/time_series#linear_model
def compile_and_fit(model, window: WindowGenerator,
                    loss=None,
                    optimizer=None,
                    metrics=None,
                    early_stop: bool = True,
                    patience: int = 5,
                    baseline=None,
                    epochs: int = MAX_EPOCHS):
    """Compile ``model`` and fit it on the datasets held by ``window``.

    Args:
        model: Keras model to train.
        window: Provides ``training_ds`` and ``validation_ds``.
        loss: Loss to optimise; defaults to a new
            ``CategoricalCrossentropy(from_logits=True)``.
        optimizer: Optimizer to use; defaults to a new ``Adam()``.
        metrics: Metrics to track; defaults to categorical cross-entropy
            (from logits), categorical accuracy and categorical hinge.
        early_stop: Whether to stop early based on ``val_loss``.
        patience: Early-stopping patience, in epochs.
        baseline: Early-stopping baseline value for ``val_loss``.
        epochs: Maximum number of training epochs.

    Returns:
        The object returned by ``model.fit``.
    """
    # Create loss/optimizer per call: instances used as default arguments are
    # built once at definition time and would be shared by every model
    # trained through this function.
    if loss is None:
        loss = tf.losses.CategoricalCrossentropy(from_logits=True)
    if optimizer is None:
        optimizer = tf.optimizers.Adam()
    if metrics is None:
        metrics = [tf.keras.metrics.CategoricalCrossentropy(from_logits=True),
                   tf.keras.metrics.CategoricalAccuracy(),
                   tf.keras.metrics.CategoricalHinge()]

    callbacks = []
    if early_stop:
        callbacks.append(tf.keras.callbacks.EarlyStopping(
            monitor='val_loss',
            patience=patience,
            baseline=baseline,
            mode='min',
        ))

    model.compile(
        loss=loss,
        optimizer=optimizer,
        metrics=metrics,
    )

    return model.fit(window.training_ds,
                     epochs=epochs,
                     validation_data=window.validation_ds,
                     callbacks=callbacks)


def save_history(history, file_name: str = "history.csv"):
    """Write a training history to ``file_name`` as CSV, without the index.

    Args:
        history: Either a mapping of metric name to per-epoch values, or an
            object exposing such a mapping as its ``history`` attribute
            (such as the value returned by ``compile_and_fit``).
        file_name: Destination path of the CSV file.
    """
    # Unwrap History-like objects; plain dicts pass through untouched.
    records = getattr(history, "history", history)
    pd.DataFrame.from_dict(records).to_csv(file_name, index=False)


sleep_data = pd.read_csv(SLEEP_DATA_PATH)
wg = WindowGenerator(sleep_data)