I am using Keras 2.0.8 with the TensorFlow 1.3.0 backend.

I load a model in the class constructor and then use it to predict from multiple threads.

import tensorflow as tf
from keras import backend as K
from keras.models import load_model


class CNN:
    def __init__(self, model_path):
        self.cnn_model = load_model(model_path)
        self.session = K.get_session()
        self.graph = tf.get_default_graph()

    def query_cnn(self, data):
        X = self.preprocessing(data)
        with self.session.as_default():
            with self.graph.as_default():
                return self.cnn_model.predict(X)

I initialize the CNN once, and the query_cnn method is called from multiple threads.

The exception I get in my log is:

  File "/home/*/Similarity/CNN.py", line 43, in query_cnn
    return self.cnn_model.predict(X)
  File "/usr/local/lib/python3.5/dist-packages/keras/models.py", line 913, in predict
    return self.model.predict(x, batch_size=batch_size, verbose=verbose)
  File "/usr/local/lib/python3.5/dist-packages/keras/engine/training.py", line 1713, in predict
    verbose=verbose, steps=steps)
  File "/usr/local/lib/python3.5/dist-packages/keras/engine/training.py", line 1269, in _predict_loop
    batch_outs = f(ins_batch)
  File "/usr/local/lib/python3.5/dist-packages/keras/backend/tensorflow_backend.py", line 2273, in __call__
    **self.session_kwargs)
  File "/usr/local/lib/python3.5/dist-packages/tensorflow/python/client/session.py", line 895, in run
    run_metadata_ptr)
  File "/usr/local/lib/python3.5/dist-packages/tensorflow/python/client/session.py", line 1124, in _run
    feed_dict_tensor, options, run_metadata)
  File "/usr/local/lib/python3.5/dist-packages/tensorflow/python/client/session.py", line 1321, in _do_run
    options, run_metadata)
  File "/usr/local/lib/python3.5/dist-packages/tensorflow/python/client/session.py", line 1340, in _do_call
    raise type(e)(node_def, op, message)
tensorflow.python.framework.errors_impl.NotFoundError: PruneForTargets: Some target nodes not found: group_deps 

The code works fine most of the time, so it is probably a problem with the multithreading.

How can I fix it?

1 Answer

Make sure you finish the graph creation before creating the other threads.

Calling finalize() on the graph may help you with that.

def __init__(self, model_path):
    self.cnn_model = load_model(model_path)
    self.session = K.get_session()
    self.graph = tf.get_default_graph()
    self.graph.finalize()

Update 1: finalize() will make your graph read-only so it can be safely used in multiple threads. As a side effect, it will help you find unintentional behavior and sometimes memory leaks as it will throw an exception when you try to modify the graph.

Imagine that you have a thread that, for instance, one-hot encodes your inputs (bad example):

def preprocessing(self, data):
    one_hot_data = tf.one_hot(data, depth=self.num_classes)
    return self.session.run(one_hot_data)

If you print the number of nodes in the graph, you will notice that it increases over time:

# number of nodes in the tf graph
print(len(list(tf.get_default_graph().as_graph_def().node)))

But if you define the graph first, that won't be the case (slightly better code):

def preprocessing(self, data):
    # run pre-created operation with self.input as placeholder
    return self.session.run(self.one_hot_data, feed_dict={self.input: data})
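The difference between the two preprocessing versions, and what finalize() catches, can be sketched without TensorFlow. The ToyGraph class below is a hypothetical stand-in written only for illustration; it is not how TensorFlow implements its graph. The error message is the one quoted in the comments further down.

```python
class ToyGraph:
    """Toy stand-in for a graph; NOT TensorFlow's implementation."""

    def __init__(self):
        self.nodes = []
        self._finalized = False

    def add_node(self, name):
        # A finalized graph rejects every modification.
        if self._finalized:
            raise RuntimeError("Graph is finalized and cannot be modified.")
        self.nodes.append(name)
        return name

    def finalize(self):
        self._finalized = True


graph = ToyGraph()


def bad_preprocessing(data):
    # Creates a new op on every call, so the graph keeps growing.
    graph.add_node("one_hot")
    return data


def make_preprocessing():
    # Creates the op once, up front; each later call only reuses it.
    graph.add_node("one_hot")

    def preprocessing(data):
        return data

    return preprocessing


for _ in range(3):
    bad_preprocessing([0, 1])
leaked_nodes = len(graph.nodes)  # one extra node per call

good_preprocessing = make_preprocessing()
graph.finalize()
for _ in range(3):
    good_preprocessing([0, 1])
nodes_after_finalize = len(graph.nodes)  # unchanged by the three calls

try:
    bad_preprocessing([0, 1])
    caught = None
except RuntimeError as exc:
    caught = str(exc)  # the leak is now reported instead of silently growing
```

In this sketch the bad version adds one node per call, while the pre-created version leaves the node count untouched and keeps working after finalize().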

Update 2: According to this thread, you need to call model._make_predict_function() on a Keras model before using it from multiple threads.

Keras builds the predict function lazily, the first time you call predict(). That way, if you never call predict(), you save some time and resources. However, the first call to predict() is slightly slower than every later one, and because it modifies the graph, it should happen before the threads start.

The updated code:

def __init__(self, model_path):
    self.cnn_model = load_model(model_path)
    self.cnn_model._make_predict_function() # have to initialize before threading
    self.session = K.get_session()
    self.graph = tf.get_default_graph() 
    self.graph.finalize() # make graph read-only
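Why building before threading matters can be illustrated with a small pure-Python sketch. LazyModel and its methods are hypothetical stand-ins for a lazily built predict function; they are not Keras code. The point is only that the "check, then build" step is not atomic, so doing it once in the main thread leaves the workers with read-only work.

```python
import threading


class LazyModel:
    """Hypothetical stand-in for a model whose predict function is built lazily."""

    def __init__(self):
        self.predict_function = None
        self.build_count = 0

    def make_predict_function(self):
        # Check-then-build is not atomic: two threads could both see None
        # and both build if the first call happened concurrently.
        if self.predict_function is None:
            self.build_count += 1
            self.predict_function = lambda x: [v * 2 for v in x]

    def predict(self, x):
        self.make_predict_function()
        return self.predict_function(x)


model = LazyModel()
model.make_predict_function()  # build in the main thread, before any worker exists

results = []
results_lock = threading.Lock()


def worker(batch):
    out = model.predict(batch)
    with results_lock:  # protect the shared results list
        results.append(out)


threads = [threading.Thread(target=worker, args=([i, i + 1],)) for i in range(5)]
for th in threads:
    th.start()
for th in threads:
    th.join()
```

Because the function already exists when the workers start, none of them enters the build branch, and build_count stays at 1.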

Update 3: I did a proof of concept of a warmup, because _make_predict_function() doesn't seem to work as expected. First I created a dummy model:

from keras.layers import Dense
from keras.models import Sequential

model = Sequential()
model.add(Dense(256, input_shape=(2,)))
model.add(Dense(1, activation='softmax'))

model.compile(loss='mean_squared_error', optimizer='adam')

model.save("dummymodel")

Then, in another script, I loaded that model and ran it on multiple threads:

import tensorflow as tf
from keras import backend as K
from keras.models import load_model
import threading as t
import numpy as np

K.clear_session()

class CNN:
    def __init__(self, model_path):

        self.cnn_model = load_model(model_path)
        self.cnn_model.predict(np.array([[0,0]])) # warmup
        self.session = K.get_session()
        self.graph = tf.get_default_graph()
        self.graph.finalize() # finalize

    def preprocessing(self, data):
        # dummy
        return data

    def query_cnn(self, data):
        X = self.preprocessing(data)
        with self.session.as_default():
            with self.graph.as_default():
                prediction = self.cnn_model.predict(X)
        print(prediction)
        return prediction


cnn = CNN("dummymodel")

# start five threads that all query the same model instance
threads = [
    t.Thread(target=cnn.query_cnn, kwargs={"data": np.random.random((500, 2))})
    for _ in range(5)
]
for th in threads:
    th.start()

for th in threads:
    th.join()

By commenting out the warmup and finalize lines, I was able to reproduce your original issue.
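Note that the threads in the script above discard the return value of query_cnn. One possible way to collect the predictions is a thread pool. The sketch below uses a hypothetical StubCNN class in place of the real model, so it only shows the calling pattern, not Keras behavior.

```python
from concurrent.futures import ThreadPoolExecutor


class StubCNN:
    """Hypothetical stand-in for the CNN class above; no Keras involved."""

    def __init__(self):
        self.warmed_up = False
        self.query_cnn([0.0, 0.0])  # warmup call in the constructor
        self.warmed_up = True

    def query_cnn(self, data):
        # Placeholder for preprocessing + predict.
        return sum(data)


cnn = StubCNN()  # created and warmed up before any worker thread exists
batches = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]

with ThreadPoolExecutor(max_workers=3) as pool:
    # map() returns the results in the same order as the inputs.
    predictions = list(pool.map(cnn.query_cnn, batches))
```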

14 Comments

Should I finalize after saving the graph variable?
The issue is that graph creation is not thread-safe, so you need to finish the graph creation in one thread and then start the others. finalize() will make the graph read-only.
I deployed it to production and am waiting to see if the error comes back. For now it's good. I will give it a few more days.
I can see in the traceback that "Graph is finalized and cannot be modified." is raised by the line return self.cnn_model.predict(X). So this is not happening in the preprocessing stage, and nothing in the preprocessing uses tf. Maybe Keras is somehow trying to change the graph?
:( The next thing I would try is to warm up the model with dummy data in the __init__() function. Something like self.cnn_model.predict(np.zeros(inputdim))