
I have 2 functions.

The 1st function stores the received data in a list, and the 2nd function writes that data into a CSV file.

I'm using Flask. Whenever the web service is called, it stores the data and sends a response; as soon as the response is sent, it should trigger the 2nd function.

My Code:

from flask import Flask, flash, request, redirect, url_for, session
import json
import pandas as pd

app = Flask(__name__)

arr = []

@app.route("/test", methods=['GET','POST'])
def check():
    arr.append(request.form['a'])
    arr.append(request.form['b'])
    res = {'Status': True}
    return json.dumps(res)

def trigger():
    df = pd.DataFrame({'x': arr})
    df.to_csv("docs/xyz.csv", index=False)
    return 

Obviously the 2nd function is not called.

Is there a way to achieve this?

P.S.: My real-life problem is different: the trigger function is time-consuming and I don't want the user to wait for it to finish execution.

2 Comments
  • I had a similar problem in the past; I used Celery to push the function to a task queue and returned success. You can also check an async implementation with aiohttp: stackoverflow.com/questions/53430465/… Commented Nov 30, 2018 at 11:55
  • What was the end solution? Commented Dec 8, 2021 at 12:49

5 Answers

2

P.S.: My real-life problem is different: the trigger function is time-consuming and I don't want the user to wait for it to finish execution.

Consider using Celery, which is made for the very problem you're trying to solve. From the docs:

Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system.

I recommend you integrate Celery with your Flask app as described here. Your trigger method would then become a straightforward Celery task that you can execute without having to worry about long response times.
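As a rough illustration only (the broker URL and module layout below are assumptions, not something from the linked guide), the trigger function could become a task along these lines:

import pandas as pd
from celery import Celery

# Assumed broker URL; any broker Celery supports (Redis, RabbitMQ, ...) works.
celery = Celery(__name__, broker='redis://localhost:6379/0')

@celery.task
def trigger(arr):
    # Runs in the Celery worker process, not in the Flask request handler.
    df = pd.DataFrame({'x': arr})
    df.to_csv("docs/xyz.csv", index=False)

In the /test view you would then call trigger.delay(arr) right before returning, so the response goes out without waiting for the CSV write.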


1

One solution would be to have a background thread that watches a queue. You put your CSV data in the queue and the background thread consumes it. You can start such a thread before the first request:

import threading
import time
from queue import Queue  # thread-safe queue with task_done()/join()

import pandas as pd

class CSVWriterThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)
        self.input_queue = Queue()

    def send(self, item):
        self.input_queue.put(item)

    def close(self):
        self.input_queue.put(None)
        self.input_queue.join()

    def run(self):
        while True:
            csv_array = self.input_queue.get()
            if csv_array is None:
                break

            # Do something here ...
            df = pd.DataFrame({'x': csv_array})
            df.to_csv("docs/xyz.csv", index=False)

            self.input_queue.task_done()
            time.sleep(1)
        # Mark the sentinel as done so close() can return
        self.input_queue.task_done()
        return

# Note: before_first_request was removed in Flask 2.3; on newer versions,
# start the thread when the app is created instead.
@app.before_first_request
def activate_job_monitor():
    thread = CSVWriterThread()
    app.csvwriter = thread
    thread.start()

And in your code put the message in the queue before returning:

@app.route("/test", methods=['GET','POST'])
def check():
    arr.append(request.form['a'])
    arr.append(request.form['b'])
    res = {'Status': True}
    app.csvwriter.send(arr)
    return json.dumps(res)
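One follow-up worth noting: nothing above stops the writer thread on shutdown. A minimal sketch, assuming you want the queued rows flushed before the process exits (atexit is my addition, not part of the answer):

import atexit

@atexit.register
def stop_csv_writer():
    # Only close the writer if a request actually started it.
    writer = getattr(app, 'csvwriter', None)
    if writer is not None:
        writer.close()  # enqueues the None sentinel and waits for pending rows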

3 Comments

Note: since the writer runs as a thread in the same process, Queue is imported from the standard queue module above (multiprocessing.Queue does not support task_done()/join()).
One more thing: how can I pass multiple params to app.csvwriter.send(arr)? Suppose I have to send one more array, j = [1, 2]. How do I read it in the run function?
Pass and get them as a tuple: app.csvwriter.send((first_arg, second_arg)) and then first_arg, second_arg = self.input_queue.get()
1

I'm actually working on another interesting case on my side where I pass the work off to a Python worker that sends the job to a Redis queue. There are some great blogs on using Redis with Flask; you basically need to ensure Redis is running (able to connect on port 6379).

The worker would look something like this:

import os

import redis
from rq import Worker, Queue, Connection

listen = ['default']

redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')

conn = redis.from_url(redis_url)

if __name__ == '__main__':
    with Connection(conn):
        worker = Worker(list(map(Queue, listen)))
        worker.work()

In my example I have a function that queries a database for usage, and since it might be a lengthy process I pass it off to the worker (running as a separate script):

# Note: q, conn, Task, migrate_usage and my_args are defined elsewhere in the
# app (see the sketch below); Job comes from rq.job (from rq.job import Job).
def post(self):

    data = Task.parser.parse_args()

    job = q.enqueue_call(
        func=migrate_usage, args=(my_args,),  # args must be a tuple
        result_ttl=5000
    )
    print("Job ID is: {}".format(job.get_id()))
    job_key = job.get_id()

    print(str(Job.fetch(job_key, connection=conn).result))

    if job:
        return {"message": "Job : {} added to queue".format(job_key)}, 201

Credit due to the following article:

https://realpython.com/flask-by-example-implementing-a-redis-task-queue/#install-requirements

1 Comment

Sounds interesting... looking into it.
0

You can try using streaming. See the next example:

import time
from flask import Flask, Response

app = Flask(__name__)

@app.route('/')
def main():
    return '''<div>start</div>
    <script>
        var xhr = new XMLHttpRequest();
        xhr.open('GET', '/test', true);
        xhr.onreadystatechange = function(e) {
            var div = document.createElement('div');
            div.innerHTML = '' + this.readyState + ':' + this.responseText;
            document.body.appendChild(div);
        };
        xhr.send();
    </script>
    '''

@app.route('/test')
def test():
    def generate():
        app.logger.info('request started')
        for i in range(5):
            time.sleep(1)
            yield str(i)
        app.logger.info('request finished')
        yield ''
    return Response(generate(), mimetype='text/plain')

if __name__ == '__main__':
    app.run('0.0.0.0', 8080, True)

All the magic in this example is in the generator: you can start sending response data, then do the remaining work, and finally yield empty data to end the stream.

For details look at http://flask.pocoo.org/docs/patterns/streaming/.


0

You can defer route-specific actions with limited context by combining after_this_request and response.call_on_close. Note that the request and response contexts won't be available, but the route function's context remains available, so you'll need to copy any request/response data you need into local variables for deferred access.

I moved your array to a local variable to show how the function context is preserved. You could also change your CSV write to an append (see the sketch after the code below) so you're not pushing data endlessly into memory.

from flask import Flask, after_this_request, request
import json
import pandas as pd

app = Flask(__name__)

@app.route("/test", methods=['GET', 'POST'])
def check():
    arr = []
    arr.append(request.form['a'])
    arr.append(request.form['b'])
    res = {'Status': True}

    @after_this_request
    def add_close_action(response):
        @response.call_on_close
        def process_after_request():
            # Runs after the response has been sent to the client
            df = pd.DataFrame({'x': arr})
            df.to_csv("docs/xyz.csv", index=False)
        return response
    return json.dumps(res)
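A small sketch of the append variant mentioned above (the helper name and the header check are my assumptions; the path and column come from the question):

import os
import pandas as pd

def append_rows(arr, path="docs/xyz.csv"):
    # Append instead of rewriting the whole file; write the header only
    # the first time, when the file does not yet exist.
    df = pd.DataFrame({'x': arr})
    df.to_csv(path, mode='a', header=not os.path.exists(path), index=False)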

