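# resnet50_imagenet.py
#
# Builds ResNet50 in NCHW (channels_first) layout with Keras, copies the
# pretrained ImageNet weights into it, samples tune/test images from the
# ILSVRC2012 validation set, and translates the model to ApproxHPVM.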
import os
import glob
import random
import scipy
import scipy.io
import cv2
import numpy as np
import tensorflow as tf
import keras
from keras.models import Sequential, Model
from keras.layers import *
from keras.utils import to_categorical
from keras.applications.resnet50 import ResNet50, preprocess_input
from keras import backend as K
from frontend.approxhpvm_translator import translate_to_approxhpvm
from frontend.weight_utils import dumpCalibrationData2
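# Fix the RNG seed so the class/image sampling below is reproducible, pin the
# job to GPU 1, and build everything in channels_first (NCHW) layout.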
np.random.seed(2020)
os.environ["CUDA_VISIBLE_DEVICES"] = "1"
K.set_image_data_format('channels_first')
data_format = 'channels_first'
IMAGENET_DIR = '/home/nz11/ILSVRC2012/'
OUTPUT_DIR = 'data/resnet50_imagenet_tune_regenerate/'
WEIGHTS_PATH = 'data/resnet50_imagenet/weights.h5'
NUM_CLASSES = 200
IMAGES_PER_CLASS = 40
# VAL_SIZE = 100
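# Standard ResNet bottleneck block with an identity shortcut: 1x1 -> kxk -> 1x1
# convolutions, each followed by batch norm (axis=1 for NCHW), with the input
# added back in before the final ReLU. `stage` and `block` are kept for parity
# with the reference implementation but are unused here.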
def identity_block(input_tensor, kernel_size, filters, stage, block):
    filters1, filters2, filters3 = filters
    bn_axis = 1

    x = Conv2D(filters1, (1, 1))(input_tensor)
    x = BatchNormalization(axis=bn_axis)(x)
    x = Activation('relu')(x)

    x = Conv2D(filters2, kernel_size, padding='same')(x)
    x = BatchNormalization(axis=bn_axis)(x)
    x = Activation('relu')(x)

    x = Conv2D(filters3, (1, 1))(x)
    x = BatchNormalization(axis=bn_axis)(x)

    x = add([x, input_tensor])
    x = Activation('relu')(x)
    return x
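# Bottleneck block with a projection shortcut: the same 1x1 -> kxk -> 1x1
# stack, but the shortcut is a strided 1x1 convolution so the block can change
# both the spatial resolution and the channel count.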
def conv_block(input_tensor, kernel_size, filters, stage, block, strides=(2, 2)):
    filters1, filters2, filters3 = filters
    bn_axis = 1

    x = Conv2D(filters1, (1, 1), strides=strides)(input_tensor)
    x = BatchNormalization(axis=bn_axis)(x)
    x = Activation('relu')(x)

    x = Conv2D(filters2, kernel_size, padding='same')(x)
    x = BatchNormalization(axis=bn_axis)(x)
    x = Activation('relu')(x)

    x = Conv2D(filters3, (1, 1))(x)
    x = BatchNormalization(axis=bn_axis)(x)

    shortcut = Conv2D(filters3, (1, 1), strides=strides)(input_tensor)
    shortcut = BatchNormalization(axis=bn_axis)(shortcut)

    x = add([x, shortcut])
    x = Activation('relu')(x)
    return x
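# Rebuild ResNet50 in NCHW layout and copy weights layer-by-layer from the
# stock (NHWC) keras.applications model. Note that the first batch norm is
# placed after the max-pool here rather than directly after conv1.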
def get_resnet50_nchw_keras():
    img_input = Input(shape=(3, 224, 224))
    bn_axis = 1

    x = ZeroPadding2D((3, 3))(img_input)
    x = Conv2D(64, (7, 7), strides=(2, 2))(x)
    # x = BatchNormalization(axis=bn_axis)(x)
    x = Activation('relu')(x)
    x = MaxPooling2D((3, 3), strides=(2, 2))(x)
    x = BatchNormalization(axis=bn_axis)(x)

    x = conv_block(x, 3, [64, 64, 256], stage=2, block='a', strides=(1, 1))
    x = identity_block(x, 3, [64, 64, 256], stage=2, block='b')
    x = identity_block(x, 3, [64, 64, 256], stage=2, block='c')

    x = conv_block(x, 3, [128, 128, 512], stage=3, block='a')
    x = identity_block(x, 3, [128, 128, 512], stage=3, block='b')
    x = identity_block(x, 3, [128, 128, 512], stage=3, block='c')
    x = identity_block(x, 3, [128, 128, 512], stage=3, block='d')

    x = conv_block(x, 3, [256, 256, 1024], stage=4, block='a')
    x = identity_block(x, 3, [256, 256, 1024], stage=4, block='b')
    x = identity_block(x, 3, [256, 256, 1024], stage=4, block='c')
    x = identity_block(x, 3, [256, 256, 1024], stage=4, block='d')
    x = identity_block(x, 3, [256, 256, 1024], stage=4, block='e')
    x = identity_block(x, 3, [256, 256, 1024], stage=4, block='f')

    x = conv_block(x, 3, [512, 512, 2048], stage=5, block='a')
    x = identity_block(x, 3, [512, 512, 2048], stage=5, block='b')
    x = identity_block(x, 3, [512, 512, 2048], stage=5, block='c')

    x = AveragePooling2D((7, 7))(x)
    x = Flatten()(x)
    x = Dense(1000)(x)
    x = Activation('softmax')(x)

    model = Model(img_input, x)

    # Copy pretrained weights index-by-index; layers whose weights do not line
    # up (because the first batch norm was moved) are reported and skipped.
    original_model = ResNet50()
    for i in range(len(original_model.layers)):
        try:
            model.layers[i].set_weights(original_model.layers[i].get_weights())
            # model.layers[i].trainable = False
        except Exception:
            print(i, 'skipped')
    # The relocated first BatchNormalization is layer 5 here but layer 3 in the
    # reference model, so copy its weights explicitly.
    model.layers[5].set_weights(original_model.layers[3].get_weights())

    return model
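# Standard ImageNet evaluation preprocessing for a single image path.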
def load_image(x):
    image = cv2.imread(x)
    height, width, _ = image.shape

    # Scale the shorter side to 256 pixels, preserving aspect ratio.
    new_height = height * 256 // min(image.shape[:2])
    new_width = width * 256 // min(image.shape[:2])
    image = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_CUBIC)

    # Center-crop to 224x224.
    height, width, _ = image.shape
    startx = width // 2 - (224 // 2)
    starty = height // 2 - (224 // 2)
    image = image[starty:starty + 224, startx:startx + 224]

    image = image[:, :, ::-1]               # BGR (OpenCV) -> RGB
    image = np.transpose(image, (2, 0, 1))  # HWC -> CHW
    image = preprocess_input(image.astype(np.float32), data_format=data_format)
    return image.astype(np.float32)
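# Parse the ILSVRC2012 devkit metadata to map the original class ids to
# WordNet synsets and human-readable names.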
meta = scipy.io.loadmat(IMAGENET_DIR + 'ILSVRC2012_devkit_t12/data/meta.mat')
original_idx_to_synset = {}
synset_to_name = {}
for i in range(1000):
    ilsvrc2012_id = int(meta['synsets'][i, 0][0][0][0])
    synset = meta['synsets'][i, 0][1][0]
    name = meta['synsets'][i, 0][2][0]
    original_idx_to_synset[ilsvrc2012_id] = synset
    synset_to_name[synset] = name
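# synset_words.txt lists one synset per line, in an order assumed to match the
# Keras output units (hence synset_to_keras_idx); build the mappings from it.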
synset_to_keras_idx = {}
keras_idx_to_name = {}
with open(IMAGENET_DIR + 'ILSVRC2012_devkit_t12/data/synset_words.txt', 'r') as f:
    for c, line in enumerate(f):
        parts = line.split(' ')
        synset_to_keras_idx[parts[0]] = c
        keras_idx_to_name[c] = ' '.join(parts[1:])
model = get_resnet50_nchw_keras()
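# Build the tune/test splits: sample NUM_CLASSES random classes from the
# ImageNet validation set, take up to IMAGES_PER_CLASS images from each, and
# split them evenly between the test and tune halves.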
X_tune, X_test = [], []
y_tune, y_true = [], []
classes = glob.glob(IMAGENET_DIR + 'val/*')
for c in np.random.permutation(len(classes))[:NUM_CLASSES]:
    x = np.array(glob.glob(classes[c] + '/*'))

    # Sample at most IMAGES_PER_CLASS images per class.
    idx = np.random.permutation(len(x))
    idx = idx[:min(len(idx), IMAGES_PER_CLASS)]

    synset = classes[c].split('/')[-1]

    images = list(map(load_image, x[idx]))
    labels = [synset_to_keras_idx[synset]] * len(x[idx])

    X_test += images[:IMAGES_PER_CLASS // 2]
    y_true += labels[:IMAGES_PER_CLASS // 2]
    X_tune += images[IMAGES_PER_CLASS // 2:]
    y_tune += labels[IMAGES_PER_CLASS // 2:]
X_test = np.array(X_test)
y_true = np.array(y_true)
X_tune = np.array(X_tune)
y_tune = np.array(y_tune)
print('tune size', len(X_tune))
print('test size', len(X_test))
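# Optional fine-tuning input pipeline (TF1-style): a tf.data pipeline that maps
# training image paths to (image, one-hot label) pairs via tf.py_func.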
def train_helper(x):
    # Paths arrive as bytes from tf.py_func; decode to str if needed.
    try:
        x = x.decode('utf-8')
    except AttributeError:
        pass
    image = load_image(x)

    # One-hot label derived from the class (synset) directory name.
    y = np.zeros(1000, dtype=np.uint8)
    y[synset_to_keras_idx[x.split('/')[-2]]] = 1
    return image, y
train_images = glob.glob(IMAGENET_DIR + 'train/*/*')
random.shuffle(train_images)
dataset = tf.data.Dataset.from_tensor_slices(train_images)
dataset = dataset.map(
    lambda x: tf.py_func(train_helper, [x], [tf.float32, tf.uint8]),
    num_parallel_calls=16
)
dataset = dataset.shuffle(buffer_size=1000)
dataset = dataset.batch(64)
dataset = dataset.repeat()
next_element = dataset.make_one_shot_iterator().get_next()
sess = tf.Session()
def generate():
    # Bridge the TF1 iterator into a Python generator for fit_generator.
    while True:
        yield sess.run(next_element)
model.compile(optimizer=keras.optimizers.Adam(lr=0.00001), loss='categorical_crossentropy', metrics=['acc'])
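# Reuse previously fine-tuned weights if they exist; otherwise the
# commented-out fit_generator call below is the pass that would produce them.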
if os.path.exists(WEIGHTS_PATH):
    model.load_weights(WEIGHTS_PATH)
else:
    pass
    # model.fit_generator(generate(), steps_per_epoch=1000, validation_data=(X_test, to_categorical(y_true, num_classes=1000)), epochs=6)
    # model.save_weights(OUTPUT_DIR + 'weights.h5')
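# Translate the Keras model to ApproxHPVM; the tune split and its labels are
# passed along (presumably as calibration/tuning data for the translator).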
translate_to_approxhpvm(model, OUTPUT_DIR, X_tune, y_tune, 1000)
# # dumpCalibrationData2(OUTPUT_DIR + 'test_input_10K.bin', X_test, OUTPUT_DIR + 'test_labels_10K.bin', y_true)
# dumpCalibrationData2(OUTPUT_DIR + 'tune_input.bin', X_tune, OUTPUT_DIR + 'tune_labels.bin', y_tune)
# dumpCalibrationData2(OUTPUT_DIR + 'test_input.bin', X_test, OUTPUT_DIR + 'test_labels.bin', y_true)
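# Sanity check: top-1 accuracy of the NCHW model on the held-out test split.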
pred = np.argmax(model.predict(X_test), axis=1)
print('val accuracy', np.sum(pred == y_true.ravel()) / len(X_test))
# pred = np.argmax(model.predict(X_tune), axis=1)
# print ('val accuracy', np.sum(pred == y_tune.ravel()) / len(X_tune))