common.py
    import tensorflow as tf
    import numpy as np
    from tensorflow import keras
    from tensorflow.keras import layers
    import pandas as pd
    
    TEST_SIZE = 365*3//5        # = 219 unique sleep sessions held out for testing
    VALIDATION_SIZE = 365*4//5  # = 292 unique sleep sessions held out for validation
    
    BATCH_SIZE = 64
    INPUT_TIME_STEP = 10 # in minutes
    INPUT_FEATURES_SIZE = 7
    SLEEP_STAGES = 4
    MAX_EPOCHS = 20
    
    SLEEP_DATA_PATH = ".data/sleep_data_simple.csv"
    
    def training_test_split_by_unique_index(data, index: str, test_size: int = 10):
        """Split `data` into (train, test) frames by holding out `test_size`
        randomly chosen unique values of the `index` column."""
        test_ids = np.random.choice(data[index].unique(), size=test_size, replace=False)
        test_mask = data[index].isin(test_ids)
        return data[~test_mask], data[test_mask]
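    # Example (hypothetical frame `df` with a sleep_id column):
    #   train_df, test_df = training_test_split_by_unique_index(df, "sleep_id", test_size=2)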
    
    
    # Adapted from https://www.tensorflow.org/tutorials/structured_data/time_series
    class WindowGenerator():
        def __init__(self, data, index: str = "sleep_id", input_width: int = INPUT_TIME_STEP,
                     validation_size: int = VALIDATION_SIZE, test_size: int = TEST_SIZE,
                     input_feature_slice: slice = slice(1, None),   # every column after the first
                     label_feature_slice: slice = slice(-4, None),  # the last SLEEP_STAGES columns
                     generate_data_now: bool = True):
            # Partition data
            self.training, self.testing = training_test_split_by_unique_index(data, index, test_size)
            self.training, self.validation = training_test_split_by_unique_index(self.training, index, validation_size)
    
            # Window parameters
            self.input_width = input_width
            self.label_width = 1
            self.shift = 1
    
            self.total_window_size = self.input_width + self.shift
    
            self.input_slice = slice(0, input_width)
            self.input_indices = np.arange(self.total_window_size)[self.input_slice]
    
            self.label_start = self.total_window_size - self.label_width
            self.labels_slice = slice(self.label_start, None)
            self.label_indices = np.arange(self.total_window_size)[self.labels_slice]
    
            self.input_feature_slice = input_feature_slice
            self.label_feature_slice = label_feature_slice
    
            # A small sample dataset built from the first sleep session, useful for
            # inspecting element shapes without generating the full splits.
            self.sample_ds = self.make_dataset(data[data[index] == data[index].unique()[0]], index)
    
            if generate_data_now:
                self.training_ds = self.make_dataset(self.training, index)
                self.validation_ds = self.make_dataset(self.validation, index)
                self.testing_ds = self.make_dataset(self.testing, index)
    
    
        def __repr__(self):
            return "WindowGenerator:\n\t" +'\n\t'.join([
                f'Total window size: {self.total_window_size}',
                f'Input indices: {self.input_indices}',
                f'Label indices: {self.label_indices}',
            ])
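        # With the defaults (input_width=10, label_width=1, shift=1), repr prints:
        #   Total window size: 11
        #   Input indices: [0 1 2 3 4 5 6 7 8 9]
        #   Label indices: [10]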
    
        def split_window(self, features):
            inputs = features[:, self.input_slice, self.input_feature_slice]
            # Squeeze only the time axis (label_width == 1); a bare tf.squeeze would
            # also collapse the batch axis whenever a final batch holds a single window.
            labels = tf.squeeze(features[:, self.labels_slice, self.label_feature_slice], axis=1)
            inputs.set_shape([None, self.input_width, None])
            return inputs, labels
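        # With the defaults each dataset element is a pair:
        #   inputs: (batch, 10, n_feature_columns) float32
        #   labels: (batch, 4), the four sleep-stage columns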
    
        def make_dataset(self, data, index_group: str = "sleep_id", sort_by: str = "minutes_since_begin"):
            """Window each sleep session separately and concatenate the results, so
            that no window ever straddles two different sessions."""
            ds_all = None
            for i_group in data[index_group].unique():
                # Order the session chronologically before windowing
                subset_data = np.array(data[data[index_group] == i_group].sort_values(by=[sort_by]), dtype=np.float32)
                ds = tf.keras.utils.timeseries_dataset_from_array(
                    data=subset_data,
                    targets=None,
                    sequence_length=self.total_window_size,
                    sequence_stride=1,
                    shuffle=False,
                    batch_size=BATCH_SIZE,
                )
                ds_all = ds if ds_all is None else ds_all.concatenate(ds)
            return ds_all.map(self.split_window)
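    # Quick sanity check (illustrative; `wg` is built at the bottom of this file):
    #   inputs, labels = next(iter(wg.training_ds))
    #   print(inputs.shape, labels.shape)  # e.g. (64, 10, n_features) (64, 4)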
    
    # Adapted from https://www.tensorflow.org/tutorials/structured_data/time_series#linear_model
    def compile_and_fit(model, window: WindowGenerator, loss=None, optimizer=None, metrics=None,
                        early_stop: bool = True, patience: int = 5, baseline=None,
                        epochs: int = MAX_EPOCHS):
        # Create fresh loss/optimizer/metric objects per call: instances placed in
        # default arguments would be shared (optimizer state included) across every
        # model trained with this helper.
        if loss is None:
            loss = tf.keras.losses.CategoricalCrossentropy(from_logits=True)
        if optimizer is None:
            optimizer = tf.keras.optimizers.Adam()
        if metrics is None:
            metrics = [
                tf.keras.metrics.CategoricalCrossentropy(from_logits=True),
                tf.keras.metrics.CategoricalAccuracy(),
                tf.keras.metrics.CategoricalHinge(),
            ]
    
        callbacks = []
        if early_stop:
            early_stopping = tf.keras.callbacks.EarlyStopping(
                monitor='val_loss',
                patience=patience,
                baseline=baseline,
                mode='min'
            )
            callbacks.append(early_stopping)
    
        model.compile(
            loss=loss,
            optimizer=optimizer,
            metrics=metrics,
        )
    
        return model.fit(window.training_ds, epochs=epochs, validation_data=window.validation_ds, callbacks=callbacks)
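    # Typical call (with some Keras model emitting SLEEP_STAGES logits):
    #   history = compile_and_fit(model, wg, patience=3)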
    
    def save_history(history, file_name: str = "history.csv"):
        """Persist training metrics to CSV. Expects the metrics dict, i.e. the
        `.history` attribute of the History object returned by `model.fit`."""
        pd.DataFrame.from_dict(history).to_csv(file_name, index=False)
    
    
    # Built at import time so other modules can import `sleep_data` and `wg` directly.
    sleep_data = pd.read_csv(SLEEP_DATA_PATH)
    wg = WindowGenerator(sleep_data)
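
    # A minimal end-to-end sketch (illustrative, not part of the original pipeline):
    # a dense baseline over the flattened 10-step window, trained and logged with
    # the helpers above. The output file name is hypothetical.
    if __name__ == "__main__":
        baseline_model = keras.Sequential([
            layers.Flatten(),            # (batch, 10, n_features) -> (batch, 10 * n_features)
            layers.Dense(32, activation="relu"),
            layers.Dense(SLEEP_STAGES),  # logits, matching from_logits=True in compile_and_fit
        ])
        history = compile_and_fit(baseline_model, wg)
        save_history(history.history, "baseline_history.csv")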