I am looking for the best (and correct) way to obtain a request-independent db session.
The problem is the following: I am building a web application that has to access the database. The exposed endpoint accepts a request, performs some initial work, then creates a thread (which will perform the hard work), starts it, and replies to the client with a unique id for the "job". Meanwhile the thread carries on with its work (for which it has to access the database), and the client can poll to check the status. I am not using a dedicated framework for this background job, only a simple thread. Only one background thread can be running at any time; for this reason I am maintaining the state in a singleton.
The application is created with the application factory pattern: https://flask.palletsprojects.com/en/1.1.x/patterns/appfactories/
I am using Gunicorn as the WSGI server and SQLite as the database.
The basic structure of the code is the following (I have removed the business logic and imports, but the concept remains):
api_jobs.py
@bp.route('/jobs', methods=['POST'])
def create_job():
    data = request.get_json(force=True) or {}
    name = data['name']
    job_controller = JobController()  # This is a singleton
    job_process = job_controller.start_job(name)
    job_process_dict = job_process.to_dict()
    return jsonify(job_process_dict)
controller.py
class Singleton(type):
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
        return cls._instances[cls]


class JobController(object):
    __metaclass__ = Singleton

    def __init__(self):
        self.job_thread = None

    def start_job(self, name):
        if self.job_thread is not None:
            job_id = self.job_thread.job_id
            job_process = JobProcess.query.get(job_id)
            if job_process.status != 'end':
                raise ValueError('A job process is already ongoing!')
            else:
                self.job_thread = None
        job_process = JobProcess(name)
        db.session.add(job_process)
        db.session.commit()  # At this step I create the ID
        self.job_thread = JobThread(db.session, job_process.id)
        self.job_thread.start()
        return job_process


class JobThread(threading.Thread):
    def __init__(self, db_session, job_id):
        self.job_id = job_id
        self.db_session = db_session
        self.session = self.db_session()

    def run(self):
        self.job_process = self.session.query(JobProcess).get(self.job_id)
        self.job_process.status = 'working'
        self.session.commit()
        i = 0
        while True:
            sleep(1)
            print('working hard')
            i = i + 1
            if i > 10:
                break
        self.job_process.status = 'end'
        self.session.commit()
        self.db_session.remove()
models.py
class JobProcess(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    status = db.Column(db.String(64))
    name = db.Column(db.String(64))

    def to_dict(self):
        data = {
            'id': self.id,
            'status': self.status,
            'name': self.name,
        }
        return data
From my understanding, calling self.session = self.db_session() actually does nothing (because SQLAlchemy uses a registry that is also a proxy, if I am not wrong); however, that was the best attempt I could find at creating a "new/detached/useful" session.
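To make the registry behaviour concrete, here is a minimal stdlib-only sketch. It assumes the registry is scoped per thread, as the SQLAlchemy documentation describes for scoped_session; `Registry` and `Worker` are hypothetical stand-ins, not SQLAlchemy or Flask-SQLAlchemy classes. It shows that a call made in `__init__` runs in the thread that constructs the object, so it returns the caller's entry, while the same call inside `run()` yields a separate one.

```python
import threading


class Registry:
    """Toy stand-in for a thread-local session registry (not SQLAlchemy)."""

    def __init__(self, factory):
        self._factory = factory
        self._local = threading.local()

    def __call__(self):
        # Return this thread's object, creating it on first use.
        if not hasattr(self._local, "value"):
            self._local.value = self._factory()
        return self._local.value


registry = Registry(object)


class Worker(threading.Thread):
    def __init__(self):
        super().__init__()
        # __init__ runs in the thread that constructs the Worker,
        # so this is the constructing thread's object.
        self.from_init = registry()
        self.from_run = None

    def run(self):
        # run() executes in the new thread, so the registry
        # creates a fresh object for it.
        self.from_run = registry()


caller_obj = registry()
worker = Worker()
worker.start()
worker.join()

print(worker.from_init is caller_obj)  # True
print(worker.from_run is caller_obj)   # False
```

If the real registry behaves the same way, the session stored in `JobThread.__init__` would be the one belonging to the request thread rather than to the background thread.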
I checked out https://docs.sqlalchemy.org/en/13/orm/contextual.html#using-thread-local-scope-with-web-applications in order to obtain a request-independent db session; however, even the suggested method of creating a new session factory (sessionmaker + scoped_session) does not work.
I get several different errors depending on slight changes to the code; in this configuration the error is:
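For reference, the sessionmaker + scoped_session approach might look roughly like the sketch below. It is written against plain SQLAlchemy rather than Flask-SQLAlchemy, and several names are assumptions made for illustration: `BackgroundSession`, `run_demo`, the temporary database file, and the choice of `NullPool` (used here so that each connection is opened and closed in the thread that uses it). Only the primary key is handed to the thread, and the session is obtained inside `run()`.

```python
import os
import tempfile
import threading

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.pool import NullPool

try:  # SQLAlchemy 1.4 and later
    from sqlalchemy.orm import declarative_base
except ImportError:  # SQLAlchemy 1.3
    from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class JobProcess(Base):
    __tablename__ = "job_process"
    id = Column(Integer, primary_key=True)
    status = Column(String(64))
    name = Column(String(64))


# Hypothetical throwaway database file, used only for this sketch.
db_path = os.path.join(tempfile.mkdtemp(), "jobs.db")
engine = create_engine("sqlite:///" + db_path, poolclass=NullPool)
Base.metadata.create_all(engine)

# A registry that is independent of any request handling.
BackgroundSession = scoped_session(sessionmaker(bind=engine))


class JobThread(threading.Thread):
    def __init__(self, job_id):
        super().__init__()
        self.job_id = job_id  # only the primary key crosses threads

    def run(self):
        session = BackgroundSession()  # obtained inside the worker thread
        try:
            job = session.query(JobProcess).filter_by(id=self.job_id).one()
            job.status = "working"
            session.commit()
            # ... the long-running work would go here ...
            job.status = "end"
            session.commit()
        finally:
            BackgroundSession.remove()  # tear down this thread's session


def run_demo():
    # Create the job row from the calling thread and keep only its id.
    session = BackgroundSession()
    job = JobProcess(name="demo", status="created")
    session.add(job)
    session.commit()
    job_id = job.id
    BackgroundSession.remove()

    worker = JobThread(job_id)
    worker.start()
    worker.join()

    # Read the final status back from the calling thread.
    session = BackgroundSession()
    status = session.query(JobProcess).filter_by(id=job_id).one().status
    BackgroundSession.remove()
    return status
```

Calling `run_demo()` creates a row, lets the thread update it, and returns the final status. Whether this carries over unchanged to the Flask-SQLAlchemy `db.session` used in the code above is not something the sketch demonstrates.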
DetachedInstanceError: Instance <JobProcess at 0x7f875f81c350> is not bound to a Session; attribute refresh operation cannot proceed (Background on this error at: http://sqlalche.me/e/bhk3)
The basic question remains: is it possible to create a session that lives inside the thread and that I am responsible for creating and tearing down?