MAVSDK – Python: easy asyncio

Author: Jonas Vautherin | Auterion Software Engineer and MAVSDK core maintainer

In my last post, we went over the basics for getting started with MAVSDK-Python. I’m assuming that you have a functional setup and that you were able to run some of the examples. We will now go a bit deeper into the mechanics behind asyncio.

What is asyncio and why do we need it?

As defined in the Python documentation, asyncio is a library to write concurrent code using the async/await syntax. There are different approaches to asynchronous programming, such as threads, callbacks, etc. (I found this a good short read). Asyncio uses “coroutines”, which are essentially functions (“subroutines” in Python) that can be paused and resumed later. The point of that is to be able to run coroutines concurrently: the CPU runs part of a coroutine for a while, then pauses it and moves to another coroutine. The concept is a bit similar to what happens with threads, except that asyncio runs on only one thread (and still provides concurrency)!
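To make the idea of pausing and resuming concrete, here is a minimal sketch using only the standard library. The names worker() and main() are made up for this illustration and are not part of MAVSDK. Both coroutines run on the same thread and take turns at each await:

```python
import asyncio
import threading


async def worker(name, log):
    for step in range(2):
        # Record which thread runs this step, then pause at the await.
        log.append((name, step, threading.get_ident()))
        await asyncio.sleep(0)  # give the event loop a chance to switch


async def main():
    log = []
    # Run both coroutines concurrently and wait for both to finish.
    await asyncio.gather(worker("a", log), worker("b", log))
    return log


if __name__ == "__main__":
    for name, step, thread_id in asyncio.run(main()):
        print(name, step, thread_id)
```

The steps of "a" and "b" alternate in the log, and every entry carries the same thread identifier.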

Working with asyncio is not so different from working with normal, synchronous code. The difference is that we work with those coroutines, and therefore we need to use the corresponding syntax. Don’t worry though, it’s as easy as adding the async keyword in front of function definitions and the await keyword in front of function calls. But we will get back to that in the next section. Right now I would like to motivate the reason for using asyncio in MAVSDK-Python.

Let’s do a thought experiment, and think of some synchronous code that would arm the drone and take off (this code is not MAVSDK-Python code, so it will not work):

drone.arm()
drone.takeoff()

That’s all good, and we could do quite a few things in such a script: upload a mission, start it, etc. But at some point we may want to print the flight mode when it changes, while the script is running. We could try to do something like this:

for flight_mode in drone.flight_mode():
    print(f"Flight mode: {flight_mode}")

Or, if we imagine a polling interface, something like this:

while True:
    print(f"Flight mode: {drone.flight_mode()}")

But both those snippets are infinite loops, right? If you put that loop at the beginning of your script, then the script will just stay in this loop forever. And if you put it at the end, then it will not do anything useful, because it will start only after the script has been run already. What you want is to run this in parallel to the script. So typically you would run that in a thread.
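For comparison, here is a rough sketch of that thread-based approach. FakeDrone is a hypothetical stand-in for a synchronous polling API (it is not MAVSDK and not DroneKit), and the helper names are assumptions made for the example:

```python
import threading
import time


class FakeDrone:
    """Hypothetical synchronous drone API, for illustration only."""

    def __init__(self):
        self._mode = "HOLD"

    def flight_mode(self):
        return self._mode

    def takeoff(self):
        self._mode = "TAKEOFF"


def watch_flight_mode(drone, stop_event, seen):
    # Polling loop that would block the script if it ran in the main thread.
    last = None
    while not stop_event.is_set():
        mode = drone.flight_mode()
        if mode != last:
            seen.append(mode)
            print(f"Flight mode: {mode}")
            last = mode
        time.sleep(0.01)


def main():
    drone = FakeDrone()
    stop_event = threading.Event()
    seen = []
    watcher = threading.Thread(
        target=watch_flight_mode, args=(drone, stop_event, seen)
    )
    watcher.start()

    time.sleep(0.1)  # let the watcher observe the initial mode
    drone.takeoff()
    time.sleep(0.1)  # let the watcher observe the change

    stop_event.set()  # ask the watcher to stop, then join it
    watcher.join()
    return seen


if __name__ == "__main__":
    main()
```

Note the extra machinery (a stop event, a join) that is needed just to print a value in the background.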

That’s where asyncio is useful: it is an alternative to threads which is (amongst other benefits) arguably easier to use for concurrent programming. Still, it comes with some new syntax, which may make it a bit intimidating for writing simple scripts that would not usually require threads. But I strongly believe that this syntax is easy to learn, and totally worth it (e.g. coroutines exist in other languages like JavaScript or Kotlin)!

My goal today will be to show you how easy it is to use asyncio. However, we build MAVSDK for the community, and we could consider providing a synchronous interface in MAVSDK-Python instead. Feel free to give us feedback, because it does matter to us (#mavsdk on Slack would be a good place for that)!

It looks just like synchronous code

Remember the takeoff_and_land.py example script from last time? I argue that it does show everything you need to know in order to write most small scripts that don’t require parallelism. Let’s have a look at it, again:

#!/usr/bin/env python3

import asyncio
from mavsdk import System


async def run():

    drone = System()
    await drone.connect(system_address="udp://:14540")

    print("Waiting for drone...")
    async for state in drone.core.connection_state():
        if state.is_connected:
            print(f"Drone discovered with UUID: {state.uuid}")
            break

    print("-- Arming")
    await drone.action.arm()

    print("-- Taking off")
    await drone.action.takeoff()

    await asyncio.sleep(5)

    print("-- Landing")
    await drone.action.land()


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())

Now, let’s think about what the script would look like if it were not using coroutines (i.e. if MAVSDK was not exposing an asynchronous interface). Of course that code will not work, but it gives an idea:

#!/usr/bin/env python3

import time
from mavsdk import System


def run():

    drone = System()
    drone.connect(system_address="udp://:14540")

    print("Waiting for drone...")
    for state in drone.core.connection_state():
        if state.is_connected:
            print(f"Drone discovered with UUID: {state.uuid}")
            break

    print("-- Arming")
    drone.action.arm()

    print("-- Taking off")
    drone.action.takeoff()

    time.sleep(5)

    print("-- Landing")
    drone.action.land()


if __name__ == "__main__":
    run()

See the difference? Remove the event loop initialization, the async and await keywords, and you have normal, synchronous code.

Calling a coroutine with await drone.action.arm() works exactly like calling synchronous code with drone.action.arm(). Nothing happens behind your back: both are blocking calls. The only difference is that in asyncio, drone.action.arm() returns a coroutine that you need to await. If you forget to do it, the interpreter will complain at runtime:

RuntimeWarning: coroutine 'Action.arm' was never awaited
drone.action.arm()

This is trivial to fix, since it also gives you the line of code at which this happened. Just go there, and add await.

One may wonder how to deal with a return value in a coroutine. And that’s trivial, as shown in the firmware_version.py example:

info = await drone.info.get_version()
print(info)
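The same behaviour can be reproduced without a drone. In the sketch below, get_version() is a hypothetical stand-in for a MAVSDK call (the name and the returned string are assumptions). It shows that await gives back the return value, while calling the function without await only gives back a coroutine object:

```python
import asyncio
import inspect


async def get_version():
    # Hypothetical stand-in for a call such as drone.info.get_version().
    await asyncio.sleep(0)
    return "1.2.3"


async def main():
    # With await: we get the value returned by the coroutine.
    version = await get_version()

    # Without await: we only get a coroutine object, nothing has run yet.
    not_awaited = get_version()
    is_coroutine = inspect.iscoroutine(not_awaited)
    not_awaited.close()  # close it explicitly to avoid the "never awaited" warning

    return version, is_coroutine


if __name__ == "__main__":
    print(asyncio.run(main()))
```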

Now that we have seen how to work with coroutines (and realized that they behave exactly like normal function calls), let’s talk about the second construct existing in MAVSDK-Python: async generators. In the example, that’s this part:

    print("Waiting for drone...")
    async for state in drone.core.connection_state():
        if state.is_connected:
            print(f"Drone discovered with UUID: {state.uuid}")
            break

Again, it is blocking, and it behaves exactly like a normal for loop, except that this time you need the async keyword (it is an “async generator”). If you forget the keyword, the interpreter complains again, and tells you where it is missing:

for state in drone.core.connection_state():
TypeError: 'async_generator' object is not iterable

Just for comparison, here is what this code could look like with a polling interface (i.e. similar to DroneKit):

    print("Waiting for drone...")
    while True:
        state = drone.core.connection_state
        if state.is_connected:
            print(f"Drone discovered with UUID: {state.uuid}")
            break

That is not very different, is it? You will find another example using async generators in the example directory: telemetry_flight_mode.py.
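If you want to experiment with async for without a drone or simulator, a small async generator is enough. In this sketch, connection_state() is a hypothetical stand-in that yields plain booleans (the real MAVSDK generator yields state objects instead):

```python
import asyncio


async def connection_state():
    # Hypothetical stand-in for drone.core.connection_state():
    # reports "not connected" twice, then "connected" forever.
    for is_connected in (False, False):
        await asyncio.sleep(0)
        yield is_connected
    while True:
        await asyncio.sleep(0)
        yield True


async def wait_for_connection():
    updates = 0
    async for is_connected in connection_state():
        updates += 1
        if is_connected:
            break  # leave the loop as soon as we are connected
    return updates


if __name__ == "__main__":
    print(f"Connected after {asyncio.run(wait_for_connection())} updates")
```

Just like in the MAVSDK example, the generator never ends by itself: it is the break that ends the loop.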

That’s it, we have seen the two (and only two) constructs exposed by MAVSDK-Python! How can I be so sure? Because we auto-generate the code, and we only ever have those two cases: a “normal” function call, that may or may not return a value (await drone.action.arm() vs info = await drone.info.get_version()), and a “callback”, that becomes an async generator and hence looks like a for loop.

There is another thing that may look a bit peculiar at first: the event loop. What is this weird loop.run_until_complete(run())? Asyncio uses an event loop (one per thread, so exactly one in our scripts) that is responsible for handling the tasks running in parallel. At some point, you need to register tasks if you want your script to do something. As long as you only run one task (equivalent to having only one thread), all you need is to pass the run() coroutine to the loop at the beginning of the script, and that’s all. And don’t worry: just like with threads, you cannot start a parallel task without knowing it (that requires calling something like asyncio.ensure_future() or asyncio.gather()).

It means that you can always start your script with the following structure, and you don’t have to think about the event loop:

#!/usr/bin/env python3

import asyncio
from mavsdk import System


async def run():
    print("Write your code here")


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
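On Python 3.7 and later, the standard library also provides asyncio.run(), which creates an event loop, runs the coroutine until it completes, and closes the loop. The sketch below is an equivalent skeleton under that assumption. It is not what the MAVSDK examples in this post use, and the return value is only there for illustration:

```python
import asyncio


async def run():
    print("Write your code here")
    return "done"


if __name__ == "__main__":
    # asyncio.run() replaces the get_event_loop() / run_until_complete() pair.
    asyncio.run(run())
```

Either form works for a single-task script.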

You can then really write any Python code you want, like:

#!/usr/bin/env python3

import asyncio
import time


async def run():
    print("This is the beginning of the script")
    time.sleep(2)

    user_name = input("What is your name? ")
    print(f"Hello, {user_name}!")


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())

I hope that this first part helped you understand that working with asyncio is not so different from working with a synchronous interface. Until now, we have seen how to:

  • Set up a run() function acting as the “main” or “entrypoint” of the script
  • Call a coroutine (one of the only two constructs exposed by MAVSDK)
  • Use an async generator (the second construct exposed by MAVSDK)

We have also learned that as long as we only use one task, we can make Python calls like we always did, except that we need to use the async and await keywords. In some way, using asyncio with only one task is like not using asyncio at all, except that you still need to start the event loop and use the asyncio syntax.

One last clarification: running only one task does not mean having only one function. It means that you only ever register run() to the event loop. So don’t use run_until_complete(), asyncio.ensure_future() or asyncio.gather() unless you understand what you are doing (we will talk about that in the next section).

Here is an example where I moved all the blocks into their own functions:

#!/usr/bin/env python3

import asyncio
from mavsdk import System


async def run():
    drone = System()
    await drone.connect(system_address="udp://:14540")

    await wait_for_drone(drone)
    await arm(drone)
    await takeoff(drone)
    await land(drone)

async def wait_for_drone(drone):
    print("Waiting for drone...")
    async for state in drone.core.connection_state():
        if state.is_connected:
            print(f"Drone discovered with UUID: {state.uuid}")
            break

async def arm(drone):
    print("-- Arming")
    await drone.action.arm()

async def takeoff(drone):
    print("-- Taking off")
    await drone.action.takeoff()
    await asyncio.sleep(5)

async def land(drone):
    print("-- Landing")
    await drone.action.land()


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())

Running parallel tasks: asyncio “threading”

Asyncio enables running multiple tasks in parallel, but it is all running on only one thread. The technique used is called cooperative multitasking. Instead of having multiple threads running code in parallel, with the system deciding when it wants to switch to another thread, cooperative multitasking requires the tasks to be “cooperative” in that they need to leave opportunities for the event loop to give control to other tasks. Such an opportunity is hinted by the await keyword.

Let’s look at a simple example involving two tasks:

#!/usr/bin/env python3

import asyncio


async def run():
    print("This is the beginning of the script")

    print("Starting the secondary task...")
    secondary_task = asyncio.ensure_future(secondary_fun())

    await asyncio.sleep(1)
    print("Doing something in the run() function")
    await asyncio.sleep(1)

    print("Waiting for 'secondary_task' to finish...")
    await secondary_task

    print("secondary_fun() finished, ready to exit!")

async def secondary_fun():
    for i in range(0, 6):
        print(f"iteration {i}")
        await asyncio.sleep(0.5)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())

It will output:

This is the beginning of the script
Starting the secondary task...
iteration 0
iteration 1
Doing something in the run() function
iteration 2
iteration 3
Waiting for 'secondary_task' to finish...
iteration 4
iteration 5
secondary_fun() finished, ready to exit!

This example introduces asyncio.ensure_future(secondary_fun()), which starts the secondary_fun() function as a parallel task (a bit like a thread). And just like with threads, we want to wait for the task to finish before our “main” function ends, otherwise the program will stop too early. The way we “join” a task is by awaiting it: secondary_task = asyncio.ensure_future(secondary_fun()) starts secondary_fun() in a new parallel task and returns a handle to it. At the end of run(), we await secondary_task, which will effectively block until secondary_fun() returns. Just like joining a thread.
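When several tasks have to be started and joined together, asyncio.gather() does both in one call. The following sketch uses a made-up count() coroutine (the name and arguments are assumptions for the example) to show the idea:

```python
import asyncio


async def count(name, n, log):
    for i in range(n):
        log.append(f"{name} {i}")
        await asyncio.sleep(0.01)
    return n


async def run():
    log = []
    # gather() wraps each coroutine in a task, runs them concurrently,
    # and returns their results in the order they were passed in.
    results = await asyncio.gather(
        count("first", 2, log),
        count("second", 3, log),
    )
    return results, log


if __name__ == "__main__":
    results, log = asyncio.run(run())
    print(results)
    print(log)
```

Awaiting gather() plays the same role as awaiting each task handle one after the other.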

From the output, we clearly see that we have two functions running in parallel. An important question is: when does the event loop switch between run() and secondary_fun()? And the answer is: it may do it every time the active task awaits something. It does not mean that it has to, but those are the places where it can happen. We are not necessarily interested in knowing exactly how it behaves. What’s important is to realize when a task can block the other.

It turns out that await asyncio.sleep(1) is a perfect tool to understand that. It is the cooperative equivalent of time.sleep(1). Feel free to replace its occurrences in run() with time.sleep(1), and observe that the output is different:

#!/usr/bin/env python3

import asyncio
import time


async def run():
    print("This is the beginning of the script")

    print("Starting the secondary task...")
    secondary_task = asyncio.ensure_future(secondary_fun())

    time.sleep(1)
    print("Doing something in the run() function")
    time.sleep(1)

    print("Waiting for 'secondary_task' to finish...")
    await secondary_task

    print("secondary_fun() finished, ready to exit!")

async def secondary_fun():
    for i in range(0, 6):
        print(f"iteration {i}")
        await asyncio.sleep(0.5)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())

The output is now:

This is the beginning of the script
Starting the secondary task...
Doing something in the run() function
Waiting for 'secondary_task' to finish...
iteration 0
iteration 1
iteration 2
iteration 3
iteration 4
iteration 5
secondary_fun() finished, ready to exit!

Note how secondary_fun() did not get a chance to run until run() finally awaited it! That is because time.sleep(1) does not release control (note that there is no await keyword involved with it)! And you can’t just write await time.sleep(1), because await needs an awaitable, and time.sleep(1) simply returns None.
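A quick way to convince yourself of that last point is to try it. This small sketch (standard library only, with a shorter sleep to keep it fast) awaits the result of time.sleep() and catches the resulting error:

```python
import asyncio
import time


async def run():
    try:
        # time.sleep() blocks, then returns None; None cannot be awaited.
        await time.sleep(0.01)
    except TypeError:
        return "TypeError"
    return "no error"


if __name__ == "__main__":
    print(asyncio.run(run()))
```

Note that the blocking sleep still happens before the error is raised, so adding await to a synchronous call never makes it cooperative.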

What we can conclude from this is that it is fine to run non-asynchronous (here meaning “non-cooperative”) code, but it will not release control for the other tasks that are supposed to run in parallel. Some small calls are fine: print() is not cooperative, but it is so quick to run that we don’t really care. A call like time.sleep(60), however, takes one minute to complete, and we probably care about not blocking the parallel tasks during that time: that is what await asyncio.sleep(60) is for.

You will find more examples using parallel tasks in the example directory: telemetry_is_armed_is_in_air.py is a very simple example running two tasks in parallel. Very similar but with more tasks, you’ll find telemetry.py. And finally, we have more complete examples showing how to use coroutines and async generators with parallel tasks in telemetry_takeoff_and_land.py and mission.py.

You can still use synchronous libraries

One question that quickly comes when using asyncio is whether or not one can use non-asyncio libraries. And the short answer is that yes, it is possible! There is just one thing to be aware of: a call to a synchronous library is not “cooperative”, so it will block the parallel tasks while it runs. Let’s enumerate the different situations that can happen:

  1. You have only one task (i.e. you only registered run() to the event loop and nothing more)
  2. You have parallel tasks, and want to make a short call to a synchronous library
  3. You have parallel tasks, and want to make a long call to a synchronous library

First case is easy: there is no problem at all. As we have seen above, if there is only one task, it means that there is no parallelism. Just call whatever you want there!

Second case would be similar to calling print(): it is not “cooperative”, but it is super fast, so it does not really count as “blocking parallel tasks”. Feel free to just make that call.

Third case is trickier: you have a synchronous library that will make a blocking call that may take time (e.g. download a file). If you make that call, all the tasks will be blocked until it returns, i.e. they don’t run in parallel anymore. One solution is to run that call in a ThreadPoolExecutor.

Let’s do that in an example. It uses the same template as above: a run() function, and a secondary function secondary_fun() running in a parallel task.

#!/usr/bin/env python3

import asyncio


async def run():
    print("This is the beginning of the script")

    print("Starting the secondary task...")
    asyncio.ensure_future(secondary_fun())

    await long_async_fun()

async def secondary_fun():
    i = 0
    while True:
        print(f"iteration {i}")
        i += 1
        await asyncio.sleep(0.5)

async def long_async_fun():
    await asyncio.sleep(5)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())

This time we don’t wait for secondary_fun() to finish, so the script just exits when run() is finished. But run() calls long_async_fun(), which simulates an asynchronous call to a function that would do something for 5 seconds (we do await asyncio.sleep(5) for the example, but imagine a file download or similar). Because long_async_fun() is asynchronous/cooperative, secondary_fun() gets a chance to run in the meantime. The output of this program is the following:

This is the beginning of the script
Starting the secondary task...
iteration 0
iteration 1
iteration 2
iteration 3
iteration 4
iteration 5
iteration 6
iteration 7
iteration 8
iteration 9

Now, let’s instead replace that by a long_sync_fun(), which is not cooperative:

#!/usr/bin/env python3

import asyncio
import time


async def run():
    print("This is the beginning of the script")

    print("Starting the secondary task...")
    asyncio.ensure_future(secondary_fun())

    long_sync_fun()

async def secondary_fun():
    i = 0
    while True:
        print(f"iteration {i}")
        i += 1
        await asyncio.sleep(0.5)

# UNUSED
async def long_async_fun():
    await asyncio.sleep(5)

def long_sync_fun():
    time.sleep(5)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())

Note how long_sync_fun() is not defined with the async keyword, and how we don’t call it with await. That’s because it is not an asynchronous function. So while it sleeps for 5 seconds, it will not release control and secondary_fun() will not be able to run. Let’s look at the output:

This is the beginning of the script
Starting the secondary task...
iteration 0

If you run the code, you’ll see that it sleeps for 5 seconds between “Starting the secondary task...” and “iteration 0”. That’s what we expected: secondary_fun() only started after long_sync_fun() returned (instead of running in parallel), and could only make one iteration before run() finished and the script exited. This is the kind of long synchronous call that we want to run in the background using a ThreadPoolExecutor:

#!/usr/bin/env python3

import asyncio
import concurrent.futures
import time


async def run():
    print("This is the beginning of the script")

    print("Starting the secondary task...")
    asyncio.ensure_future(secondary_fun())

    await wrapped_long_sync_fun()

async def secondary_fun():
    i = 0
    while True:
        print(f"iteration {i}")
        i += 1
        await asyncio.sleep(0.5)

# UNUSED
async def long_async_fun():
    await asyncio.sleep(5)

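async def wrapped_long_sync_fun():
    # Use the loop that is currently running instead of relying on a global.
    loop = asyncio.get_running_loop()
    executor = concurrent.futures.ThreadPoolExecutor()
    await loop.run_in_executor(executor, long_sync_fun)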

def long_sync_fun():
    time.sleep(5)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())

I wrapped long_sync_fun() into wrapped_long_sync_fun() here, which simply runs it in the executor. loop.run_in_executor() runs the function in a separate thread and returns a future that can be awaited, so I effectively created an asynchronous function out of long_sync_fun()! It is not perfect, because it is using a thread under the hood, but that’s an easy way to bridge between asynchronous code and a synchronous library. Of course, the executor could be shared between multiple synchronous functions if necessary; there is no need to create a new one every time.
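As a sketch of what sharing the executor could look like, the example below creates one ThreadPoolExecutor and uses it for two blocking calls that run at the same time. The names EXECUTOR, blocking_call() and in_thread() are assumptions made for this illustration, and the sleeps stand in for real work such as a download:

```python
import asyncio
import concurrent.futures
import time

# One executor shared by every blocking call in the script.
EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=2)


def blocking_call(name, duration):
    # Hypothetical stand-in for a call into a synchronous library.
    time.sleep(duration)
    return name


async def in_thread(func, *args):
    loop = asyncio.get_running_loop()
    # run_in_executor() forwards the positional arguments to func.
    return await loop.run_in_executor(EXECUTOR, func, *args)


async def run():
    start = time.monotonic()
    results = await asyncio.gather(
        in_thread(blocking_call, "download", 0.3),
        in_thread(blocking_call, "parse", 0.3),
    )
    elapsed = time.monotonic() - start
    return results, elapsed


if __name__ == "__main__":
    results, elapsed = asyncio.run(run())
    print(results, f"{elapsed:.2f}s")
```

Because the two calls run in separate worker threads, the total time is close to that of a single call rather than the sum of both.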

Let’s verify that it works by looking at the output:

This is the beginning of the script
Starting the secondary task...
iteration 0
iteration 1
iteration 2
iteration 3
iteration 4
iteration 5
iteration 6
iteration 7
iteration 8
iteration 9

And it does! The output is exactly the same as what we got when running await asyncio.sleep(5), except that this time we are running the synchronous time.sleep(5) inside a ThreadPoolExecutor.

I hope that I managed to prove that running a single task in asyncio is trivial, and that dealing with parallel tasks is actually pretty easy once we know the syntax. Happy coding!

2020-02-19T20:21:35+00:00