
I need to download a list of sites/URLs (which can vary over time), and I currently use multiprocessing.Manager().Queue() to submit and update that list.
I have to check each URL/task every second: hence each task basically never ends (until a specific condition is met, such as the user interrupting). I thought that multiprocessing.Process() combined with asyncio and a good async HTTP client would solve the problem. Unfortunately, I still get very high CPU usage after submitting 50 or more URLs. You will notice the difference yourselves between when the tasks are not doing any requests (running mock_request()) and when they are (running do_request()).

Here is an example to reproduce each case (press CTRL+C to end it gracefully at any time).

import asyncio, os, sys, time, httpx
import multiprocessing
import queue as Queue  # aliased so it doesn't clash with the 'queue' function arguments below

class ExitHandler(object):
    def __init__(self, manager, queue, processes):
        self.manager = manager
        self.queue = queue
        self.processes = processes
        
    def set_exit_handler(self):
        if os.name == "nt":
            try:
                import win32api
                win32api.SetConsoleCtrlHandler(self.on_exit, True)
            except ImportError:
                version = ".".join(map(str, sys.version_info[:2]))
                raise Exception("pywin32 not installed for Python " + version)
        else:
            import signal
            signal.signal(signal.SIGINT, self.on_exit)
            #signal.signal(signal.CTRL_C_EVENT, func)
            signal.signal(signal.SIGTERM, self.on_exit)

    def on_exit(self, sig, func=None):
        print('[Main process]: exit triggered, terminating all workers')
        STOP_WAIT_SECS = 5
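        # send one 'END' sentinel per worker so every request_worker loop exits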
        for _ in range(N_WORKERS):
            self.queue.put('END')
        
        try:
            end_time = time.time() + STOP_WAIT_SECS
            # wait up to STOP_WAIT_SECS for all processes to complete
            for proc in self.processes:
                join_secs = max(0.0, min(end_time - time.time(), STOP_WAIT_SECS))
                proc.join(join_secs)

            # clear the procs list and _terminate_ any procs that have not yet exited
            while self.processes:
                proc = self.processes.pop()
                if proc.is_alive():
                    proc.terminate()
            
            self.manager.shutdown()

            # finally, kill this thread and any running
            os._exit(0)
        except Exception:
            pass

async def mock_request(url):

    # we don't do any request here; it's just an example of how much less CPU
    # each process consumes when not doing requests

    while True:
        try:
            print('Finished downloading {}'.format(url))
            await asyncio.sleep(1)
        except asyncio.CancelledError:
            return

async def do_request(url):

    while True:
        try:
            # I use httpx (https://github.com/encode/httpx/) as async client for its simplicity
            # feel free to use your preferred library (e.g. aiohttp)
            async with httpx.AsyncClient() as s:
                await s.get(url)
                print('Finished downloading {}'.format(url))
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            return

def worker(queue):
    
    try:
        event_loop = asyncio.get_event_loop()
        event_loop.run_until_complete(request_worker(queue))
    except KeyboardInterrupt:
        pass

async def request_worker(queue):
    
    p = multiprocessing.current_process()
    loop = asyncio.get_event_loop()
    
    while True:
        try:
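            # run the blocking queue.get in a thread pool so the event loop keeps running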
            task = await loop.run_in_executor(None, queue.get)
            
            if task == 'END':
                break
            
            elif task['action'] == 'DOWNLOAD':
                print('Worker {}: Received new task'.format(p.name))
                f = loop.create_task(do_request(task['url'])) # high CPU usage
                # f = loop.create_task(mock_request(task['url'])) # low (almost none) CPU usage

        except KeyboardInterrupt:
            pass
        except Queue.Empty:
            pass

    print('Task Worker {}: ending'.format(p.name))

def run_workers(queue, processes):

    print('Starting workers')

    for _ in range(N_WORKERS):
        processes.append(multiprocessing.Process(target=worker, args=(queue,)))

    task = {
        'action': 'DOWNLOAD',
        'url': 'https://google.com'
    }
    
    # this is just an example forcing the same URL 100 times, while in reality
    # it will be 1 different URL per task
    for _ in range(100):
        queue.put(task)

    for p in processes:
        p.start()

    for p in processes:
        p.join()
    
    return True

if __name__ == "__main__":
    processes = []
    N_WORKERS = 8 # processes to spawn
    manager = multiprocessing.Manager()
    q = manager.Queue() # main queue to send URLs to

    # just a useful clean exit handler (press CTRL+C to terminate)
    exit_handler = ExitHandler(manager, q, processes) 
    exit_handler.set_exit_handler()

    # start the workers
    run_workers(q, processes)

Here's an example of how much CPU each process consumes when doing requests simultaneously:

[screenshot: per-process CPU usage]

Any solution that significantly reduces CPU usage (while keeping the same number of requests per second) is accepted, whether it uses multiprocessing or not. The only must for me is the async pattern.

  • What if you just... don't use multiprocessing, but still use asyncio? I suppose it's a long shot, but it's possible multiprocessing overhead is a significant part of your CPU usage (and even if it's not, having a single process might make profiling easier; see the sketch after these comments). Commented Mar 3, 2021 at 11:12
  • @tjollans any solution is accepted; I tried several ways myself, including a single process. I'm still unable to significantly reduce CPU usage (keeping the same number of requests per second). Commented Mar 3, 2021 at 11:17
  • I didn't use multiprocessing for my case, but I had issues issuing a lot of requests (~500 requests/sec) using httpx. I ended up using aiohttp as the async HTTP client backend, as it could handle my case without timeouts. Commented Mar 3, 2021 at 11:26
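
A minimal single-process sketch of the approach from the first comment might look like this (reusing do_request() from the question; just a sketch, not benchmarked against the original workload):

import asyncio

async def main(urls):
    # one event loop, one long-running task per URL; no multiprocessing at all
    tasks = [asyncio.create_task(do_request(u)) for u in urls]
    await asyncio.gather(*tasks)

if __name__ == '__main__':
    # same example input as the question: one URL submitted 100 times
    urls = ['https://google.com'] * 100
    try:
        asyncio.run(main(urls))
    except KeyboardInterrupt:
        pass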

1 Answer


This stands out:

while True:
    try:
        async with httpx.AsyncClient() as s:

This initializes a new client for each request, which, judging by the implementation, does imports and initializes an SSL context. These are expensive operations IMO, so running them inside the loop might be what's costing so much CPU.

Instead, consider reordering the code as

async with httpx.AsyncClient() as s:
  while True:
    try:
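
Applied to the question's do_request(), the reordered version would look something like this sketch (the client and its SSL context are created once per task and reused for every request):

async def do_request(url):
    # create the client (and its SSL context) once, then reuse it
    # across iterations instead of rebuilding it every second
    async with httpx.AsyncClient() as s:
        while True:
            try:
                await s.get(url)
                print('Finished downloading {}'.format(url))
                await asyncio.sleep(1)
            except asyncio.CancelledError:
                return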

1 Comment

Well, I guess you spotted the bug... I'll have to rework my logic a little since this was just a simple example, but I already ran some tests and yeah, that seems to be the problem. I will confirm it and accept this answer in a few hours.
