I am using parallel processing using the concurrent.futures.ThreadPoolExecutor
class to make multiple predictions using a Keras model for different sets of weights.
But the Keras model predictions using parallel processing are not correct.
This is a reproducible sample code that creates 10 sets of weights. Then, it calculates the model's errors with and without parallel processing.
I set a random seed to NumPy
to make sure that there is no randomness across the different runs.
import concurrent.futures
import threading

import numpy
import tensorflow.keras
numpy.random.seed(1)
def create_rand_weights(model, num_models):
random_model_weights = []
for model_idx in range(num_models):
random_weights = []
for layer_idx in range(len(model.weights)):
layer_shape = model.weights[layer_idx].shape
if len(layer_shape) > 1:
layer_weights = numpy.random.rand(layer_shape[0], layer_shape[1])
else:
layer_weights = numpy.random.rand(layer_shape[0])
random_weights.append(layer_weights)
random_weights = numpy.array(random_weights, dtype=object)
random_model_weights.append(random_weights)
random_model_weights = numpy.array(random_model_weights)
return random_model_weights
def model_error(model_weights):
global data_inputs, data_outputs, model
model.set_weights(model_weights)
predictions = model.predict(data_inputs)
mae = tensorflow.keras.losses.MeanAbsoluteError()
abs_error = mae(data_outputs, predictions).numpy() + 0.00000001
return abs_error
input_layer = tensorflow.keras.layers.Input(3)
dense_layer1 = tensorflow.keras.layers.Dense(5, activation="relu")(input_layer)
output_layer = tensorflow.keras.layers.Dense(1, activation="linear")(dense_layer1)
model = tensorflow.keras.Model(inputs=input_layer, outputs=output_layer)
data_inputs = numpy.array([[0.02, 0.1, 0.15],
[0.7, 0.6, 0.8],
[1.5, 1.2, 1.7],
[3.2, 2.9, 3.1]])
data_outputs = numpy.array([[0.1],
[0.6],
[1.3],
[2.5]])
num_models = 10
random_model_weights = create_rand_weights(model, num_models)
ExecutorClass = concurrent.futures.ThreadPoolExecutor
thread_output = []
with ExecutorClass(max_workers=2) as executor:
output = executor.map(model_error, random_model_weights)
for out in output:
thread_output.append(out)
thread_output=numpy.array(thread_output)
print("Wrong Outputs using Threads")
print(thread_output)
print("\n\n")
correct_output = []
for idx in range(num_models):
error = model_error(random_model_weights[idx])
correct_output.append(error)
correct_output=numpy.array(correct_output)
print("Correct Outputs without Threads")
print(correct_output)
These are the correct model outputs without using parallel processing:
[6.78012372 3.42922212 4.96738673 6.64474774 6.83102609 4.41165734 3.34482099 7.6132908 7.97145654 6.98378612]
These are the wrong model outputs when using parallel processing:
[3.42922212 3.42922212 6.90911246 6.64474774 4.41165734 3.34482099 7.6132908 7.97145654 6.98378612 6.98378612]
Even though I set a random seed for NumPy, the outputs using parallel processing still vary across different runs.