Using Python asyncio in a standalone script
Python's asyncio is a great way to write performant code in modern Python. Several years ago, you had to use the `threading` or `multiprocessing` library to gain concurrency, especially when dealing with I/O-bound tasks such as database access and web requests.
Many existing examples of async code are async-based web request handlers rather than standalone scripts. For Mobility Bot, we have a few long-running processes that gather service alert data from 20+ transit operators, and we want these to run on a regular cadence. It doesn’t make sense to pay the cost of initialization on every interval, so this is implemented as a long-running Python script. This was the original setup:
```python
import asyncio
import logging
import signal

logger = logging.getLogger(__name__)


async def main(stop_event: asyncio.Event):
    # perform any one-time initialization here, such as a DB connection pool
    await open_pool()
    processors: list[Processor] = [...]  # processors have an `async def process` method

    # start the main loop
    while not stop_event.is_set():
        await asyncio.gather(*[p.process() for p in processors], return_exceptions=True)
        # sleep 60 seconds, allowing an interrupt on Ctrl-C or other signal via regular wakeups
        for _ in range(60):
            if stop_event.is_set():
                break
            await asyncio.sleep(1.0)

    logger.info("shutting down...")
    await close_pool()
    await asyncio.sleep(0.25)


if __name__ == "__main__":
    stop_event = asyncio.Event()

    # the parameter is named `signum` so it does not shadow the `signal` module
    def int_handler(signum: int, frame):
        stop_event.set()

    signal.signal(signal.SIGINT, int_handler)
    asyncio.run(main(stop_event))
```
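As an aside, the one-second wakeups exist only so the loop notices the stop event promptly. A possible alternative, sketched below, is to wait on the event itself with a timeout. `interruptible_sleep` and `demo` are hypothetical names introduced here for illustration and are not part of the original script.

```python
import asyncio


async def interruptible_sleep(stop_event: asyncio.Event, seconds: float) -> bool:
    """Wait up to `seconds`; return True if `stop_event` was set in that time."""
    try:
        await asyncio.wait_for(stop_event.wait(), timeout=seconds)
        return True
    except asyncio.TimeoutError:
        return False


async def demo() -> tuple[bool, bool]:
    stop_event = asyncio.Event()
    # Nothing sets the event, so this returns False after the full timeout.
    first_result = await interruptible_sleep(stop_event, 0.05)
    # Set the event shortly after the next sleep starts; that sleep ends early.
    asyncio.get_running_loop().call_later(0.01, stop_event.set)
    second_result = await interruptible_sleep(stop_event, 5.0)
    return first_result, second_result


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (False, True)
```

This sketch assumes the event is set from inside the event loop, for example with `loop.add_signal_handler(signal.SIGINT, stop_event.set)`, which is available on Unix only. The original script avoids that requirement by polling the event once per second.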
The original script worked for some time, but it had two main drawbacks.
- As we added more processors, we waited longer for `asyncio.gather` to finish, since it takes as long as the slowest processor. This meant we were often waiting longer than desired between checks, even for processors that complete quickly. Our desired interval of 60 seconds turned into closer to 67 seconds.
- Because every processor kicked off at the same time, we’d experience the thundering herd problem when it came to database interactions. There would be a flurry of activity while processors were running, and then 60 seconds of nothing.
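The first drawback is easy to reproduce in isolation. The following is a minimal sketch in which `fake_processor` and `timed_gather` are hypothetical stand-ins (the real processors are not shown in this post), with short sleeps in place of real work.

```python
import asyncio
import time


async def fake_processor(duration: float) -> float:
    # Hypothetical stand-in for a processor's `process` method.
    await asyncio.sleep(duration)
    return duration


async def timed_gather(durations: list[float]) -> float:
    """Run all fake processors together and return the elapsed wall time."""
    start = time.perf_counter()
    await asyncio.gather(*(fake_processor(d) for d in durations), return_exceptions=True)
    return time.perf_counter() - start


if __name__ == "__main__":
    elapsed = asyncio.run(timed_gather([0.01, 0.02, 0.2]))
    print(f"gather took {elapsed:.2f}s")
```

The elapsed time tracks the 0.2-second task rather than the fast ones, which is why every processor ends up on the slowest processor's schedule.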
To solve both of these problems, we modified the implementation so each processor could be “free-running”. Each processor now sleeps independently, which spreads out their run times and evens out the load on the database. The change isn’t a big one, but it really helps take advantage of asyncio’s ability to run many things concurrently.
```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def main():
    # perform any one-time initialization here, such as a DB connection pool
    await open_pool()
    processors: list[Processor] = [...]  # processors have an `async def process` method

    async def periodic_process(p: Processor, delay: float | int):
        while True:
            await p.process()
            await asyncio.sleep(delay)

    # each periodic task runs independently until it is interrupted
    # (asyncio.TaskGroup requires Python 3.11+; unlike gather with
    # return_exceptions=True, an unhandled exception in one task cancels the rest)
    try:
        async with asyncio.TaskGroup() as task_group:
            for p in processors:
                task_group.create_task(periodic_process(p, 60))
    except asyncio.CancelledError:
        pass

    logger.info("shutting down...")
    await close_pool()
    await asyncio.sleep(0.25)


if __name__ == "__main__":
    asyncio.run(main())
```
This means a processor that takes 500ms will run every 60.5 seconds, but a processor that takes 4000ms will run every 64 seconds, remediating the thundering herd problem without needing any sort of delay randomization or other hacks.
Finally, if you want improved performance over the built-in asyncio implementation, it is trivial to switch to `uvloop`. This is likely already a dependency of your project if you’re using any of the asyncio-based web frameworks, since many of them recommend using `uvicorn` as a web server.
```python
import asyncio

import uvloop


async def main():
    # implementation code goes here
    pass


if __name__ == "__main__":
    # Add this one line, and uvloop will be used for the main event loop
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    asyncio.run(main())
```
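If uvloop may not be installed everywhere the script runs, a small wrapper can fall back to the standard event loop. This is a sketch only: the `run` helper is a hypothetical name introduced here, and it assumes a uvloop release that provides `uvloop.run()` (documented for uvloop 0.18 and later), falling back to `asyncio.run()` otherwise.

```python
import asyncio

try:
    import uvloop  # third-party and optional in this sketch
except ImportError:
    uvloop = None


async def main() -> str:
    # implementation code goes here; this placeholder reports which
    # module provides the running event loop
    return type(asyncio.get_running_loop()).__module__


def run(coro):
    """Run `coro` on uvloop when available, else on the default asyncio loop."""
    if uvloop is not None and hasattr(uvloop, "run"):
        return uvloop.run(coro)
    return asyncio.run(coro)


if __name__ == "__main__":
    print(run(main()))
```

The rest of the script stays the same either way, since uvloop is a drop-in replacement for the event loop and not a different API.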