I have been given the task of updating a database over the network with SQLAlchemy, and I have decided to use Python's threading module. Currently I use one thread, the producer thread, to hand work units to the other (consumer) threads via a queue.

The producer thread does something like this:

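  def produce(self, last_id):
    # Stream unprocessed requests in batches of 50 rows.
    query = session.query(Request).order_by(Request.id) \
        .filter(Request.item_id.is_(None)).yield_per(50)
    # Queue each Request as its own work unit.
    for unit in query:
      self.queue.put(unit, True, Master.THREAD_TIMEOUT)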

while the consumer threads do something similar to this:

  def consume(self):
    unit = self.queue.get()
    request = unit
    item = Item.get_item_by_url(request)
    request.item = item
    session.add(request)
    session.flush()

and I am using SQLAlchemy's scoped session:

session = scoped_session(sessionmaker(autocommit=True, autoflush=True, bind=engine))

However, I am getting this exception:

"sqlalchemy.exc.InvalidRequestError: Object FOO is already attached to session '1234' (this is '5678')"

I understand that this exception comes from the fact that the request object is created in one session (the producer session) while the consumers are using another scoped session because they belong to another thread.

My workaround is to have the producer thread put request.id on the queue instead, and have the consumer run the query below to retrieve the request object.

request = session.query(Request).filter(Request.id == request_id).first()

I do not like this solution because this involves another network call and is obviously not optimal.

  1. Are there ways to avoid wasting the result of the producer's db call?
  2. Is there a way to write the "produce" so that more than 1 id is passed into the queue as a work unit?

Feedback welcomed!

1 Answer

You need to detach your Request instance from the producer thread's session before you put it on the queue, then attach it to the consumer thread's session once it has been taken off the queue.

To detach, call .expunge() on the session, passing in the request:

session.expunge(unit)

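and then, when processing it in a consumer thread, re-attach it by merging; set the load flag to False to prevent another round trip to the database. Note that merge() returns the instance that is attached to the current session, so keep working with the return value rather than the object you passed in:

request = session.merge(request, load=False)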
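To see the whole round trip in one place, here is a minimal sketch of the expunge/merge hand-off between two threads. It makes several assumptions that are not in the question: SQLAlchemy 1.4 or newer, a throwaway SQLite file, a cut-down Request model, and a made-up `item_id = id * 10` assignment standing in for the real `Item.get_item_by_url` lookup.

```python
import os
import queue
import tempfile
import threading

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, scoped_session, sessionmaker
from sqlalchemy.pool import NullPool

Base = declarative_base()


class Request(Base):  # simplified stand-in for the asker's model
    __tablename__ = "request"
    id = Column(Integer, primary_key=True)
    url = Column(String, nullable=False)
    item_id = Column(Integer)  # plain integer here, no real foreign key


# NullPool: every checkout opens a fresh connection in the calling thread.
db_path = os.path.join(tempfile.mkdtemp(), "demo.db")
engine = create_engine("sqlite:///" + db_path, poolclass=NullPool)
Base.metadata.create_all(engine)
Session = scoped_session(sessionmaker(bind=engine))

work = queue.Queue()


def produce():
    session = Session()  # this thread's own session
    rows = (session.query(Request)
            .filter(Request.item_id.is_(None))
            .order_by(Request.id)
            .all())
    for row in rows:
        session.expunge(row)  # detach before handing over
        work.put(row)
    Session.remove()
    work.put(None)  # sentinel: no more work


def consume():
    session = Session()  # a different session from the producer's
    while True:
        detached = work.get()
        if detached is None:
            break
        # load=False: trust the detached state, no SELECT is issued.
        request = session.merge(detached, load=False)
        request.item_id = request.id * 10  # hypothetical "item lookup"
    session.commit()
    Session.remove()


# Seed three unprocessed requests.
seed = Session()
seed.add_all([Request(url="u%d" % i) for i in range(3)])
seed.commit()
Session.remove()

threads = [threading.Thread(target=produce), threading.Thread(target=consume)]
for t in threads:
    t.start()
for t in threads:
    t.join()

results = [r.item_id for r in Session().query(Request).order_by(Request.id)]
Session.remove()
print(results)
```

One caveat worth keeping in mind: `load=False` only works if the object being merged has no pending changes, so modify it after the merge, not before.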
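The second question (putting more than one id into the queue as a single work unit) is not addressed above. One possible shape, sketched here with only the standard library, is to chunk the ids into lists and queue each list. The `batched`, `produce`, and `consume` helpers, the batch size of 3, and the `handled` list are all illustrative assumptions; in real code the consumer would presumably load each chunk with a single query such as `Request.id.in_(chunk)`.

```python
import queue
import threading
from itertools import islice


def batched(ids, size):
    """Yield successive lists of at most `size` ids."""
    it = iter(ids)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def produce(work, ids, batch_size, n_consumers):
    for chunk in batched(ids, batch_size):
        work.put(chunk)  # one work unit = one list of ids
    for _ in range(n_consumers):
        work.put(None)  # one sentinel per consumer


def consume(work, handled, lock):
    while True:
        chunk = work.get()
        if chunk is None:
            break
        # Real code would fetch the whole chunk in one query here.
        with lock:
            handled.extend(chunk)


work = queue.Queue()
handled = []
lock = threading.Lock()
consumers = [threading.Thread(target=consume, args=(work, handled, lock))
             for _ in range(2)]
for t in consumers:
    t.start()
produce(work, range(1, 8), 3, len(consumers))
for t in consumers:
    t.join()
print(sorted(handled))
```

Batching ids this way trades one query per request for one query per chunk, which may be a reasonable middle ground if passing detached objects between threads is not wanted.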