I am working on a big data problem and am stuck on some concurrency and async I/O issues. The problem is as follows:

1) I have multiple huge files (~4 GB each, up to 15 of them) which I am processing using ProcessPoolExecutor from the concurrent.futures module this way:

import os
from concurrent.futures import ProcessPoolExecutor, as_completed


def process(source):
    files = os.listdir(source)
    with ProcessPoolExecutor() as executor:
        future_to_file = {executor.submit(process_individual_file, source, input_file): input_file
                          for input_file in files}
        for future in as_completed(future_to_file):
            data = future.result()

2) Now, in each file, I want to go line by line, process each line into a particular JSON, group 2,000 such JSONs together, and hit an API with that batch to get a response. Here is the code:

import requests


def process_individual_file(source, input_file):
    limit = 2000
    json_array = []
    with open(source + input_file) as sf:
        for line in sf:
            json_array.append(form_json(line))
            limit -= 1

            if limit == 0:
                response = requests.post(API_URL, json=json_array)
                # check response status here
                json_array = []
                limit = 2000

3) Now the problem: the number of lines in each file is really large, and with that API call blocking and a bit slow to respond, the program takes a huge amount of time to complete.

4) What I want to achieve is to make that API call async, so that I can keep processing the next batch of 2,000 lines while the call is in flight.

5) What I have tried so far: I attempted to implement this using asyncio, but there we need to collect the set of future tasks and wait for their completion using the event loop. Something like this:

async def process_individual_file(source, input_file):
    tasks = []
    limit = 2000
    json_array = []
    with open(source + input_file) as sf:
        for line in sf:
            json_array.append(form_json(line))
            limit -= 1

            if limit == 0:
                tasks.append(asyncio.ensure_future(call_api(json_array)))
                json_array = []
                limit = 2000

    await asyncio.wait(tasks)

ioloop = asyncio.get_event_loop()
ioloop.run_until_complete(process_individual_file(source, input_file))
ioloop.close()

6) I don't really understand this, because it is effectively the same as before: it waits to collect all tasks before launching them. Can someone help me with the correct architecture for this problem? How can I call the API asynchronously, without collecting all tasks first, and with the ability to process the next batch in parallel?

1 Answer


I don't really understand this, because it is effectively the same as before: it waits to collect all tasks before launching them.

No, you're wrong here. When you create an asyncio.Task with asyncio.ensure_future, it starts executing the call_api coroutine immediately. This is how tasks in asyncio work:

import asyncio


async def test(i):
    print(f'{i} started')
    await asyncio.sleep(i)


async def main():
    tasks = [
        asyncio.ensure_future(test(i))
        for i in range(3)
    ]

    await asyncio.sleep(0)
    print('At this moment tasks are already started')

    await asyncio.wait(tasks)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Output:

0 started
1 started
2 started
At this moment tasks are already started

The problem with your approach is that process_individual_file is not actually asynchronous: it does a large amount of CPU-bound work without returning control to the asyncio event loop. That's a problem: the function blocks the event loop, making it impossible for the tasks to execute.
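
To see why this matters, here is a minimal, self-contained sketch of a CPU-bound coroutine starving the event loop (heavy_work and its iteration count are invented for illustration):

import asyncio


async def ticker():
    # Background task that wants to print every 0.1 s.
    for _ in range(3):
        print('tick')
        await asyncio.sleep(0.1)


async def heavy_work():
    # CPU-bound loop with no await inside: while it runs,
    # the event loop cannot schedule ticker() at all.
    total = 0
    for i in range(10 ** 7):
        total += i
    print('heavy work done')


async def main():
    task = asyncio.ensure_future(ticker())
    await heavy_work()  # nothing ticks until this returns
    await task


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Running it prints "heavy work done" before the first "tick", even though ticker() was scheduled first.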

A very simple but effective solution, I think, is to return control to the event loop manually using asyncio.sleep(0) at regular points inside process_individual_file, for example on reading each line:

async def process_individual_file(source, input_file):
    tasks = []
    limit = 2000
    json_array = []
    with open(source + input_file) as sf:
        for line in sf:
            await asyncio.sleep(0)  # return control to the event loop so it can run pending tasks

            json_array.append(form_json(line))
            limit -= 1

            if limit == 0:
                tasks.append(asyncio.ensure_future(call_api(json_array)))
                json_array = []
                limit = 2000

    await asyncio.wait(tasks)

Upd:

there will be more than a million requests to make, and hence I am uncomfortable storing future objects for all of them in a list

That makes a lot of sense. Nothing good will happen if you run a million network requests in parallel. The usual way to set a limit in this case is to use a synchronization primitive like asyncio.Semaphore.
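
For illustration only, here is a minimal standalone sketch of how a Semaphore caps concurrency (fake_request and the limit of 2 are made up for the example; asyncio.Semaphore also works as an async context manager):

import asyncio

sem = asyncio.Semaphore(2)  # at most 2 coroutines may hold the semaphore at once


async def fake_request(i):
    async with sem:  # waits here while 2 requests are already in flight
        print(f'request {i} started')
        await asyncio.sleep(1)  # stands in for the real network call
        print(f'request {i} done')


loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([
    asyncio.ensure_future(fake_request(i)) for i in range(5)
]))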

I advise you to make a generator that yields json_array batches from the file, acquire the Semaphore before adding a new task, and release it when the task is done. You will get clean code that is protected from having too many tasks running in parallel.

It will look something like this:

def get_json_array(input_file):
    json_array = []
    limit = 2000

    with open(input_file) as sf:
        for line in sf:
            json_array.append(form_json(line))

            limit -= 1
            if limit == 0:
                yield json_array  # the generator splits the file-reading logic from task scheduling

                json_array = []
                limit = 2000

    if json_array:
        yield json_array  # don't drop the last, partially filled batch


sem = asyncio.Semaphore(50)  # don't allow more than 50 parallel requests

async def process_individual_file(input_file):
    for json_array in get_json_array(input_file):
        await sem.acquire()  # file reading won't resume until there's room for a new task
        task = asyncio.ensure_future(call_api(json_array))
        task.add_done_callback(lambda t: sem.release())  # on task done, free a slot for the next task
        task.add_done_callback(lambda t: print(t.result()))  # print the result when call_api is done
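
Note that, as written, process_individual_file can return while up to 50 call_api tasks are still in flight. A hedged sketch of one way to drive it and wait for those stragglers, using the pre-3.7 asyncio.Task.all_tasks() spelling to match the loop-style code above (the file name is made up):

async def main():
    await process_individual_file('part-00.txt')  # hypothetical input file
    # Wait for any call_api tasks still running after the file was exhausted.
    pending = [t for t in asyncio.Task.all_tasks()
               if t is not asyncio.Task.current_task()]
    if pending:
        await asyncio.wait(pending)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()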

3 Comments

Thanks @Mikhail Gerasimov for correcting me. Indeed, I didn't understand it correctly and was stuck because the expected results were not coming. What you are saying makes sense, but one more doubt I have is that, because of the size of the files, there will be more than a million requests to make, and hence I am uncomfortable storing future objects for all of them in a list.
Here is one more article I was going through in the meantime: link. It seems I can run the event loop in another thread, delegate my futures to that thread, and trigger a callback on the API response. I am working on that PoC right now. Let me also try out your suggestions and get back with results. Thanks a lot :)
@ShubhamPatil I updated the answer to show how to avoid too many parallel requests.
