Skip to content
Snippets Groups Projects
common.py 4.57 KiB
import tensorflow as tf
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers
import pandas as pd

# Default partition sizes used by WindowGenerator: how many unique index
# values are sampled for the test and validation partitions respectively.
TEST_SIZE = (3 * 365) // 5
VALIDATION_SIZE = (4 * 365) // 5

# Windowing / training defaults.
BATCH_SIZE = 64
INPUT_TIME_STEP = 10  # in minutes
MAX_EPOCHS = 20

# NOTE(review): not referenced in the visible part of this file; presumably
# the model input feature count and the number of sleep-stage classes.
INPUT_FEATURES_SIZE = 7
SLEEP_STAGES = 4

# CSV file read when this module is imported.
SLEEP_DATA_PATH = ".data/sleep_data_simple.csv"

def training_test_split_by_unique_index(data, index: str, test_size: int = 10):
    """Split ``data`` into (training, test) by sampling whole index groups.

    ``test_size`` distinct values of ``data[index]`` are drawn without
    replacement using NumPy's global random state; every row carrying one of
    those values goes to the test partition, all other rows to training.

    Args:
        data: pandas DataFrame containing the column ``index``.
        index: Name of the column whose unique values identify the groups.
        test_size: Number of unique index values to place in the test split.

    Returns:
        A ``(training, test)`` tuple of DataFrames.

    Raises:
        ValueError: Raised by ``np.random.choice`` when ``test_size`` exceeds
            the number of unique index values.
    """
    unique_ids = data[index].unique()
    held_out_ids = np.random.choice(unique_ids, size=test_size, replace=False)
    in_test = data[index].isin(held_out_ids)
    return data[~in_test], data[in_test]


# Adapted from https://www.tensorflow.org/tutorials/structured_data/time_series
class WindowGenerator():
    """Partition a grouped time-series table and build windowed datasets.

    The table is split by unique values of an index column into training,
    validation and testing partitions. Each partition can be turned into a
    ``tf.data.Dataset`` of ``(inputs, labels)`` batches, where every window
    covers ``input_width`` consecutive rows of one group followed by a single
    label row.
    """

    def __init__(
        self,
        data,
        index: str = "sleep_id",
        input_width: int = INPUT_TIME_STEP,
        validation_size: int = VALIDATION_SIZE,
        test_size: int = TEST_SIZE,
        input_feature_slice: slice = slice(1,100),
        label_feature_slice: slice = slice(-4,100),
        generate_data_now: bool = True,
    ):
        """Split ``data`` and, optionally, build all datasets immediately.

        Args:
            data: pandas DataFrame holding every group.
            index: Column whose unique values identify the groups.
            input_width: Number of consecutive rows used as model input.
            validation_size: Number of unique index values sampled for the
                validation partition (taken from what remains after the test
                partition has been removed).
            test_size: Number of unique index values sampled for the test
                partition.
            input_feature_slice: Column positions (last axis of a window)
                used as input features.
            label_feature_slice: Column positions (last axis of a window)
                used as labels.
            generate_data_now: When True, build ``training_ds``,
                ``validation_ds`` and ``testing_ds`` in the constructor.
        """
        # Partition data: carve out the test groups first, then take the
        # validation groups from the remaining rows.
        self.training, self.testing = training_test_split_by_unique_index(data, index, test_size)
        self.training, self.validation = training_test_split_by_unique_index(self.training, index, validation_size)

        # Window parameters
        self.input_width = input_width
        self.label_width = 1
        self.shift = 1

        self.total_window_size = self.input_width + self.shift

        self.input_slice = slice(0, input_width)
        self.input_indices = np.arange(self.total_window_size)[self.input_slice]

        self.label_start = self.total_window_size - self.label_width
        self.labels_slice = slice(self.label_start, None)
        self.label_indices = np.arange(self.total_window_size)[self.labels_slice]

        self.input_feature_slice = input_feature_slice
        self.label_feature_slice = label_feature_slice

        # Sample dataset built from the group whose index value is 0. The
        # grouping column is passed explicitly so a custom `index` is honoured
        # here exactly as it is for the partitions below.
        self.sample_ds = self.make_dataset(data[data[index] == 0], index)

        if generate_data_now:
            self.training_ds = self.make_dataset(self.training, index)
            self.validation_ds = self.make_dataset(self.validation, index)
            self.testing_ds = self.make_dataset(self.testing, index)

    def __repr__(self):
        return "WindowGenerator:\n\t" +'\n\t'.join([
            f'Total window size: {self.total_window_size}',
            f'Input indices: {self.input_indices}',
            f'Label indices: {self.label_indices}',
        ])

    def split_window(self, features):
        """Split a batch of windows into ``(inputs, labels)``.

        Args:
            features: Tensor indexed as (batch, time, feature), as produced
                by ``make_dataset`` before mapping.

        Returns:
            ``inputs`` with the first ``input_width`` time steps and the
            input feature columns, and ``labels`` with the label feature
            columns of the final time step, shaped (batch, label_features).
        """
        inputs = features[:, self.input_slice, self.input_feature_slice]
        labels = features[:, self.labels_slice, self.label_feature_slice]
        # Remove only the time axis (its length is label_width == 1). An
        # unrestricted squeeze would also drop the batch axis whenever a
        # batch happens to contain a single window.
        labels = tf.squeeze(labels, axis=1)
        inputs.set_shape([None, self.input_width, None])
        return inputs, labels

    def make_dataset(self, data, index_group: str = "sleep_id", sort_by: str = "minutes_since_begin"):
        """Build a batched dataset of ``(inputs, labels)`` windows.

        Windows are generated separately for each group so that no window
        spans two groups; the per-group datasets are then concatenated in
        the order the groups first appear in ``data``.

        Args:
            data: pandas DataFrame with columns ``index_group`` and
                ``sort_by``; all columns must be convertible to float32.
            index_group: Column whose unique values identify the groups.
            sort_by: Column used to order the rows inside each group.

        Returns:
            A ``tf.data.Dataset`` yielding ``split_window`` outputs in
            batches of up to ``BATCH_SIZE`` windows.

        Raises:
            ValueError: If ``data`` contains no groups at all.
        """
        ds_all = None
        for i_group in data[index_group].unique():
            subset_data = np.array(data[data[index_group] == i_group].sort_values(by=[sort_by]), dtype=np.float32)
            ds = tf.keras.utils.timeseries_dataset_from_array(
                data=subset_data,
                targets=None,
                sequence_length=self.total_window_size,
                sequence_stride=1,
                shuffle=False,
                batch_size=BATCH_SIZE,)
            ds_all = ds if ds_all is None else ds_all.concatenate(ds)

        if ds_all is None:
            # Without this guard the failure would surface as an
            # AttributeError on `None.map`, hiding the real cause.
            raise ValueError(
                f"Cannot build a dataset: no rows found for any value of column '{index_group}'."
            )

        return ds_all.map(self.split_window)

# Adapted from https://www.tensorflow.org/tutorials/structured_data/time_series#linear_model
def compile_and_fit(
    model,
    window: WindowGenerator,
    loss = tf.losses.CategoricalCrossentropy(from_logits=True),
    optimizer = None,
    metrics = None,
    early_stop: bool = True,
    patience:int = 5,
    baseline = None,
    epochs: int = MAX_EPOCHS,
):
    """Compile ``model`` and fit it on the datasets held by ``window``.

    Args:
        model: Keras model to compile and train.
        window: WindowGenerator providing ``training_ds`` and
            ``validation_ds``.
        loss: Loss passed to ``model.compile``.
        optimizer: Optimizer passed to ``model.compile``. When None, a new
            ``tf.optimizers.Adam()`` is created for this call.
        metrics: Metrics passed to ``model.compile``. When None, categorical
            cross-entropy (from logits), categorical accuracy and
            categorical hinge are used.
        early_stop: Whether to add an ``EarlyStopping`` callback monitoring
            ``val_loss``.
        patience: ``patience`` forwarded to ``EarlyStopping``.
        baseline: ``baseline`` forwarded to ``EarlyStopping``.
        epochs: Maximum number of training epochs.

    Returns:
        The object returned by ``model.fit``.
    """
    if optimizer is None:
        # Default arguments are evaluated once, at definition time. An
        # optimizer instance created there would be shared by every model
        # trained through this function, so build a fresh one per call.
        optimizer = tf.optimizers.Adam()

    if metrics is None:
        metrics = [tf.keras.metrics.CategoricalCrossentropy(from_logits=True), tf.keras.metrics.CategoricalAccuracy(), tf.keras.metrics.CategoricalHinge()]

    callbacks = []
    if early_stop:
        early_stopping = tf.keras.callbacks.EarlyStopping(
            monitor='val_loss',
            patience=patience,
            baseline = baseline,
            mode='min'
        )
        callbacks.append(early_stopping)

    model.compile(
        loss=loss,
        optimizer=optimizer,
        metrics=metrics,
    )

    return model.fit(window.training_ds, epochs=epochs, validation_data=window.validation_ds, callbacks=callbacks)

def save_history(history, file_name: str = "history.csv"):
    """Write training metrics to a CSV file, one column per metric.

    Args:
        history: Either a dict mapping metric names to per-epoch value
            lists, or an object exposing such a dict as its ``history``
            attribute (for example the object returned by ``model.fit``).
        file_name: Destination path of the CSV file.
    """
    # Unwrap History-like objects; plain dicts (and anything whose `history`
    # attribute is not a dict) are passed through unchanged.
    metrics_by_name = getattr(history, "history", None)
    if isinstance(metrics_by_name, dict):
        history = metrics_by_name

    pd.DataFrame.from_dict(history).to_csv(file_name, index=False)


# Executed at import time: read the sleep table from SLEEP_DATA_PATH and build
# a WindowGenerator over it with all default settings.
sleep_data = pd.read_csv(filepath_or_buffer=SLEEP_DATA_PATH)
wg = WindowGenerator(data=sleep_data)