"""
Panel train class
"""
import tensorflow as tf
import numpy as np
from tensorflow.keras.preprocessing import image
from tensorflow.keras.layers import Dropout, Dense
from tensorflow.keras.layers import Conv2D, Conv2DTranspose
from tensorflow.keras.layers import concatenate
import matplotlib.pyplot as plt
from tensorflow.keras import backend as K
import glob
import cv2
from os import path
import pandas as pd
from detecto import core, utils
from torchvision import transforms
from random import choices
import torch
# Default model file locations. Each one is resolved relative to the
# directory that contains this module, so the value does not depend on the
# current working directory.

# Default checkpoint file for trainSegmentation().
panel_seg_model_path = path.join(
    path.dirname(__file__),
    'VGG16Net_ConvTranpose_complete.h5',
)

# Default checkpoint file for trainPanelClassifier().
panel_classification_model_path = path.join(
    path.dirname(__file__),
    'VGG16_classification_model.h5',
)

# File name for the mounting-configuration object detection model.
mounting_classification_model_path = path.join(
    path.dirname(__file__),
    'object_detection_model.pth',
)
class TrainPanelSegmentationModel():
    '''
    A class for training deep learning models on satellite images of solar
    installations: a segmentation model that detects solar arrays, a
    classifier that predicts whether an image contains panels, and an
    object detection model for the mounting configuration.
    '''

    def __init__(self, batch_size, no_epochs, learning_rate):
        """
        Store the training hyperparameters and build the VGG16 base network.

        Parameters
        -----------
        batch_size: int
            Number of samples per training batch.
        no_epochs: int
            Number of epochs to train for.
        learning_rate: float
            Learning rate used by trainSegmentation() and
            trainMountingConfigClassifier().
        """
        self.no_of_epochs = no_epochs
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        # Base VGG16 network
        self.model = tf.keras.applications.VGG16(
            include_top=False, weights='imagenet', input_shape=(640, 640, 3),
            pooling='max')
        # Name -> layer lookup, used to tap intermediate VGG16 outputs.
        self.layer_dict = {layer.name: layer for layer in self.model.layers}

    def loadImagesToNumpyArray(self, image_file_path):
        """
        Load in a set of images from a folder into a 4D numpy array,
        with dimensions (number images, 640, 640, 3).

        Parameters
        -----------
        image_file_path: string
            Path to folder where we want to process png images.

        Returns
        -----------
        nparray
            4D numpy array with dimensions
            (number images in folder, 640, 640, 3).

        Raises
        -----------
        ValueError
            If an entry in the folder cannot be read as an image.
        """
        # glob returns entries in arbitrary order; sort them so that the
        # result is deterministic and an image folder and a mask folder
        # with matching file names load in the same order.
        files = sorted(glob.glob(path.join(image_file_path, "*")))
        image_file_list = []
        for img_file in files:
            img = cv2.imread(img_file)
            # cv2.imread signals failure by returning None instead of
            # raising; fail loudly rather than store None in the array.
            if img is None:
                raise ValueError(
                    "Could not read image file: {}".format(img_file))
            image_file_list.append(img)
        # Convert the image_file_list to a 4d numpy array and return it
        return np.array(image_file_list)

    def diceCoeff(self, y_true, y_pred, smooth=1):
        """
        Accuracy metric is overly optimistic. IOU, dice coefficient are more
        suitable for semantic segmentation tasks. This function is used as the
        metric of similarity between the predicted mask and ground truth.

        Parameters
        -----------
        y_true: nparray float
            the true mask of the image
        y_pred: nparray float
            the predicted mask of the data
        smooth: int
            a parameter to ensure we are not dividing by zero and also a
            smoothing parameter. For back propagation. If the prediction
            is hard thresholded to 0 and 1, it is difficult to back
            propagate the dice loss gradient. We add this parameter to
            actually smooth out the loss function, making it differentiable.

        Returns
        -----------
        dice: float
            The metric of similarity between prediction and ground truth
        """
        # Sum over every axis except the batch axis, so that one dice
        # score is computed per sample and then averaged over the batch.
        intersection = K.sum(y_true * y_pred, axis=[1, 2, 3])
        union = K.sum(y_true, axis=[1, 2, 3]) + K.sum(y_pred, axis=[1, 2, 3])
        dice = K.mean((2. * intersection + smooth)/(union + smooth), axis=0)
        return dice

    def diceCoeffLoss(self, y_true, y_pred):
        """
        This function is a loss function that can be used when training
        the segmentation model. This loss function can be used in place
        of binary crossentropy, which is the current loss function in
        the training stage.

        Parameters
        -----------
        y_true: nparray float
            The true mask of the image
        y_pred: nparray float
            The predicted mask of the data

        Returns
        -----------
        float
            The loss metric between prediction and ground truth
        """
        return 1-self.diceCoeff(y_true, y_pred)

    @staticmethod
    def _scaleMaskByMax(mask):
        """Divide a mask array by its maximum value."""
        peak = np.max(mask)
        # An all-zero mask has a maximum of 0; dividing by it would fill
        # the mask with NaNs, so divide by 1 instead.
        if peak == 0:
            peak = 1
        return mask/peak

    def _decoderStage(self, tensor, skip_layer_name, up_filters,
                      conv_filters, dropout_rate):
        """Upsample, merge with a VGG16 skip connection, then convolve."""
        up = Conv2DTranspose(up_filters, (2, 2), strides=(2, 2),
                             padding='same')(tensor)
        up = concatenate([up, self.layer_dict[skip_layer_name].output])
        conv = Conv2D(conv_filters, (3, 3), activation='elu',
                      kernel_initializer='he_normal', padding='same')(up)
        conv = Dropout(dropout_rate)(conv)
        conv = Conv2D(conv_filters, (3, 3), activation='elu',
                      kernel_initializer='he_normal', padding='same')(conv)
        return conv

    def _buildSegmentationModel(self):
        """Attach a transposed-convolution decoder to the VGG16 encoder."""
        # (skip connection layer, transposed-conv filters, conv filters,
        # dropout rate) for each decoder stage, deepest stage first.
        decoder_stages = [
            ('block4_conv3', 512, 512, 0.2),
            ('block3_conv2', 256, 256, 0.2),
            ('block2_conv2', 128, 128, 0.2),
            ('block1_conv2', 64, 32, 0.1),
        ]
        tensor = self.layer_dict['block5_conv3'].output
        for skip_name, up_filters, conv_filters, rate in decoder_stages:
            tensor = self._decoderStage(tensor, skip_name, up_filters,
                                        conv_filters, rate)
        outputs = Conv2D(1, (1, 1), activation='sigmoid')(tensor)
        return tf.keras.Model(inputs=self.model.input, outputs=outputs)

    def trainSegmentation(self, train_data, train_mask, val_data, val_mask,
                          model_file_path=panel_seg_model_path):
        """
        This function uses VGG16 as the base network and as a transfer learning
        framework to train a model that segments solar panels from a satellite
        image. It uses the training data and mask to learn how to predict
        the mask of a solar array from a satellite image. It uses the
        validation data to prevent overfitting and to test the prediction on
        the fly. The validation data is also used to decide when to save the
        best model during training.

        Parameters
        -----------
        train_data: nparray float
            This should be the training images.
        train_mask: nparray int/float
            This should be the training images mask - ground truth
        val_data : nparray float
            This should be the validation images
        val_mask : nparray float
            This should be the validation images mask - ground truth
        model_file_path: string
            Path the best model (lowest validation loss) is saved to
            during training.

        Notes
        -----
        The dimension of the four variables must be [a,b,c,d]
        where [a] is the number of input images,
        [b,c] are the dimensions of the image - 640 x 640 in this
        case and [d] is 3 - RGB

        Returns
        -----------
        custom_model: tf.keras model object
            The final trained model. Note that this may not be the
            best model as the best model is saved during training
        results: tf.keras History object
            This variable contains training history and statistics
        """
        train_mask = self._scaleMaskByMax(train_mask)
        val_mask = self._scaleMaskByMax(val_mask)
        train_datagen = image.ImageDataGenerator(
            rescale=1./255,
            dtype='float32')
        val_datagen = image.ImageDataGenerator(
            rescale=1./255,
            dtype='float32')
        train_image_generator = train_datagen.flow(
            train_data, train_mask,
            batch_size=self.batch_size)
        val_image_generator = val_datagen.flow(
            val_data, val_mask,
            batch_size=self.batch_size)
        custom_model = self._buildSegmentationModel()
        # We fix the weights of the VGG16 architecture, You can choose to make
        # those layers trainable too but it will take a long time
        for layer in custom_model.layers[:18]:
            layer.trainable = False
        # 'learning_rate' replaces the deprecated 'lr' alias, which newer
        # Keras optimizers reject.
        custom_model.compile(loss='binary_crossentropy',
                             optimizer=tf.keras.optimizers.Adam(
                                 learning_rate=self.learning_rate,
                                 epsilon=1e-08),
                             metrics=['accuracy', self.diceCoeff]
                             )
        no_of_training_images = np.shape(train_data)[0]
        no_of_val_images = np.shape(val_data)[0]
        # The monitored quantity is a loss, so the best model is the one
        # with the LOWEST value: mode must be 'min'. With 'max' the
        # checkpoint would keep the worst model seen during training.
        checkpoint = tf.keras.callbacks.ModelCheckpoint(model_file_path,
                                                        monitor='val_loss',
                                                        verbose=1,
                                                        save_best_only=True,
                                                        mode='min')
        # Training the network. Step counts are clamped to at least 1 so
        # that a data set smaller than one batch still runs a step.
        results = custom_model.fit(train_image_generator,
                                   epochs=self.no_of_epochs,
                                   workers=0,
                                   steps_per_epoch=max(
                                       1,
                                       no_of_training_images //
                                       self.batch_size),
                                   validation_data=val_image_generator,
                                   validation_steps=max(
                                       1,
                                       no_of_val_images // self.batch_size),
                                   callbacks=[checkpoint]
                                   )
        return custom_model, results

    def trainPanelClassifier(self, train_path, val_path,
                             model_file_path=panel_classification_model_path):
        """
        This function uses VGG16 as the base network and as a transfer learning
        framework to train a model that predicts the presence of solar panels
        in a satellite image. It uses the training data to learn how to predict
        the presence of a solar array in a satellite image. It uses the
        validation data to prevent overfitting and to test the prediction on
        the fly. The validation data is also used to decide when to save the
        best model during training.

        Parameters
        -----------
        train_path: string
            This is the path to the folder that contains the training images
            Note that the directory must be structured in this format:
                    train_path/
                        ...has panel/
                            ......a_image_1.jpg
                            ......a_image_2.jpg
                        ...no panels/
                            ......b_image_1.jpg
                            ......b_image_2.jpg
        val_path: string
            This is the path to the folder that contains the validation images
            Note that the directory must be structured in this format:
                    val_path/
                        ...has panel/
                            ......a_image_1.jpg
                            ......a_image_2.jpg
                        ...no panels/
                            ......b_image_1.jpg
                            ......b_image_2.jpg
        model_file_path: string
            Path the best model (highest validation accuracy) is saved to
            during training.

        Returns
        -----------
        final_class_model: tf.keras model object
            The final trained model. Note that this may not be the
            best model as the best model is saved during training
        results: tf.keras History object
            This variable contains training history and statistics
        """
        # The base network is built with pooling='max', so its output is
        # the global max pooling layer. Using the model output avoids
        # depending on that layer's auto-generated name, which Keras
        # suffixes (e.g. '_1') when several models exist in one session.
        class_x = self.model.output
        out1 = Dense(units=512, activation="relu")(class_x)
        out1 = Dropout(0.2)(out1)
        out2 = Dense(units=512, activation="relu")(out1)
        out2 = Dropout(0.2)(out2)
        out_fin = Dense(units=2, activation="softmax")(out2)
        final_class_model = tf.keras.Model(
            inputs=self.model.input, outputs=out_fin)
        # The VGG16 layers are shared with the segmentation model, which
        # freezes them; make sure they are trainable here.
        for layer in final_class_model.layers[:18]:
            layer.trainable = True
        final_class_model.summary()
        tr_gen = image.ImageDataGenerator(rescale=1./255,
                                          dtype='float32')
        train_data = tr_gen.flow_from_directory(directory=train_path,
                                                target_size=(640, 640),
                                                batch_size=self.batch_size)
        val_data = tr_gen.flow_from_directory(directory=val_path,
                                              target_size=(640, 640),
                                              batch_size=self.batch_size)
        # Get the number of images in the training and validation sets
        no_of_training_images = len(train_data.labels)
        no_of_val_images = len(val_data.labels)
        # NOTE(review): the learning rate is fixed at 1e-4 here and
        # self.learning_rate is not used - confirm this is intended.
        # 'learning_rate' replaces the deprecated 'lr' alias.
        final_class_model.compile(loss='categorical_crossentropy',
                                  optimizer=tf.keras.optimizers.Adam(
                                      learning_rate=1e-4, epsilon=1e-08),
                                  metrics=['accuracy']
                                  )
        checkpoint = tf.keras.callbacks.ModelCheckpoint(model_file_path,
                                                        monitor='val_accuracy',
                                                        verbose=1,
                                                        save_best_only=True,
                                                        mode='max',
                                                        save_freq='epoch')
        # Step counts are clamped to at least 1 so that a data set smaller
        # than one batch still runs a step.
        results =\
            final_class_model.fit(x=train_data,
                                  workers=0,
                                  epochs=self.no_of_epochs,
                                  steps_per_epoch=max(
                                      1,
                                      no_of_training_images //
                                      self.batch_size),
                                  validation_data=val_data,
                                  validation_steps=max(
                                      1,
                                      no_of_val_images // self.batch_size),
                                  callbacks=[checkpoint])
        return final_class_model, results

    @staticmethod
    def _oversampleToBalanceClasses(train_data):
        """Duplicate random rows until every class has as many rows as the
        largest class, and return the result as a new dataframe."""
        class_count = train_data['class'].value_counts()
        largest_class_size = class_count.max()
        idx_to_duplicate = []
        for class_name, count in class_count.items():
            class_index_list = list(
                train_data[train_data['class'] == class_name].index)
            # Resample the class with replacement to make up the shortfall
            idx_to_duplicate.extend(
                choices(class_index_list,
                        k=int(largest_class_size - count)))
        if not idx_to_duplicate:
            return train_data.reset_index(drop=True)
        # A single concat replaces the row-by-row DataFrame.append, which
        # was removed in pandas 2.0 and copied the frame on every call.
        return pd.concat([train_data, train_data.loc[idx_to_duplicate]],
                         ignore_index=True)

    def trainMountingConfigClassifier(self, train_path, val_path,
                                      device=torch.device('cuda')):
        """
        This function uses Faster R-CNN ResNet50 FPN as the base network
        and as a transfer learning framework to train a model that performs
        object detection on the mounting configuration of solar arrays. It
        uses the training data to locate and classify mounting configuration
        of the solar installation. It uses the validation data to prevent
        overfitting and to test the prediction on the fly.

        Parameters
        -----------
        train_path: string
            This is the path to the folder that contains the training images
            Note that the directory must be structured in this format:
                    train_path/
                        ...images/
                            ......a_image_1.png
                            ......a_image_2.png
                        ...annotations/
                            ......b_image_1.xml
                            ......b_image_2.xml
        val_path: string
            This is the path to the folder that contains the validation images
            Note that the directory must be structured in this format:
                    val_path/
                        ...images/
                            ......a_image_1.png
                            ......a_image_2.png
                        ...annotations/
                            ......b_image_1.xml
                            ......b_image_2.xml
        device: torch.device
            This argument is passed to the Model() class in Detecto.
            It determines how to run the model: either on GPU via Cuda
            (default setting), or on CPU. Please note that running the
            model on GPU results in significantly faster training times.

        Returns
        -----------
        model: detecto.core.Model object
            The final trained mounting configuration object detection
            model.

        Notes
        -----
        An 'annotations.csv' file is written into both train_path and
        val_path. The training file holds the oversampled records.
        """
        # Convert the data set combinations (png + xml) to a CSV record.
        val_labels_path = (val_path + '/annotations.csv')
        train_labels_path = (train_path + '/annotations.csv')
        utils.xml_to_csv(train_path + '/annotations/',
                         train_labels_path)
        utils.xml_to_csv(val_path + '/annotations/',
                         val_labels_path)
        # Custom oversampling to balance out our classes
        train_data = pd.read_csv(train_labels_path)
        train_data_resampled = self._oversampleToBalanceClasses(train_data)
        # Re-write the resampled data set
        train_data_resampled.to_csv(train_labels_path, index=False)
        custom_transforms = transforms.Compose([
            transforms.ToPILImage(),
            transforms.Resize(800),
            transforms.ToTensor(),
            utils.normalize_transform()
        ])
        # Load in the training and validation data sets
        dataset = core.Dataset(train_labels_path,
                               train_path + '/images',
                               transform=custom_transforms)
        val_dataset = core.Dataset(val_labels_path,
                                   val_path + '/images')
        # Customize training options
        loader = core.DataLoader(dataset,
                                 batch_size=self.batch_size,
                                 shuffle=True)
        model = core.Model(["ground-fixed",
                            "carport-fixed",
                            "rooftop-fixed",
                            "ground-single_axis_tracker"],
                           device=device)
        losses = model.fit(loader, val_dataset,
                           epochs=self.no_of_epochs,
                           learning_rate=self.learning_rate,
                           verbose=True)
        plt.plot(losses)
        plt.show()
        return model

    @staticmethod
    def _plotAndSave(labelled_curves, file_name, with_legend=False):
        """Draw the given (label, values) curves on a fresh figure, save
        the figure to file_name and show it."""
        # Start from a fresh figure: on non-interactive backends
        # plt.show() does not clear the current figure, so curves from
        # earlier plots would otherwise pile up in later saved images.
        fig = plt.figure()
        for label, values in labelled_curves:
            plt.plot(values, label=label)
        plt.xlabel('Epochs')
        plt.ylabel('Percentage')
        if with_legend:
            plt.legend()
        plt.savefig(file_name, dpi=300)
        plt.show()
        plt.close(fig)

    def trainingStatistics(self, results, mode):
        """
        This function plots the training statistics such as training
        loss and accuracy and validation loss and accuracy. The dice
        coefficient was only used for segmentation and not panel
        classification. We use mode to decide if we should plot the
        dice coefficient.

        Parameters
        -----------
        results: tf.keras History object
            This is the output of the trained classifier. It contains
            training history.
        mode: int
            If mode = 1, it assumes we want plots for the semantic
            segmentation and also plots the dice coefficient results.
            For any other value of mode, it does not show plots of dice
            coefficients.

        Returns
        -----------
        None
            Each figure is saved to the current working directory and
            shown; nothing is returned.
        """
        with_dice = (mode == 1)
        history = results.history
        train_accuracy = history['accuracy']
        train_loss = history['loss']
        if with_dice:
            train_dice_coef = history['diceCoeff']
        # Validation metrics are only present when the model was fitted
        # with validation data; a missing key disables those plots.
        validation_metrics = True
        try:
            val_accuracy = history['val_accuracy']
            val_loss = history['val_loss']
            if with_dice:
                val_dice_coef = history['val_diceCoeff']
        except KeyError as e:
            print("No validation metrics available.")
            print(e)
            validation_metrics = False
        # One figure per training metric
        if with_dice:
            self._plotAndSave([(None, train_dice_coef)], 'Train_dice_coef')
        self._plotAndSave([(None, train_loss)], 'Train_loss')
        self._plotAndSave([(None, train_accuracy)], 'Train_accuracy')
        # One figure per validation metric
        if validation_metrics:
            if with_dice:
                self._plotAndSave([(None, val_dice_coef)], 'VAL_dice_coef')
            self._plotAndSave([(None, val_loss)], 'VAL_loss')
            self._plotAndSave([(None, val_accuracy)], 'VAL_accuracy')
        # Combined figure with every training metric
        train_curves = [('train_accuracy', train_accuracy),
                        ('train_loss', train_loss)]
        if with_dice:
            train_curves.append(('train_dice_coef', train_dice_coef))
        self._plotAndSave(train_curves, 'Training statistics',
                          with_legend=True)
        # Combined figure with every validation metric
        if validation_metrics:
            val_curves = [('val_accuracy', val_accuracy),
                          ('val_loss', val_loss)]
            if with_dice:
                val_curves.append(('val_dice_coef', val_dice_coef))
            self._plotAndSave(val_curves, 'Validation statistics',
                              with_legend=True)
        return