Something went wrong on our end
-
SurajSSingh authoredSurajSSingh authored
common.py 4.57 KiB
import tensorflow as tf
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers
import pandas as pd
TEST_SIZE = 365*3//5
VALIDATION_SIZE = 365*4//5
BATCH_SIZE = 64
INPUT_TIME_STEP = 10 # in minutes
INPUT_FEATURES_SIZE = 7
SLEEP_STAGES = 4
MAX_EPOCHS = 20
SLEEP_DATA_PATH = ".data/sleep_data_simple.csv"
def training_test_split_by_unique_index(data, index: str, test_size: int = 10):
test_ids = np.random.choice(data[index].unique(), size = test_size, replace=False)
return data[~data[index].isin(test_ids)], data[data[index].isin(test_ids)]
# Adapted from https://www.tensorflow.org/tutorials/structured_data/time_series
class WindowGenerator():
def __init__(self, data, index: str = "sleep_id", input_width: int = INPUT_TIME_STEP, validation_size: int = VALIDATION_SIZE, test_size: int = TEST_SIZE, input_feature_slice: slice = slice(1,100), label_feature_slice: slice = slice(-4,100), generate_data_now: bool = True):
# Partition data
self.training, self.testing = training_test_split_by_unique_index(data, index, test_size)
self.training, self.validation = training_test_split_by_unique_index(self.training, index, validation_size)
# Window paramters
self.input_width = input_width
self.label_width = 1
self.shift = 1
self.total_window_size = self.input_width + self.shift
self.input_slice = slice(0, input_width)
self.input_indices = np.arange(self.total_window_size)[self.input_slice]
self.label_start = self.total_window_size - self.label_width
self.labels_slice = slice(self.label_start, None)
self.label_indices = np.arange(self.total_window_size)[self.labels_slice]
self.input_feature_slice = input_feature_slice
self.label_feature_slice = label_feature_slice
self.sample_ds = self.make_dataset(data[data[index] == 0])
if generate_data_now:
self.training_ds = self.make_dataset(self.training, index)
self.validation_ds = self.make_dataset(self.validation, index)
self.testing_ds = self.make_dataset(self.testing, index)
def __repr__(self):
return "WindowGenerator:\n\t" +'\n\t'.join([
f'Total window size: {self.total_window_size}',
f'Input indices: {self.input_indices}',
f'Label indices: {self.label_indices}',
])
def split_window(self, features):
inputs = features[:, self.input_slice, self.input_feature_slice]
labels = tf.squeeze(features[:, self.labels_slice, self.label_feature_slice])
inputs.set_shape([None, self.input_width, None])
# labels.set_shape([None, self.label_width, None])
return inputs, labels
def make_dataset(self, data, index_group: str = "sleep_id", sort_by: str = "minutes_since_begin"):
ds_all = None
for i_group in data[index_group].unique():
subset_data = np.array(data[data[index_group] == i_group].sort_values(by=[sort_by]), dtype=np.float32)
ds = tf.keras.utils.timeseries_dataset_from_array(
data=subset_data,
targets=None,
sequence_length=self.total_window_size,
sequence_stride=1,
shuffle=False,
batch_size=BATCH_SIZE,)
ds_all = ds if ds_all is None else ds_all.concatenate(ds)
ds_all = ds_all.map(self.split_window)
return ds_all
# Adapted from https://www.tensorflow.org/tutorials/structured_data/time_series#linear_model
def compile_and_fit(model, window: WindowGenerator, loss = tf.losses.CategoricalCrossentropy(from_logits=True), optimizer = tf.optimizers.Adam(), metrics = None, early_stop: bool = True, patience:int = 5, baseline = None, epochs: int = MAX_EPOCHS):
if metrics is None:
metrics = [tf.keras.metrics.CategoricalCrossentropy(from_logits=True), tf.keras.metrics.CategoricalAccuracy(), tf.keras.metrics.CategoricalHinge()]
callbacks = []
if early_stop:
early_stopping = tf.keras.callbacks.EarlyStopping(
monitor='val_loss',
patience=patience,
baseline = baseline,
mode='min'
)
callbacks.append(early_stopping)
model.compile(
loss=loss,
optimizer=optimizer,
metrics=metrics,
)
return model.fit(window.training_ds, epochs=epochs, validation_data=window.validation_ds, callbacks=callbacks)
def save_history(history, file_name: str = "history.csv"):
pd.DataFrame.from_dict(history).to_csv(file_name, index=False)
sleep_data = pd.read_csv(SLEEP_DATA_PATH)
wg = WindowGenerator(sleep_data)