Unleashing the Power of Python Asyncio’s Queue | by Peng Qian | Jun, 2023


The queue mentioned earlier is a First-In-First-Out (FIFO) queue, where the first item to enter the queue is the first to be retrieved. This is suitable when all tasks in the queue have the same priority.

However, consider the following situation:

Suppose there is a queue with tasks waiting in line, each requiring a long processing time.

An error log or VIP user access is a high-priority task that needs immediate attention. What should we do?

Photo by Ethan Hu on Unsplash

This is where asyncio.PriorityQueue comes into play.

Briefly describe asyncio.PriorityQueue’s implementation

Unlike FIFO queues based on lists, asyncio.PriorityQueue is based on heaps. It is built using a binary tree structure.

You may be familiar with binary search trees, which ensure that the most minor node is always the leftmost node.

However, the binary tree in asyncio.PriorityQueue ensures that the most minor node is always at the top, so the highest priority node is permanently removed first.

On the left is the binary tree used by PriorityQueue, and on the right is the binary search tree.
On the left is the binary tree used by PriorityQueue, and on the right is the binary search tree. Image by Author

Real-world example with asyncio.PriorityQueue

Let’s illustrate the usage of asyncio.PriorityQueue with a real-world scenario that exists in practice.

Imagine we have an order service API. The API takes time for each order to process, but we can’t keep users waiting too long.

So when a user places an order, the API first puts the order into a queue, allowing a background task to process it asynchronously while immediately returning a message to the user.

This API accepts orders from two types of users: regular users and VIP users. It must ensure that VIP user orders are processed with the highest priority.

VIP orders are processed with the highest priority.
VIP orders are processed with the highest priority. Image by Author

To keep the learning curve low for readers, in this example, we will use aiohttp to implement the server. The specific code is as follows:

First, we define an enumeration marking the two categories: regular users and VIP users.

Next, we use dataclass to define a user’s order, which contains the user type and order processing duration. The order duration is not considered in priority sorting.

Then we define the consumer method process_order_worker, which retrieves orders from the queue and simulates the order processing.

Don’t forget to use queue.task_done() to tell the queue that we finished processing the order.

Following that, we implement the order API using aiohttp. This API responds to user requests, generates an order object, and places it in the asyncio.PriorityQueue.

It then immediately returns a response to the user, avoiding user wait time.

When the program starts, we use create_order_queue to initialize the queue and order consumption tasks.

When the program ends, we use destroy_order_queue to ensure that all orders in the queue are processed and the background tasks are closed correctly.

queue.join() will wait for all the data in the queue to be processed. asyncio.wait_for sets a timeout of 20 seconds, after which it will no longer wait queue.join() to complete.

We can test this implementation using PyCharm’s HTTP Request:

API prioritizes orders from VIP users whenever possible.
API prioritizes orders from VIP users whenever possible. Image by Author

As you can see, the two high-priority tasks are processed as expected. Perfect!



Source link

Leave a Comment