I have a route in my Flask app that spawns a process (using multiprocessing.Process) to do some background work. That process needs to be able to write to the database.

__init__.py:

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from project.config import Config

db = SQLAlchemy()

# app factory
def create_app(config_class=Config):
    app = Flask(__name__)
    # Load settings from the class passed in, not always the default Config
    app.config.from_object(config_class)
    db.init_app(app)
    return app

And this is the relevant code, showing how I'm spawning the process and using the db connection:

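from multiprocessing import Process
from flask import redirect

def worker(row_id):
    # Runs in the child process; db is the shared SQLAlchemy() object from __init__.py
    db_session = db.create_scoped_session()

    # Do stuff with db_session here

    db_session.close()

@app.route('/worker/<row_id>/start')
def start(row_id):
    # Start the background process and return without waiting for it
    p = Process(target=worker, args=(row_id,))
    p.start()
    return redirect('/')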

The problem is that sometimes (not always) I get errors like this one:

sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) insufficient data in "D" message lost synchronization with server: got message type "a", length 1668573551

I assume this is related to the fact that another process is accessing the database (if I don't use a separate process, everything works fine), but I honestly can't find a way to fix it. As you can see in my code, I tried using the create_scoped_session() method as an attempt to fix this, but the problem is the same.

Any help?

  • I wonder what happens to the process object when it goes out of scope. With start() you start running it, yes, but it doesn't block; it goes to the background. Then your function proceeds to do the redirect, and the p object goes out of scope and is deleted. Or does multiprocessing keep it alive, and does the db op succeed? In the docs they use join() to wait for the process to stop. Of course, you can return from the API and keep it running in the background by holding a reference to it, if that's the issue. docs.python.org/3/library/… Commented Apr 23, 2022 at 22:33
  • @antont Yes, the process is kept alive in the background. I don't want to block before redirecting. Commented Apr 23, 2022 at 22:36
  • You mean the mp lib keeps it alive? OK. Perhaps do the init_db thing in the worker? I'm using SQLAlchemy from many processes (workers) with FastAPI (similar to Flask but with gevent async biz), and there we create the db connections from scratch per process, I think. Then sessions within the process. Commented Apr 23, 2022 at 22:41
  • @antont Hmm, that's interesting. But how would I go about passing the app variable to the database initializer? I mean, if I go by your suggestion, should I do db_session = db.init_app(app) just like I do in __init__.py? Commented Apr 23, 2022 at 22:43
  • I was thinking you don't need the app there. The Flask app is for the HTTP and URL routing; it's not needed for running a db op. I think a new process needs a new db connection anyway, but you can connect with SQLAlchemy and don't need Flask for it. What you use from flask_sqlalchemy is, I guess, some helper. I don't know it, but I think you can connect with plain SQLAlchemy to do any normal db op. Commented Apr 23, 2022 at 22:50

1 Answer

OK, so I followed @antont's hint and created a new SQLAlchemy engine and session inside the worker function, like this, and it worked flawlessly:

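import os

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

def worker(row_id):
    # Build the engine inside the child process so it opens its own connections
    db_url = os.environ['DATABASE_URL']
    db_engine = create_engine(db_url)
    Session = sessionmaker(bind=db_engine)
    db_session = Session()

    # Do stuff with db_session here

    db_session.close()
    # Release the connections held by this process's pool
    db_engine.dispose()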
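
In case it helps anyone, here is a rough sketch of the same idea wrapped in a small helper, so the engine is always created and disposed within the process that uses it. My understanding is that the original error comes from the child process reusing a connection that the parent process had already opened, but I haven't verified that for this exact setup. The names process_local_session, count_jobs and the jobs table are made up for illustration, and the demo uses a temporary SQLite file instead of the DATABASE_URL above so that it runs without a Postgres server.

```python
import os
import tempfile
from contextlib import contextmanager
from multiprocessing import Process

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker


@contextmanager
def process_local_session(db_url):
    """Yield a session backed by an engine created in the current process.

    Hypothetical helper: commits on success, rolls back on error, and always
    disposes the engine so no pooled connection outlives the block.
    """
    engine = create_engine(db_url)
    session = sessionmaker(bind=engine)()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
        engine.dispose()


def worker(row_id, db_url):
    # The URL is passed in explicitly; nothing is inherited from the parent.
    with process_local_session(db_url) as session:
        # Illustrative table only; real code would use the app's own models.
        session.execute(
            text("CREATE TABLE IF NOT EXISTS jobs (row_id INTEGER PRIMARY KEY)")
        )
        session.execute(
            text("INSERT INTO jobs (row_id) VALUES (:row_id)"),
            {"row_id": row_id},
        )


def count_jobs(db_url):
    # Helper for the demo: how many rows the workers have written so far.
    with process_local_session(db_url) as session:
        return session.execute(text("SELECT COUNT(*) FROM jobs")).scalar()


if __name__ == "__main__":
    # Demo only: a throwaway SQLite file stands in for the real database.
    db_path = os.path.join(tempfile.mkdtemp(), "demo.db")
    db_url = f"sqlite:///{db_path}"

    p = Process(target=worker, args=(42, db_url))
    p.start()
    p.join()  # joined here so the demo can read the result; the route would not block
    print(count_jobs(db_url))
```

In the Flask route you would pass the database URL (for example os.environ['DATABASE_URL']) as a second argument to Process instead of relying on the shared db object. Creating an engine per job costs a new connection each time, which seems acceptable for occasional background work but may not suit very frequent jobs.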