Supercharge Your Python Asyncio With Aiomultiprocess: A Comprehensive Guide


Finally, let me share some practical best practices from my own experience.

Use Pool only

Although aiomultiprocess also provides the Process and Worker classes for us to choose from, we should always use the Pool class: creating a process carries significant resource overhead, and a pool spawns a fixed set of worker processes once and then reuses them for every task, as the sketch below shows.
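
For reference, here is a minimal sketch of that pattern; the fetch coroutine and its random delay are just placeholder workload, not part of aiomultiprocess:

import asyncio
from random import uniform

from aiomultiprocess import Pool

async def fetch(i: int) -> int:
    # stand-in for a real I/O-bound coroutine
    await asyncio.sleep(uniform(0.1, 0.3))
    return i * 2

async def main():
    # the pool spawns its worker processes once and reuses them,
    # so the cost of process creation is paid only at startup
    async with Pool() as pool:
        results = await pool.map(fetch, range(10))
        print(results)

if __name__ == "__main__":
    asyncio.run(main())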

How to use queues

In a previous article, I explained how to use asyncio.Queue to implement the producer-consumer pattern and balance resource usage against performance.
In aiomultiprocess, we can also use queues. However, since the tasks run in a process pool, an asyncio.Queue cannot be shared across processes. At the same time, a multiprocessing.Queue cannot be passed to pool tasks directly either, because it cannot be pickled as an argument.
In this case, use multiprocessing.Manager().Queue() to create the queue: the manager hands out a proxy object that can be pickled and shared safely between processes. The code is as follows:

import random
import asyncio
from multiprocessing import Manager
from multiprocessing.queues import Queue

from aiomultiprocess import Pool

async def worker(name: str, queue: Queue):
    while True:
        item = queue.get()  # a Manager queue's get() is blocking, not awaitable
        if item is None:
            print(f"worker: {name} got the end signal, and will stop running.")
            queue.put(item)  # put the sentinel back so the other workers stop too
            break
        await asyncio.sleep(random.uniform(0.2, 0.7))
        print(f"worker: {name} begin to process value {item}", flush=True)

async def producer(queue: Queue):
    for i in range(20):
        await asyncio.sleep(random.uniform(0.2, 0.7))
        queue.put(random.randint(1, 3))
    queue.put(None)  # the sentinel tells the workers there is nothing left

async def main():
    queue: Queue = Manager().Queue()
    producer_task = asyncio.create_task(producer(queue))

    async with Pool() as pool:
        c_tasks = [pool.apply(worker, args=(f"worker-{i}", queue))
                   for i in range(5)]
        await asyncio.gather(*c_tasks)

    await producer_task

if __name__ == "__main__":
    asyncio.run(main())

Using initializer to initialize resources

Suppose you need to use an aiohttp session or a database connection pool in a coroutine method. Since these objects cannot be pickled, we cannot pass them as arguments when creating tasks in the main process.

An alternative is to define a global object together with an initialization method; in that initialization method, you assign and set up the global object.

Just like multiprocessing.Pool, aiomultiprocess.Pool accepts an initializer method and its initialization arguments at construction time. The method is called in each child process as it starts, completing the initialization there:

import asyncio

from aiomultiprocess import Pool
import aiohttp
from aiohttp import ClientSession, ClientTimeout

session: ClientSession | None = None

def init_session(timeout: ClientTimeout | None = None):
    # runs once in each child process, so every process gets its own session
    global session
    session = aiohttp.ClientSession(timeout=timeout)

async def get_status(url: str) -> int:
    async with session.get(url) as response:
        return response.status

async def main():
    url = "https://httpbin.org/get"
    timeout = ClientTimeout(2)
    async with Pool(initializer=init_session, initargs=(timeout,)) as pool:
        tasks = [asyncio.create_task(pool.apply(get_status, (url,)))
                 for _ in range(3)]
        status = await asyncio.gather(*tasks)
        print(status)

if __name__ == "__main__":
    asyncio.run(main())

Exception handling and retries

Although aiomultiprocess.Pool provides the exception_handler parameter to help with exception handling, you need to combine the pool with asyncio.wait when you want more flexibility. For the usage of asyncio.wait, you can refer to my previous article.
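
For completeness, here is a minimal sketch of the exception_handler route. I am assuming the handler simply receives the raised exception, and here it only logs it; depending on the library version, the failure may also still propagate to the awaiting code, so the sketch guards for that:

import asyncio

from aiomultiprocess import Pool

async def worker():
    raise Exception("something went wrong")

def handle_exception(exc: BaseException) -> None:
    # called by the pool when a task raises; here we only log it
    print(f"task failed: {exc!r}")

async def main():
    async with Pool(exception_handler=handle_exception) as pool:
        try:
            await pool.apply(worker)
        except Exception:
            # depending on the aiomultiprocess version, the exception
            # may still surface at the await site as well
            pass

if __name__ == "__main__":
    asyncio.run(main())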

With asyncio.wait, you can retrieve the tasks that raised exceptions; after extracting such a task, you can make some adjustments and then re-submit it, as shown in the code below:

import asyncio
import random

from aiomultiprocess import Pool

async def worker():
    await asyncio.sleep(0.2)
    result = random.random()
    if result > 0.5:
        print("will raise an exception")
        raise Exception("something went wrong")
    return result

async def main():
    pending, results = set(), []
    async with Pool() as pool:
        for _ in range(7):
            pending.add(asyncio.create_task(pool.apply(worker)))
        while len(pending) > 0:
            done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_EXCEPTION)
            print(f"now the count of done, pending is {len(done)}, {len(pending)}")
            for result in done:
                if result.exception():
                    # the task failed: schedule a fresh attempt
                    pending.add(asyncio.create_task(pool.apply(worker)))
                else:
                    results.append(await result)
        print(results)

if __name__ == "__main__":
    asyncio.run(main())

Using Tenacity for retries

Of course, we have more flexible and powerful options for exception handling and retries, such as the Tenacity library, which I explained in this article.

With Tenacity, the code above can be significantly simplified: just add a decorator to the coroutine method, and it will automatically be retried whenever it raises an exception.

import asyncio
from random import random

from aiomultiprocess import Pool
from tenacity import retry

# with no arguments, @retry() keeps retrying until the call succeeds
@retry()
async def worker(name: str):
    await asyncio.sleep(0.3)
    result = random()
    if result > 0.6:
        print(f"{name} will raise an exception")
        raise Exception("something wrong")
    return result

async def main():
    async with Pool() as pool:
        tasks = pool.map(worker, [f"worker-{i}" for i in range(5)])
        results = await tasks
        print(results)

if __name__ == "__main__":
    asyncio.run(main())
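
By default, @retry() keeps retrying forever, so in practice you will usually want to bound the attempts. Here is a sketch using Tenacity's stop and wait options; the numbers are arbitrary placeholders, and if every attempt fails, Tenacity raises a RetryError:

import asyncio
from random import random

from aiomultiprocess import Pool
from tenacity import retry, stop_after_attempt, wait_fixed

# give up after 3 attempts, pausing 0.1s between tries
@retry(stop=stop_after_attempt(3), wait=wait_fixed(0.1))
async def worker(name: str):
    await asyncio.sleep(0.3)
    result = random()
    if result > 0.6:
        raise Exception("something wrong")
    return result

async def main():
    async with Pool() as pool:
        results = await pool.map(worker, [f"worker-{i}" for i in range(5)])
        print(results)

if __name__ == "__main__":
    asyncio.run(main())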

Using tqdm to indicate progress

I like tqdm because it always tells me how far the code has run while I'm waiting in front of the screen. I have explained how to use it in another article.

Since aiomultiprocess uses asyncio’s API to wait for tasks to complete, it is also compatible with tqdm:

import asyncio
from random import uniform

from aiomultiprocess import Pool
from tqdm.asyncio import tqdm_asyncio

async def worker():
    delay = uniform(0.5, 5)
    await asyncio.sleep(delay)
    return delay * 10

async def main():
    async with Pool() as pool:
        tasks = [asyncio.create_task(pool.apply(worker)) for _ in range(1000)]
        # tqdm_asyncio.gather works like asyncio.gather, but draws a progress bar
        results = await tqdm_asyncio.gather(*tasks)

    print(results[:10])

if __name__ == "__main__":
    asyncio.run(main())


