
I just read this introduction, but am having trouble implementing either of the examples (commented code being the second example):

import asyncio
import pandas as pd
from openpyxl import load_workbook

async def loop_dfs(dfs):
    async def clean_df(df):
        df.drop(["column_1"], axis=1, inplace=True)
        # ... a bunch of other inplace=True functions ...
        return "Done"

    # tasks = [clean_df(df) for (table, dfs) in dfs.items()]
    # await asyncio.gather(*tasks)

    tasks = [clean_df(df) for (table, df) in dfs.items()]
    completed, pending = await asyncio.wait(tasks)


def main():
    dfs = {
        sn: pd.read_excel("excel.xlsx", sheet_name=sn)
        for sn in load_workbook("excel.xlsx").sheetnames
    }

    # loop = asyncio.get_event_loop()
    # loop.run_until_complete(loop_dfs(dfs))

    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(loop_dfs(dfs))
    finally:
        loop.close()

main()

I saw a few other posts about how pandas doesn't support asyncio, and maybe I'm just missing the bigger picture, but that shouldn't matter if I'm doing in-place operations, right? I saw recommendations for Dask, but since it has no immediate support for reading Excel, I figured I'd try this first. However, I keep getting

RuntimeError: Event loop already running

1 Answer

I saw a few other posts about how pandas doesn't support asyncio, and maybe i'm just missing a bigger picture, but that shouldn't matter if i'm doing inplace operations right?

In-place operations are those that modify existing data. That is a matter of efficiency, whereas your goal appears to be parallelization, an entirely different matter.

Pandas doesn't support asyncio not just because no one has implemented it yet, but because Pandas doesn't typically do the kind of operations that asyncio supports well: network and subprocess IO. Pandas functions either use the CPU or wait for disk access, neither of which is a good fit for asyncio. Asyncio allows network communication to be expressed with coroutines that look like ordinary synchronous code. Inside a coroutine each blocking operation (e.g. a network read) is awaited, which automatically suspends the whole task if the data is not yet available. At each such suspension the system switches to the next task, effectively creating a cooperative multitasking system.
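
To see that cooperative hand-off in action, here is a minimal, self-contained sketch, with asyncio.sleep standing in for a network read:

```python
import asyncio

async def fetch(name, delay):
    # await suspends this coroutine; the event loop runs others meanwhile
    await asyncio.sleep(delay)  # stands in for a network read
    return name

async def main():
    # both "requests" are in flight at once: total time ~0.2 s, not ~0.3 s
    return await asyncio.gather(fetch("a", 0.2), fetch("b", 0.1))

print(asyncio.run(main()))  # ['a', 'b']
```

Note that asyncio.gather returns results in submission order even though "b" finishes first.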

When trying to call a library that doesn't support asyncio, such as pandas, things will superficially appear to work, but you won't get any benefit and the code will run serially. For example:

async def loop_dfs(dfs):
    async def clean_df(df):
        ...    
    tasks = [clean_df(df) for (table, df) in dfs.items()]
    completed, pending = await asyncio.wait(tasks)

Since clean_df doesn't contain a single instance of await, it is a coroutine in name only - it will never actually suspend its execution to allow other coroutines to run. Thus await asyncio.wait(tasks) will run the tasks in series, as if you wrote:

for table, df in dfs.items():
    clean_df(df)
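
You can verify the serial behaviour with a small experiment. Because these coroutines never await, two of them take the sum of their run times rather than overlapping (time.sleep here stands in for CPU-bound pandas work):

```python
import asyncio
import time

async def busy(results, name):
    # no await anywhere: this "coroutine" blocks the event loop until it returns
    time.sleep(0.05)  # stands in for CPU-bound pandas work
    results.append(name)

async def main():
    results = []
    # runs serially, in submission order: ~0.1 s total instead of ~0.05 s
    await asyncio.gather(busy(results, "first"), busy(results, "second"))
    return results

print(asyncio.run(main()))  # ['first', 'second']
```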

To get things to run in parallel (provided pandas occasionally releases the GIL during its operations), you should hand off the individual CPU-bound functions to a thread pool:

async def loop_dfs(dfs):
    def clean_df(df):  # note: ordinary def
        ...
    loop = asyncio.get_event_loop()
    tasks = [loop.run_in_executor(None, clean_df, df)  # None = default thread pool
             for (table, df) in dfs.items()]
    completed, pending = await asyncio.wait(tasks)

If you go down that route, you don't need asyncio in the first place; you can simply use concurrent.futures directly. For example:

import concurrent.futures

def loop_dfs(dfs):  # note: ordinary def
    def clean_df(df):
        ...
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(clean_df, df)
                   for (table, df) in dfs.items()]
        concurrent.futures.wait(futures)
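
As a runnable sketch of the same pattern, with a clean function that just tags a plain dict, standing in for the real pandas cleaning:

```python
import concurrent.futures

def clean(df):
    # stand-in for the in-place pandas cleaning; mutates its argument
    df["cleaned"] = True
    return "Done"

dfs = {"sheet1": {"a": 1}, "sheet2": {"b": 2}}

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(clean, df) for (table, df) in dfs.items()]
    done, pending = concurrent.futures.wait(futures)

print(all(f.result() == "Done" for f in done))  # True
```

If pandas does not release the GIL for your particular operations, ProcessPoolExecutor is a drop-in replacement here, at the cost of pickling each DataFrame across the process boundary.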

figured i'd try this first but I keep getting RuntimeError: Event loop already running

That error typically means that you've started the script in an environment that already runs asyncio for you, such as a Jupyter notebook. If that is the case, make sure you run your script with stock Python, or consult your notebook's documentation on how to submit coroutines to the event loop that is already running.
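
In a stand-alone script the simplest entry point is asyncio.run (Python 3.7+), which creates and closes its own event loop; in a notebook, where a loop is already running, you await the coroutine directly in a cell. A minimal illustration:

```python
import asyncio

async def work():
    return "done"

# Stand-alone script: asyncio.run starts (and later closes) a fresh event loop
print(asyncio.run(work()))  # done

# Jupyter notebook: a loop is already running, so asyncio.run would raise
# RuntimeError; instead, await the coroutine directly in a cell:
#     result = await work()
```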


Comments

Thank you very much this helped clear up all my confusion and I was using jupyter so that explains the error!
"Pandas doesn't support asyncio not only because this wasn't yet implemented, but because pandas is a CPU-bound library which is not a good fit for asyncio." I doubt that is true in many use cases. (CSV-text processing large files).
@EvanCarroll Can you elaborate what you mean by CSV-text processing? It certainly sounds like a CPU-bound operation, and therefore consistent with the statement in the answer.
Yes, I mean in any application that reads or writes from disk, you're going to find that operation to be an order of magnitude slower than processing. Pandas can do both. Read a 2 GB file from disk to memory. That's not CPU bound. That's I/O bound. Write a 2 GB file from memory to disk. It's even worse. You have to do a lot with Pandas to be slower than either of those operations. It's not to say you can't be CPU bound. But that in most cases where a disk is involved, I imagine you're I/O bound.
@EvanCarroll Good point, and I've now amended the answer not to focus on "CPU-bound". The rest of the point remains - when Pandas needs to wait for IO, it's doing disk IO which asyncio doesn't support at all (there are at least two external libraries that do, both using threads). I am not a Pandas developer, but I suspect that this is the reason why natively supporting asyncio doesn't make much sense.
