Mobility Bot

Commute Alerts for Slack and Microsoft Teams

Using Python asyncio in a standalone script

Python’s asyncio is a great way to write performant code in modern Python. Several years ago, you had to reach for the threading or multiprocessing libraries to get concurrency, especially when dealing with I/O-bound tasks such as database access and web requests.

Most existing examples of async code show async-based web request handlers rather than standalone scripts. For Mobility Bot, we have a few long-running processes that gather service alert data from 20+ transit operators, and we want these to run on a regular cadence. It doesn’t make sense to pay the cost of initialization on every interval, so this is implemented as a long-running Python script. This was the original setup:

import asyncio
import logging
import signal

logger = logging.getLogger(__name__)

async def main(stop_event: asyncio.Event):
    # perform any one-time initialization here, such as a DB connection pool
    await open_pool()

    processors: list[Processor] = [...] # processors have an `async def process` method

    # start the main loop
    while not stop_event.is_set():
        await asyncio.gather(*[p.process() for p in processors], return_exceptions=True)

        # sleep 60 seconds, allowing an interrupt on Ctrl-C or other signal via regular wakeups
        for _ in range(60):
            if stop_event.is_set():
                break
            await asyncio.sleep(1.0)

    logger.info("shutting down...")
    await close_pool()
    await asyncio.sleep(0.25)


if __name__ == "__main__":
    stop_event = asyncio.Event()

    def int_handler(signum: int, frame):
        # runs on Ctrl-C; the main loop notices the flag at its next 1-second wakeup
        stop_event.set()

    signal.signal(signal.SIGINT, int_handler)
    asyncio.run(main(stop_event))

This code worked for some time, but it had two main drawbacks.

  1. As we added more processors, each asyncio.gather call took longer to finish, because it takes as long as the slowest processor. We were therefore often waiting longer than desired between checks, even for processors that complete quickly. Our desired interval of 60 seconds turned into something closer to 67 seconds.
  2. Because every processor kicked off at the same time, we’d experience the thundering herd problem when it came to database interactions. There would be a flurry of activity while processors were running, and then 60 seconds of nothing.
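To see the first drawback in isolation, here is a minimal sketch using stand-in coroutines. The `fake_process` and `timed_gather` helpers and their durations are hypothetical, not our real processors. The gather call returns only once the slowest coroutine has finished:

```python
import asyncio
import time


async def fake_process(name: str, duration: float) -> str:
    # hypothetical stand-in for a processor's `process` method
    await asyncio.sleep(duration)
    return name


async def timed_gather(durations: dict[str, float]) -> float:
    # returns how long one gather round takes for the given durations
    start = time.perf_counter()
    await asyncio.gather(
        *[fake_process(name, d) for name, d in durations.items()],
        return_exceptions=True,
    )
    return time.perf_counter() - start


if __name__ == "__main__":
    elapsed = asyncio.run(timed_gather({"fast": 0.05, "medium": 0.1, "slow": 0.3}))
    print(f"one round took {elapsed:.2f}s")  # roughly the slowest duration
```

The fast and medium stand-ins finish early, but the round as a whole is still bounded by the slow one.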

To solve both of these problems, we modified the implementation so each processor is “free-running”. Each one sleeps independently, so their start times spread out over time and the load on the database evens out. The change isn’t a big one, but it takes much better advantage of asyncio’s ability to run many things concurrently. Note that asyncio.TaskGroup requires Python 3.11 or later.

import asyncio
import logging

logger = logging.getLogger(__name__)

async def main():
    # perform any one-time initialization here, such as a DB connection pool
    await open_pool()

    processors: list[Processor] = [...] # processors have an `async def process` method

    async def periodic_process(p: Processor, delay: float | int):
        while True:
            await p.process()
            await asyncio.sleep(delay)

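    # each periodic task runs independently until it is interrupted;
    # on Ctrl-C, asyncio.run cancels main(), and the TaskGroup cancels its tasks
    try:
        async with asyncio.TaskGroup() as task_group:
            for p in processors:
                task_group.create_task(periodic_process(p, 60))
    except asyncio.CancelledError:
        # swallow the cancellation so the cleanup below still runs
        pass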

    logger.info("shutting down...")
    await close_pool()
    await asyncio.sleep(0.25)

if __name__ == "__main__":
    asyncio.run(main())

This means a processor that takes 500ms will run every 60.5 seconds, while a processor that takes 4000ms will run every 64 seconds. Their start times drift apart from one cycle to the next, which mitigates the thundering herd problem without needing any sort of delay randomization or other hacks.
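The arithmetic can be checked with a few lines of plain Python. This sketch assumes every run takes a constant amount of time, which real processors won’t, and `start_times` is a hypothetical helper:

```python
def start_times(duration: float, delay: float, runs: int) -> list[float]:
    """Start time (in seconds) of each run when sleeping `delay` after every run."""
    period = duration + delay
    return [i * period for i in range(runs)]


fast = start_times(duration=0.5, delay=60, runs=4)
slow = start_times(duration=4.0, delay=60, runs=4)
print(fast)  # [0.0, 60.5, 121.0, 181.5]
print(slow)  # [0.0, 64.0, 128.0, 192.0]
```

Both processors still start together on the first round. After that, their start times move 3.5 seconds further apart on every cycle.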

Finally, if you want improved performance over the built-in asyncio implementation, it is trivial to switch to uvloop. This is likely already a dependency of your project if you’re using any of the asyncio-based web frameworks, since many of them recommend using uvicorn as a web server.

import asyncio
import uvloop

async def main():
    # implementation code goes here
    pass

if __name__ == "__main__":
    # Add this one line, and uvloop will be used for the main event loop
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    asyncio.run(main())
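
If uvloop might not be installed everywhere the script runs (it does not support Windows, for example), a fallback is easy to add. The sketch below assumes uvloop 0.18 or newer, which as far as we know provides a `uvloop.run()` helper that mirrors `asyncio.run()`. `run_main` is a hypothetical name, and `main` returns the loop’s class name only to show which loop was picked:

```python
import asyncio

try:
    import uvloop  # optional third-party dependency
except ImportError:
    uvloop = None


async def main() -> str:
    # implementation code goes here; the return value is for illustration only
    return type(asyncio.get_running_loop()).__name__


def run_main() -> str:
    # prefer uvloop.run when available (assumed: uvloop 0.18 or newer),
    # otherwise fall back to the built-in event loop
    if uvloop is not None and hasattr(uvloop, "run"):
        return uvloop.run(main())
    return asyncio.run(main())


if __name__ == "__main__":
    print(run_main())
```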