Intra-task vs multi-task concurrency with futures and tokio in Rust

4 minute read 05 April 2024

These are personal notes about concurrency in Rust with futures and tokio.

These notes came out of a discussion with Alice Ryhl on Discord about the difference between intra-task and multi-task concurrency.

Intra-task concurrency

I was using futures::future::join_all, thinking that it would run the child futures in parallel. Coming from a Scala background, I was expecting the same behavior as Future.sequence in Scala.

But that's not exactly the case. join_all runs the futures concurrently, but not in parallel: with tokio, they all run inside the same tokio task. Whenever a child future is waiting for (non-blocking) IO, the task switches to polling another future. In my application, where I'm dealing with non-blocking IO, join_all has the desired effect of running the futures concurrently. But if I had blocking futures, I would need a different approach.

Example simulating a CPU-bound (or blocking IO) job:

use futures::future::join_all;

#[tokio::main]
async fn main() {
    println!("Starting jobs...");

    let handles = (1..=5).map(blocking_job);

    // intra-task concurrency
    let handle = join_all(handles);
    handle.await;

    println!("All jobs finished");
}

async fn blocking_job(i: i32) {
    println!("Job {i} started");
    // std::thread::sleep blocks the whole thread, not just this future
    std::thread::sleep(std::time::Duration::from_secs(1));
    println!("Job {i} finished");
}

This program prints out:

Starting jobs...
Job 1 started
Job 1 finished
Job 2 started
Job 2 finished
Job 3 started
Job 3 finished
Job 4 started
Job 4 finished
Job 5 started
Job 5 finished
All jobs finished
cargo run  0,05s user 0,01s system 1% cpu 5,057 total

The blocking jobs run sequentially: each std::thread::sleep blocks the single task (and the thread it runs on), so the task never gets a chance to poll the other futures, and the whole program takes about 5 seconds.
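
As an aside, if the jobs really are blocking (CPU-bound work or blocking IO), one option is tokio::task::spawn_blocking, which moves each job onto tokio's dedicated blocking thread pool. A minimal sketch, with blocking_job rewritten as a plain synchronous function:

use futures::future::join_all;

#[tokio::main]
async fn main() {
    println!("Starting jobs...");

    // spawn_blocking moves each closure onto tokio's blocking thread pool,
    // so the async worker threads are never blocked.
    let handles = (1..=5).map(|i| tokio::task::spawn_blocking(move || blocking_job(i)));

    join_all(handles).await;

    println!("All jobs finished");
}

// A plain synchronous function this time, not an async fn.
fn blocking_job(i: i32) {
    println!("Job {i} started");
    std::thread::sleep(std::time::Duration::from_secs(1));
    println!("Job {i} finished");
}

Here the five jobs run in parallel on the blocking pool, so the whole run takes about one second instead of five.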

If, instead of blocking jobs, we have non-blocking IO jobs:

use futures::future::join_all;

#[tokio::main]
async fn main() {
    println!("Starting jobs...");

    let handles = (1..=5).map(non_blocking_job);

    // intra-task concurrency
    let handle = join_all(handles);
    handle.await;

    println!("All jobs finished");
}

async fn non_blocking_job(i: i32) {
    println!("Job {i} started");
    // simulating waiting for IO
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    println!("Job {i} finished");
}

It produces the expected output:

Starting jobs...
Job 1 started
Job 2 started
Job 3 started
Job 4 started
Job 5 started
Job 1 finished
Job 2 finished
Job 3 finished
Job 4 finished
Job 5 finished
All jobs finished
cargo run  0,74s user 0,09s system 53% cpu 1,555 total

join_all creates a single future. When we poll that future, it polls the child futures in the list; a child future that is waiting for IO (in a non-blocking way) returns Pending, and polling moves on to another one. This is intra-task concurrency.
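
For a small, fixed number of futures, the futures::join! and tokio::join! macros behave the same way: the child futures are all polled from within the single enclosing task, and no new tasks are spawned. A minimal sketch using tokio::join!:

#[tokio::main]
async fn main() {
    // Both child futures are polled inside this one task:
    // still intra-task concurrency, no extra tasks are spawned.
    tokio::join!(non_blocking_job(1), non_blocking_job(2));
    println!("Both jobs finished");
}

async fn non_blocking_job(i: i32) {
    println!("Job {i} started");
    // simulating waiting for IO
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    println!("Job {i} finished");
}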

Multi-task concurrency

Quoting Alice Ryhl:

Alice Ryhl — 03/10/2024 10:19 AM — Concurrency within a single task is just inferior to spawning several tasks. It performs worse, it breaks some things such as block_in_place, prevents parallelism, doesn't show them in tokio-console, and so on.

So to achieve parallelism, it's better to spawn tasks. Here is an example:

use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    println!("Starting jobs...");

    let handles = (1..=5).map(blocking_job);

    let mut set = JoinSet::new();
    for handle in handles {
        set.spawn(handle);
    }

    while let Some(res) = set.join_next().await {
        res.unwrap();
        println!("Task finished");
    }

    println!("All jobs finished");
}

async fn blocking_job(i: i32) {
    println!("Job {i} started");
    // std::thread::sleep blocks the whole thread, not just this future
    std::thread::sleep(std::time::Duration::from_secs(1));
    println!("Job {i} finished");
}

Instead of using join_all, we use a JoinSet. Each future is spawned as its own tokio task, so the tasks can run in parallel on the runtime's worker threads. The output is:

Starting jobs...
Job 1 started
Job 2 started
Job 3 started
Job 4 started
Job 5 started
Job 2 finished
Job 1 finished
Job 5 finished
Job 4 finished
Job 3 finished
Task finished
Task finished
Task finished
Task finished
Task finished
All jobs finished
cargo run  0,76s user 0,10s system 55% cpu 1,544 total

This achieves parallelism, with blocking as well as non-blocking jobs: the jobs finish out of order and the total runtime is back to roughly one second.
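
For completeness, the same multi-task concurrency can also be expressed with plain tokio::spawn and a Vec of JoinHandles. A minimal sketch along the same lines, reusing the blocking_job from the JoinSet example:

use futures::future::join_all;

#[tokio::main]
async fn main() {
    println!("Starting jobs...");

    // tokio::spawn turns each future into its own task, so the runtime
    // can run them on different worker threads.
    let handles: Vec<_> = (1..=5).map(|i| tokio::spawn(blocking_job(i))).collect();

    for res in join_all(handles).await {
        res.unwrap();
    }

    println!("All jobs finished");
}

async fn blocking_job(i: i32) {
    println!("Job {i} started");
    std::thread::sleep(std::time::Duration::from_secs(1));
    println!("Job {i} finished");
}

One reason to prefer JoinSet is that it aborts all of its remaining tasks when it is dropped, whereas dropping a bare JoinHandle just detaches the task.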