Python Parallel Processing with ThreadPoolExecutor Gives Wrong Results with Keras Model

I am using the concurrent.futures.ThreadPoolExecutor class to run predictions in parallel with a Keras model, each with a different set of weights.

However, the predictions made in parallel are not correct.

Below is a reproducible example that creates 10 sets of weights and then calculates the model's error for each set, both with and without parallel processing.

I set a NumPy random seed so that the generated weights are identical across runs.

import tensorflow.keras
import numpy
import concurrent.futures

numpy.random.seed(1)

def create_rand_weights(model, num_models):
    random_model_weights = []
    for model_idx in range(num_models):
        random_weights = []
        for layer_idx in range(len(model.weights)):
            layer_shape = model.weights[layer_idx].shape
            if len(layer_shape) > 1:
                layer_weights = numpy.random.rand(layer_shape[0], layer_shape[1])
            else:
                layer_weights = numpy.random.rand(layer_shape[0])
            random_weights.append(layer_weights)
        random_weights = numpy.array(random_weights, dtype=object)
        random_model_weights.append(random_weights)
    
    random_model_weights = numpy.array(random_model_weights)
    return random_model_weights

def model_error(model_weights):
    global data_inputs, data_outputs, model
    model.set_weights(model_weights)
    predictions = model.predict(data_inputs)
    mae = tensorflow.keras.losses.MeanAbsoluteError()
    abs_error = mae(data_outputs, predictions).numpy() + 0.00000001
    return abs_error

input_layer  = tensorflow.keras.layers.Input(shape=(3,))
dense_layer1 = tensorflow.keras.layers.Dense(5, activation="relu")(input_layer)
output_layer = tensorflow.keras.layers.Dense(1, activation="linear")(dense_layer1)
model = tensorflow.keras.Model(inputs=input_layer, outputs=output_layer)

data_inputs = numpy.array([[0.02, 0.1, 0.15],
                           [0.7, 0.6, 0.8],
                           [1.5, 1.2, 1.7],
                           [3.2, 2.9, 3.1]])    
data_outputs = numpy.array([[0.1],
                            [0.6],
                            [1.3],
                            [2.5]])

num_models = 10
random_model_weights = create_rand_weights(model, num_models)

ExecutorClass = concurrent.futures.ThreadPoolExecutor
thread_output = []
with ExecutorClass(max_workers=2) as executor:
    output = executor.map(model_error, random_model_weights)
for out in output:
    thread_output.append(out)
thread_output=numpy.array(thread_output)
print("Wrong Outputs using Threads")
print(thread_output)

print("\n\n")

correct_output = []
for idx in range(num_models):
    error = model_error(random_model_weights[idx])
    correct_output.append(error)
correct_output=numpy.array(correct_output)
print("Correct Outputs without Threads")
print(correct_output)

These are the correct model outputs without parallel processing:

[6.78012372 3.42922212 4.96738673 6.64474774 6.83102609 4.41165734 3.34482099 7.6132908  7.97145654 6.98378612]

These are the wrong model outputs with parallel processing:

[3.42922212 3.42922212 6.90911246 6.64474774 4.41165734 3.34482099 7.6132908  7.97145654 6.98378612 6.98378612]

Even though I set a random seed for NumPy, the outputs using parallel processing still vary between runs.

Aerostat answered 1/3, 2023 at 16:16 Comment(2)
I know nothing about tensorflow, but just looking at the code I can't help wondering if tensorflow.keras.Model is thread-safe. Take a look at: https://mcmap.net/q/530719/-is-keras-thread-safe - Chaudoin
Thanks so much @Pingu. This is helpful. I am now sure that it is not an issue in my code. - Aerostat

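It appears that the model is not thread-safe: every worker thread calls set_weights() on the same shared model, so one thread can overwrite the weights before another thread has called predict(). You can modify just this function as follows, so that each call works on its own clone of the model: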

def model_error(model_weights):
    global data_inputs, data_outputs, model
    _model = tensorflow.keras.models.clone_model(model)
    _model.set_weights(model_weights)
    predictions = _model.predict(data_inputs)
    mae = tensorflow.keras.losses.MeanAbsoluteError()
    abs_error = mae(data_outputs, predictions).numpy() + 0.00000001
    return abs_error
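
If cloning is not desirable, another option is to serialize access to the shared model with a lock. The following is only a minimal sketch of that idea, assuming the problem is the interleaving of set_weights() and predict() across threads. ToyModel is a hypothetical stand-in for the Keras model so that the example runs without TensorFlow; locked_error and model_lock are illustrative names, not part of any library.

```python
import concurrent.futures
import threading


class ToyModel:
    """Hypothetical stand-in for a Keras model: mutable weights plus predict()."""

    def __init__(self):
        self.weights = 0.0

    def set_weights(self, weights):
        self.weights = weights

    def predict(self, x):
        return self.weights * x


shared_model = ToyModel()
model_lock = threading.Lock()


def locked_error(weights):
    # Hold the lock across set_weights() and predict() so that no other
    # thread can overwrite the weights between the two calls.
    with model_lock:
        shared_model.set_weights(weights)
        prediction = shared_model.predict(2.0)
    return abs(1.0 - prediction)


candidate_weights = [0.1 * i for i in range(10)]

# executor.map() returns results in the order of the inputs.
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    threaded = list(executor.map(locked_error, candidate_weights))

sequential = [locked_error(w) for w in candidate_weights]
print(threaded == sequential)
```

Note that the lock makes the model calls run one at a time, so this approach only helps when the threads do other work outside the locked section.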
Chaudoin answered 1/3, 2023 at 16:47 Comment(1)
Perfect. This is what I was looking for. I used threads with Keras in my library PyGAD (pygad.readthedocs.io) and thought it was an integration issue. After applying this change, I am sure things will work as expected. - Aerostat
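
When the error function is called many times, cloning the model on every call may become costly, because clone_model builds new layers each time. One possible variant, sketched here under assumptions, keeps a single private copy per worker thread using threading.local. ToyModel is again a hypothetical stand-in so the example runs without TensorFlow; with Keras, the copy.deepcopy call would presumably be replaced by tensorflow.keras.models.clone_model(model), and whether concurrent predict() calls on separate clones are safe in a given TensorFlow version should be verified separately.

```python
import concurrent.futures
import copy
import threading


class ToyModel:
    """Hypothetical stand-in for a Keras model: mutable weights plus predict()."""

    def __init__(self, scale):
        self.scale = scale
        self.weights = 0.0

    def set_weights(self, weights):
        self.weights = weights

    def predict(self, x):
        return self.scale * self.weights * x


template_model = ToyModel(scale=3.0)
thread_state = threading.local()


def get_thread_model():
    # Create one private copy per worker thread on first use, then reuse it.
    if not hasattr(thread_state, "model"):
        thread_state.model = copy.deepcopy(template_model)
    return thread_state.model


def per_thread_error(weights):
    # Each thread mutates only its own copy, so the weights set here
    # cannot be overwritten by another thread before predict() runs.
    local_model = get_thread_model()
    local_model.set_weights(weights)
    return abs(1.0 - local_model.predict(2.0))


candidate_weights = [0.1 * i for i in range(10)]

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    threaded = list(executor.map(per_thread_error, candidate_weights))

# Reference values computed directly, without any shared state.
expected = [abs(1.0 - 3.0 * w * 2.0) for w in candidate_weights]
print(threaded == expected)
```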
