AsyncIO Task Management: A Hands-On Scheduler Project

Originally published at KitfuCoda.Medium.

We discussed Awaitables last week. The article covered coroutines, tasks, and futures, along with a quick introduction to the event loop. To continue our journey of learning asynchronous programming with AsyncIO, let's build an example task management project.

A cute cartoon generated by Microsoft Copilot on the topic

Here’s a recap of relevant articles on AsyncIO so far:

  1. How to write an AsyncIO Telegram bot in Python
  2. Understanding Awaitables: Coroutines, Tasks, and Futures in Python
  3. AsyncIO Task Management: A Hands-On Scheduler Project

Introduction: Building Our Task Scheduler

Photo by Eden Constantino on Unsplash

In the previous article, we briefly mentioned event loops. Think of an event loop as a timetable-like structure that runs throughout the lifetime of an application. Awaitables, such as coroutines, tasks, and futures, are scheduled for concurrent execution within it. In recent releases, Python has added more abstractions that reduce the need to manage event loops manually.

In the article on programming a chatbot, we created a separate process each for the chatbot and the web application. We then called asyncio.run in each process to execute the first coroutine. Let's copy that approach to bootstrap our sample application:

import asyncio


async def main():
    await asyncio.sleep(5)


if __name__ == '__main__':
    asyncio.run(main()) 

If we run the program, it sleeps for 5 seconds through asyncio.sleep and then exits. As mentioned last week, calling a coroutine function such as asyncio.sleep does not execute it; it only returns a coroutine object. If we omit the await keyword, the coroutine object returned by asyncio.sleep(5) is never scheduled for execution. Rather than raising an error, the program exits immediately and Python emits a RuntimeWarning saying the coroutine was never awaited.
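To see this in isolation, here is a small self-contained sketch (greet is a made-up coroutine for illustration). Calling the coroutine function only builds a coroutine object, and the body does not run until the object is handed to the event loop:

```python
import asyncio
import inspect

calls: list[str] = []


async def greet() -> str:
    # this line only runs once the coroutine is actually executed
    calls.append("greet ran")
    return "hello"


coro = greet()  # builds a coroutine object; the body has not run yet
print(inspect.iscoroutine(coro), calls)  # True []

result = asyncio.run(coro)  # the event loop finally executes the body
print(result, calls)  # hello ['greet ran']
```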

We mentioned that developer quality of life has improved in recent releases. In the snippet, asyncio.run (see the asyncio.run documentation) creates an event loop, schedules the coroutine (main()) for execution, and then performs clean-up after completion.
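For comparison, here is roughly what we would have to write without asyncio.run. This is a simplified approximation for illustration only; the real asyncio.run also cancels any leftover tasks, supports debug mode, and refuses to start inside an already running loop.

```python
import asyncio


async def main() -> str:
    await asyncio.sleep(0.1)
    return "done"


def run_manually(coro):
    # simplified approximation of asyncio.run(), not its actual implementation
    loop = asyncio.new_event_loop()
    try:
        asyncio.set_event_loop(loop)
        return loop.run_until_complete(coro)
    finally:
        # clean-up steps that asyncio.run() also performs for us
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.run_until_complete(loop.shutdown_default_executor())
        asyncio.set_event_loop(None)
        loop.close()


print(run_manually(main()))  # done
```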

We are building a task management system where we submit tasks and later check their results manually. So the first thing we need is a simple user interface. I like the simplicity of a REPL-like (read-eval-print loop) interface, so we build one to keep the project simple. Users interact with the application by issuing simple commands. We start with the 2 commands below, adapted from the implementation we did last week.

  1. dex <NUM>: Fetch the name of the Pokémon with number <NUM> from the Pokédex
  2. fib <NUM>: Return the <NUM>th Fibonacci number

Let's start by implementing dex and fib, which represent an I/O-bound and a CPU-bound operation, respectively.

import httpx


async def dex(client: httpx.AsyncClient, id: int) -> str:
    assert isinstance(id, int)

    response = await client.get(f"https://pokeapi.co/api/v2/pokemon/{id}/")

    return f"The pokemon with id {id} is {response.json()['name']}"


def fib(nth: int) -> str:
    assert isinstance(nth, int) and nth > 0

    result = ()

    for i in range(1, nth + 1):
        match i:
            case 1:
                result = (0,)

            case 2:
                result += (1,)

            case _:
                result = result[1:] + (sum(result),)

    assert len(result) > 0

    return f"The {nth}th fibonacci number is {result[-1]}" 

For dex, we want to reuse the httpx.AsyncClient, so we adapt our code to require it in the arguments.

Tasks and Futures in Action

Photo by Carl Heyerdahl on Unsplash

Assembling the REPL-like interface needs a little bit of work. We need to parse the command entered by the user and schedule the correct task. For this, we use funcparserlib, as featured in a previous Advent of Code article. We skip the implementation details of the parser and lexer in this article and focus only on the evaluation part, as shown below:

async def evaluate(
    client: httpx.AsyncClient,
    expression: Expression,
) -> Awaitable[str] | str:
    match expression:
        case Expression(command=Command.DEX):
            return asyncio.create_task(dex(client, expression.args[0]))

        case Expression(command=Command.FIB):
            return asyncio.create_task(asyncio.to_thread(fib, expression.args[0]))

        case _:
            raise Exception("Unknown command")

We want users to be able to check the progress of tasks later, so instead of waiting until execution completes, we return them as tasks. Since dex is implemented as a coroutine, we simply wrap it in a task and return it. fib, however, is CPU-bound and will not benefit much from asynchronous execution. As discussed last week, we need to turn it into a future so that it can be executed in a separate thread or process, as shown below:

loop = asyncio.get_running_loop()

# get the 100th Fibonacci number
future = loop.run_in_executor(None, fib, 100)

# later in the execution
result = await future 

As part of the abstraction effort, asyncio.get_running_loop() fetches the event loop created by the asyncio.run call from earlier. However, it raises a RuntimeError when no event loop is running, for example if the snippet is executed after the main() coroutine has completed. The first argument of run_in_executor expects an executor, such as a ProcessPoolExecutor or a ThreadPoolExecutor, and the loop's default ThreadPoolExecutor is used if None is passed. In most cases the default is sufficient, so we can use asyncio.to_thread, which runs the function in that default executor, and wrap the resulting coroutine in a task instead:

fib_coro = asyncio.to_thread(fib, 100)

task = asyncio.create_task(fib_coro)


# later
result = await task
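The two approaches can be compared side by side in a small sketch; double is a made-up blocking function that also reports which thread it ran on. Both end up in the loop's default ThreadPoolExecutor, so neither blocks the thread running the event loop:

```python
import asyncio
import threading


def double(n: int) -> tuple[int, str]:
    # blocking work; also report which thread it ran on
    return n * 2, threading.current_thread().name


async def main() -> tuple[tuple[int, str], tuple[int, str]]:
    loop = asyncio.get_running_loop()

    # option 1: a Future backed by the default ThreadPoolExecutor
    future = loop.run_in_executor(None, double, 21)

    # option 2: asyncio.to_thread returns a coroutine; wrapping it in a Task
    # schedules it right away, just like the Future above
    task = asyncio.create_task(asyncio.to_thread(double, 21))

    return await future, await task


(first, first_thread), (second, second_thread) = asyncio.run(main())
print(first, second)  # 42 42
```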

Now that the executor part is done, let's assemble the REPL. We use prompt-toolkit to prompt the user for a command, and then perform the following operations in sequence:

  1. Read the user entered command and arguments
  2. Parse the command and arguments, and return an Expression object
  3. Evaluate the expression object, and schedule tasks with asyncio.create_task
  4. Loop back to step 1

The main coroutine is now written as follows, with the corresponding steps annotated as comments:

async def main() -> None:
    t, g = TOKENIZER, grammar()
    session = PromptSession()

    tasks = []

    async with httpx.AsyncClient() as client:
        # step 4: loop
        while True:
            # step 1: read input
            line = await session.prompt_async("> ", handle_sigint=False)

            match result := await thread_last(
                line,
                # step 2a: tokenize the input
                (tokenize, t),
                # step 2b: parse the input
                (parse, g),
                # step 3: evaluate
                (evaluate, client),
            ):
                case asyncio.Task():
                    tasks.append((line, result))
                    print(f"Task {len(tasks)} is submitted")

                case _:
                    print(result)

Some notes on the snippet:

  1. t holds the definition of tokens (e.g., a sequence of letters is the name of a command)
  2. g holds the definition of the commands (what dex and fib mean)
  3. The thread_last call in the snippet comes from toolz. It simply computes evaluate(client, parse(g, tokenize(t, line))), but we keep the threaded form to aid readability. Since evaluate is a coroutine function, the result is a coroutine, which is why we await it.
  4. We collect the tasks returned by the evaluator in a simple list, which we will work with later
  5. In the match statement, we print the task ID if the evaluator returns a Task, or the returned string otherwise.
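If you would rather not pull in toolz just for this, the behaviour of thread_last can be approximated in a few lines with functools.reduce. This is a simplified sketch, not the toolz implementation: each form is either a callable, or a tuple whose first item is a callable and whose remaining items are leading arguments, with the running value passed last.

```python
from functools import reduce


def thread_last(value, *forms):
    def apply(acc, form):
        if callable(form):
            return form(acc)
        func, *args = form
        # the threaded value goes in as the LAST positional argument
        return func(*args, acc)

    return reduce(apply, forms, value)


# equivalent to list(map(str.upper, ["dex", "fib"]))
print(thread_last(["dex", "fib"], (map, str.upper), list))  # ['DEX', 'FIB']
```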

So let's recap what we have built so far. We should be able to run the program and issue commands to fetch Pokémon names and calculate Fibonacci numbers. You may have noticed, however, that we have yet to print any results on screen.

We also know that main() calls the evaluator, which schedules tasks on the event loop with asyncio.create_task(). Meanwhile, await is used where the order of execution is crucial:

  1. When we perform an HTTP GET request, so we can parse the response JSON after receiving it
  2. When we receive user input, as we need to pass the input to the parser afterwards
  3. When we wait for the evaluator to create a task, before looping back to receive another input from the user

So what is the difference between the two, and when should we use asyncio.create_task() rather than await? As a rule of thumb, if the order of execution does not matter, or if we only care about the outcome much later, use asyncio.create_task. If the result of a coroutine is needed by the code that follows, as in the cases above, use await: the current coroutine pauses until the awaited one completes, before moving on to the next line.
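The rule of thumb is easier to feel with timings. In this sketch, fetch is a hypothetical stand-in for an I/O-bound call like dex: awaiting two calls back to back takes about the sum of their durations, while scheduling both with asyncio.create_task first lets them overlap. Exact timings vary slightly between machines.

```python
import asyncio
import time


async def fetch(delay: float) -> float:
    # stand-in for an I/O-bound call such as dex
    await asyncio.sleep(delay)
    return delay


async def sequential() -> float:
    start = time.perf_counter()
    await fetch(0.2)  # waits for completion before moving on
    await fetch(0.2)
    return time.perf_counter() - start


async def concurrent() -> float:
    start = time.perf_counter()
    first = asyncio.create_task(fetch(0.2))  # scheduled, not awaited yet
    second = asyncio.create_task(fetch(0.2))
    await first
    await second  # both were sleeping at the same time
    return time.perf_counter() - start


print(f"sequential: {asyncio.run(sequential()):.2f}s")  # about 0.4 seconds
print(f"concurrent: {asyncio.run(concurrent()):.2f}s")  # about 0.2 seconds
```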

Managing Task Execution and Displaying Results

Photo by Stephen Dawson on Unsplash

Now we can start working on the tracker, and then build a dashboard to list tasks. These are the two commands we want to build next:

  1. job <NUM>: Show the result of the computation, or a message indicating it is still waiting for a result
  2. dash: Draw a simple table showing some information about all submitted tasks

As before, we start with the actual functions, beginning with job:

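def job(task: tuple[str, asyncio.Task[str]]) -> str:
    try:
        return task[-1].result()

    except asyncio.InvalidStateError:
        # the task has not finished yet
        return "Still waiting"

    except asyncio.CancelledError:
        # the task was cancelled, e.g. by the kill command introduced later
        return "Cancelled"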

Each entry in the task list is a tuple of the user input and the actual Task object. In this function, we are interested in the result. However, if the task has not finished yet, calling .result() raises InvalidStateError, in which case we return a message telling the user that it is still waiting.
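The InvalidStateError behaviour can be checked in isolation with a short sketch, using the result argument of asyncio.sleep to stand in for a real computation:

```python
import asyncio


async def main() -> tuple[str, str]:
    # asyncio.sleep returns its result argument once the delay has passed
    task = asyncio.create_task(asyncio.sleep(0.1, result="ready"))

    try:
        early = task.result()
    except asyncio.InvalidStateError:
        early = "Still waiting"  # the task has not finished yet

    await task
    return early, task.result()  # now .result() returns the value


print(asyncio.run(main()))  # ('Still waiting', 'ready')
```

Note that .result() on a cancelled task raises CancelledError instead, and on a failed task it re-raises the task's exception, which matters once tasks can be killed.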

Next, a dashboard to display an overview of the tasks:

def dash(tasks: Sequence[tuple[str, asyncio.Task[str]]]) -> str:
    result = [
        "\t".join(("id", f"{'command':16s}", "done?", "result")),
        "\t".join(("==", f"{'=' * 16}", "=====", "======")),
    ]

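    for num, (line, task) in enumerate(tasks, 1):
        # .result() raises for cancelled or failed tasks, so only read it
        # when the task completed successfully
        has_result = task.done() and not task.cancelled() and task.exception() is None

        result.append(
            "\t".join(
                (
                    str(num),
                    f"{line:16s}",
                    str(task.done()),
                    has_result and str(task.result())[:16] or "",
                )
            )
        )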

    return "\n".join(result)

The function simply returns a table-like report to show if tasks are done, and the corresponding result whenever applicable.

Then we need to expand the evaluator to call the job tracking functions we just added.

async def evaluate(
    client: httpx.AsyncClient,
    tasks: Sequence[tuple[str, asyncio.Task[str]]],
    expression: Expression,
) -> Awaitable[str] | str:
    match expression:
        case Expression(command=Command.DEX):
            return asyncio.create_task(dex(client, expression.args[0]))

        case Expression(command=Command.FIB):
            return asyncio.create_task(asyncio.to_thread(fib, expression.args[0]))

        case Expression(command=Command.JOB):
            return job(tasks[expression.args[0] - 1])

        case Expression(command=Command.DASH):
            return dash(tasks)

        case _:
            raise Exception("Unknown command")

Previously, every time a user submits a task with either dex or fib, we return a job number. Now they can check the progress with the job command with corresponding job number, or display an overview of submitted jobs with dash.

Task Cancellation and Graceful Shutdown

Photo by Campaign Creators on Unsplash

We now have a mostly complete program. While prompt-toolkit handles some signals, we can opt to handle them ourselves (hence handle_sigint=False in our session.prompt_async call). For that, we start by declaring a ShutdownHandler to gracefully shut down all unfinished scheduled tasks:

@dataclass
class ShutdownHandler:
    signal: int | None = None

    async def __call__(self) -> None:
        logger.info("Shutting down tasks. signal:%s", self.signal or "")

        tasks = tuple(
            task for task in asyncio.all_tasks() if task is not asyncio.current_task()
        )

        for task in tasks:
            task.cancel()

        await asyncio.gather(*tasks, return_exceptions=True)

        asyncio.get_running_loop().stop()

Notice that we apply the __call__ dunder discussed previously to avoid using functools.partial. In the coroutine, we cancel all the remaining tasks, wait for them to finish, and then stop the event loop (hence ending the asyncio.run we called at the beginning). The current task (the handler itself) is excluded, so it is not cancelled and gets to run asyncio.get_running_loop().stop().

Next, we set up the shutdown handler to run whenever the program receives certain signals from the operating system. In our example, we handle SIGTERM, SIGHUP and SIGINT, registering the ShutdownHandler we just implemented with the event loop for each of them.
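For a standalone look at the registration mechanism, the sketch below registers a callback for SIGUSR1 (chosen only because it is harmless to send to ourselves once a handler is installed) and then has the process signal itself with os.kill. It is Unix-only, as loop.add_signal_handler is not available on Windows. The callback is a plain function run by the loop, so running a coroutine from it means scheduling one, as init() does with asyncio.create_task. Extra positional arguments to add_signal_handler are forwarded to the callback, which avoids a lambda; a lambda created inside a for loop captures the loop variable itself, not its value at that iteration.

```python
import asyncio
import os
import signal


async def main() -> list[int]:
    loop = asyncio.get_running_loop()
    received: list[int] = []
    done = asyncio.Event()

    def on_signal(sig: int) -> None:
        # runs as a normal event loop callback, not in a raw signal context
        received.append(sig)
        done.set()

    # the trailing argument is passed to on_signal when the signal arrives
    loop.add_signal_handler(signal.SIGUSR1, on_signal, signal.SIGUSR1)

    # simulate the operating system delivering a signal shortly after start
    loop.call_later(0.1, os.kill, os.getpid(), signal.SIGUSR1)

    await asyncio.wait_for(done.wait(), timeout=5)
    loop.remove_signal_handler(signal.SIGUSR1)
    return received


received = asyncio.run(main())
print(received)
```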

Now we can proceed to add 2 more commands, namely

  1. kill <NUM>: Kill the job number <NUM>
  2. quit: Exit the program

The corresponding functions for both commands are shown below:

def kill(task: tuple[str, asyncio.Task[str]]) -> str:
    task[-1].cancel()

    return f'Task "{task[0]}" is killed'


def quit(shutdown_handler: Callable[..., Coroutine[None, None, None]]) -> str:
    asyncio.create_task(shutdown_handler())

    return "Exiting"

Calling the .cancel() method on a task only requests cancellation: a CancelledError is thrown into the wrapped coroutine at the await it is currently suspended on. If the task is executing SQL statements in a transaction, this is an opportunity to abort the transaction and roll back the changes made so far. In other cases, we can simply ignore the error and quit quietly, for instance by suppressing it in our main() coroutine. While we revisit main(), we also need to ensure init() is called to set up the ShutdownHandler().

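import signal
from contextlib import suppress


def init() -> None:
    loop = asyncio.get_running_loop()

    for sig in (signal.SIGTERM, signal.SIGHUP, signal.SIGINT):
        handler = ShutdownHandler(sig)

        # bind handler as a default argument; a plain closure would only see
        # the last handler created in this loop, whichever signal fires
        loop.add_signal_handler(sig, lambda handler=handler: asyncio.create_task(handler()))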


async def main() -> None:
    # skip the CancelledError
    with suppress(asyncio.CancelledError):
        init()

        print("\n".join(welcome_text()))

        t, g = TOKENIZER, grammar()
        session = PromptSession()

        tasks = []

        async with httpx.AsyncClient() as client:
            while True:
                line = await session.prompt_async("> ", handle_sigint=False)

                match result := await thread_last(
                    line,
                    (tokenize, t),
                    (parse, g),
                    (evaluate, client, tasks, ShutdownHandler()),
                ):
                    case asyncio.Task():
                        tasks.append((line, result))
                        print(f"Task {len(tasks)} is submitted")

                    case _:
                        print(result)
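As a small standalone illustration of the cancellation behaviour described earlier, here is a sketch where a hypothetical transaction() coroutine rolls back when cancelled. No real database is involved; the log list stands in for SQL statements. Re-raising CancelledError after the cleanup keeps the task marked as cancelled, and, like main() above, the caller uses contextlib.suppress to ignore the error.

```python
import asyncio
from contextlib import suppress

log: list[str] = []


async def transaction() -> None:
    # hypothetical unit of work standing in for SQL statements
    log.append("BEGIN")
    try:
        await asyncio.sleep(10)  # the CancelledError is raised at this await
        log.append("COMMIT")
    except asyncio.CancelledError:
        log.append("ROLLBACK")  # undo the partial changes
        raise  # re-raise so the task is still marked as cancelled


async def main() -> bool:
    task = asyncio.create_task(transaction())
    await asyncio.sleep(0.1)  # let the transaction start
    task.cancel()  # only requests cancellation

    with suppress(asyncio.CancelledError):
        await task

    return task.cancelled()


print(asyncio.run(main()), log)  # True ['BEGIN', 'ROLLBACK']
```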

And the updated evaluator function is slightly revised to add support for the two new commands:

async def evaluate(
    client: httpx.AsyncClient,
    tasks: Sequence[tuple[str, asyncio.Task[str]]],
    shutdown_handler: Callable[..., Coroutine[None, None, None]],
    expression: Expression,
) -> Awaitable[str] | str:
    match expression:
        case Expression(command=Command.DEX):
            return asyncio.create_task(dex(client, expression.args[0]))

        case Expression(command=Command.FIB):
            return asyncio.create_task(asyncio.to_thread(fib, expression.args[0]))

        case Expression(command=Command.JOB):
            return job(tasks[expression.args[0] - 1])

        case Expression(command=Command.KILL):
            return kill(tasks[expression.args[0] - 1])

        case Expression(command=Command.DASH):
            return dash(tasks)

        case Expression(command=Command.QUIT):
            return quit(shutdown_handler)

        case _:
            raise Exception("Unknown command")

Background Tasks and Event Management

Photo by Markus Spiske on Unsplash

The implementation seems straightforward, as we are constantly interacting with the main() coroutine. It is always waiting for user commands, and exiting simply ends the infinite while loop. However, there are cases where an application runs autonomously. For instance, consider the following simple application:

async def background_task():
    while True:
        await asyncio.sleep(10)

        print("Print something")


async def main():
    # send work to execute in the background
    asyncio.create_task(background_task())

    # do some work
    ultimate_answer = 42

    # hmm, we are done here


if __name__ == '__main__':
    asyncio.run(main()) 

A similar pattern can be observed in the article on chatbot programming, where background_task is a coroutine that consumes messages from a queue. In this mini example, the program ends silently and the print() in background_task() never happens: main() returns almost immediately, and asyncio.run then cancels any remaining tasks. In the past, I have seen people attach another infinite loop at the end of main() so that background_task() keeps running indefinitely.

async def main():
    # send work to execute in the background
    asyncio.create_task(background_task())

    # do some work
    ultimate_answer = 42

    # let's pretend we are doing work
    while True:
        pass

The application does appear to keep running. However, background_task() never prints, because the infinite loop in main() never yields control back to the event loop, which blocks every other coroutine from running. As a workaround, we could replace pass with something like await asyncio.sleep(2), but that feels like a hack, and I don't quite like it.
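The starvation problem can be reproduced in a bounded way. In this sketch, ticker is a hypothetical stand-in for background_task with a much shorter interval, and pretend_work replaces the infinite loop with one that spins for about 0.2 seconds, optionally awaiting in each iteration:

```python
import asyncio
import time


async def ticker(log: list[str]) -> None:
    # stand-in for background_task, with a much shorter interval
    while True:
        log.append("tick")
        await asyncio.sleep(0.01)


async def pretend_work(yield_control: bool) -> int:
    log: list[str] = []
    task = asyncio.create_task(ticker(log))
    deadline = time.monotonic() + 0.2
    while time.monotonic() < deadline:
        if yield_control:
            await asyncio.sleep(0.01)  # hands control back to the event loop
        # without the await above, this is a busy loop that never yields
    task.cancel()
    return len(log)


print(asyncio.run(pretend_work(False)))  # 0: the ticker never got a turn
print(asyncio.run(pretend_work(True)) > 0)  # True
```

Even await asyncio.sleep(0) would be enough to let the ticker run; what matters is that some await happens inside the loop.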

Alternatively, we could create an asyncio.Event object and pass it to main(). We then adapt the ShutdownHandler discussed previously to receive the event object, as shown below:

@dataclass
class ShutdownHandler:
    exit_event: asyncio.Event
    signal: int | None = None

    async def __call__(self) -> None:
        logger.info("Shutting down tasks. signal:%s", self.signal or "")
        self.exit_event.set()


async def main(exit_event: asyncio.Event):
    loop = asyncio.get_running_loop()

    for sig in (signal.SIGTERM, signal.SIGHUP, signal.SIGINT):
        handler = ShutdownHandler(exit_event, sig)

        # bind handler as a default argument so each signal keeps its own handler
        loop.add_signal_handler(sig, lambda handler=handler: asyncio.create_task(handler()))

    # send work to execute in the background
    task = asyncio.create_task(background_task())

    # do some work
    ultimate_answer = 42

    await exit_event.wait()
    task.cancel()


if __name__ == "__main__":
    asyncio.run(main(asyncio.Event()))

So main() now receives an event, sets up the ShutdownHandler, waits for the event to be set, and then cancels background_task(). For completeness' sake, we can also handle the CancelledError in background_task, as follows:

async def background_task():
    try:
        while True:
            await asyncio.sleep(10)

            print("Print something")

    except asyncio.CancelledError:
        print("Bye bye")

Handling Exceptions

Photo by David Pupăză on Unsplash

The last thing I want to discuss before we conclude is error handling. After working with asyncio for a while, we will notice that when an awaitable running in the background fails, the failure is not always obvious. Fortunately, as with signals, we can attach a handler to the event loop that is called when an exception goes unhandled. Back to our example project, let's implement an ExceptionHandler for this purpose.

@dataclass
class ExceptionHandler:
    shutdown_handler: ShutdownHandler

    def __call__(self, loop, context) -> None:
        logger.error("Application failed. context:%s", context)

        asyncio.create_task(self.shutdown_handler())

Then add a line to init() that registers the handler after the signal handlers:

def init() -> None:
    ...

    loop.set_exception_handler(ExceptionHandler(ShutdownHandler())) 

While this is more of a catch-all workaround, we should still handle exceptions properly in all our coroutines. For instance, in addition to CancelledError, we should probably also handle network errors in dex. However, we keep our example concise to avoid turning this article into an epic novel.
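As a standalone illustration of the mechanism (not the project's ExceptionHandler), the sketch below registers a handler with loop.set_exception_handler and then schedules a callback that raises. Exceptions escaping plain callbacks are routed to the handler right away. For a failed Task, the handler is typically only invoked if the task's exception is never retrieved, once the Task object is garbage-collected, so it may fire later than expected.

```python
import asyncio


def broken_callback() -> None:
    raise ValueError("boom")


async def main() -> list[str]:
    seen: list[str] = []

    def handler(loop: asyncio.AbstractEventLoop, context: dict) -> None:
        # context always has "message"; "exception" is set when one was raised
        seen.append(repr(context.get("exception")))

    loop = asyncio.get_running_loop()
    loop.set_exception_handler(handler)

    loop.call_soon(broken_callback)
    await asyncio.sleep(0.05)  # give the loop a chance to run the callback
    return seen


print(asyncio.run(main()))  # ["ValueError('boom')"]
```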

Conclusion: Mastering Asynchronous Task Orchestration

Photo by Martin Wilner on Unsplash

That's all I want to cover in this article. Although we spent quite some time building the application, we have barely scratched the surface of what AsyncIO can do. We discussed how to start executing a coroutine, and from there how to queue more tasks and awaitables in the event loop. We also discussed exiting the program gracefully and handling errors.

I owe a lot to this series of articles by Lynn Root, which I read while re-learning AsyncIO for the rewrite of bigmeow, the chatbot project. Some parts of the code in today's example project are adapted from her implementation. The complete project, with some slight changes, can be found on GitHub.

Next week, we will expand this example project to compare concurrency and parallelism. AsyncIO helps with I/O-bound operations (like dex in our example), but we still need to find a more efficient way to handle CPU-bound operations (like fib). If you are interested in learning more, feel free to subscribe to the newsletter.

Thank you so much for reading this far, and I shall write again next week.


To ensure clarity and address potential language errors, this article received editorial assistance from a large language model. Please note that all ideas and concepts presented herein are my own. The code examples provided were not generated or edited by an LLM. If you’re interested in project collaboration or job opportunities, feel free to reach out to me via a message here on Medium or through LinkedIn for further discussion.

