These are personal notes about concurrency in Rust with futures and Tokio.
They grew out of a discussion with Alice Ryhl on Discord about the difference between intra-task concurrency and multi-task concurrency.
Intra-task concurrency
I was using futures::future::join_all, thinking that it would run child futures in parallel. Coming from a Scala background, I was expecting the same behavior as Future.sequence in Scala.
But that's not exactly the case. join_all runs futures concurrently, but not in parallel. With Tokio, they all run inside the same Tokio task. Whenever a child future is waiting for IO, the task switches to another future. In my application, which deals with non-blocking IO, join_all has the desired effect of running futures concurrently. But if I had blocking futures, I would need a different approach.
Example simulating a CPU-bound (or IO blocking) job:
use futures::future::join_all;

#[tokio::main]
async fn main() {
    println!("Starting jobs...");
    let handles = (1..=5).map(blocking_job);
    // intra-task concurrency
    let handle = join_all(handles);
    handle.await;
    println!("All jobs finished");
}

async fn blocking_job(i: i32) {
    println!("Job {i} started");
    // This will block the current thread
    std::thread::sleep(std::time::Duration::from_secs(1));
    println!("Job {i} finished");
}
This program prints out:
Starting jobs...
Job 1 started
Job 1 finished
Job 2 started
Job 2 finished
Job 3 started
Job 3 finished
Job 4 started
Job 4 finished
Job 5 started
Job 5 finished
All jobs finished
cargo run 0,05s user 0,01s system 1% cpu 5,057 total
The blocking jobs are run sequentially.
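One way to see why: a job that blocks never hands control back in the middle of its work, so nothing else in the same task can be polled until it is done. The sketch below is std-only and polls futures by hand instead of going through Tokio. NoopWake and poll_once are hypothetical helpers made up for this illustration, and the sleep is shortened to 50 ms.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::time::Duration;

// Hypothetical waker that does nothing; enough for polling a future by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Same shape as `blocking_job` above, with a shorter sleep and a return value.
async fn blocking_job(i: i32) -> i32 {
    // Blocks the calling thread; there is no `.await`, so no yield point.
    std::thread::sleep(Duration::from_millis(50));
    i
}

// Hypothetical helper: polls a future exactly once and returns the result.
fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    let result = fut.as_mut().poll(&mut cx);
    result
}

fn main() {
    // The blocking job sleeps inside `poll` and is already finished when
    // that single call returns.
    println!("{:?}", poll_once(blocking_job(1))); // Ready(1)

    // A future that is genuinely waiting returns `Pending` right away,
    // which is what lets the caller poll something else in the meantime.
    println!("{:?}", poll_once(std::future::pending::<i32>())); // Pending
}
```

Since each blocking job completes within a single poll, join_all can only start the next job once the previous one has finished.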
If instead of blocking jobs, we have non-blocking IO jobs:
use futures::future::join_all;

#[tokio::main]
async fn main() {
    println!("Starting jobs...");
    let handles = (1..=5).map(non_blocking_job);
    // intra-task concurrency
    let handle = join_all(handles);
    handle.await;
    println!("All jobs finished");
}

async fn non_blocking_job(i: i32) {
    println!("Job {i} started");
    // simulating waiting for IO
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    println!("Job {i} finished");
}
It produces the expected output:
Starting jobs...
Job 1 started
Job 2 started
Job 3 started
Job 4 started
Job 5 started
Job 1 finished
Job 2 finished
Job 3 finished
Job 4 finished
Job 5 finished
All jobs finished
cargo run 0,74s user 0,09s system 53% cpu 1,555 total
join_all creates a single future. When that future is polled, it polls the child futures in the list. If a child is waiting for IO (in a non-blocking way), it returns Pending and another child gets a chance to make progress. All of this happens inside one task, on one thread at a time. This is intra-task concurrency.
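To make the polling behaviour concrete, here is a minimal std-only sketch of a join_all-like future. It is not how the futures crate actually implements join_all, only an illustration of the idea. YieldOnce, SimpleJoinAll, NoopWake, run_to_completion and interleaving_demo are all hypothetical names made up for this example.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

type Log = Arc<Mutex<Vec<String>>>;

// Hypothetical child future: pending on the first poll (as if waiting for
// IO), ready on the second.
struct YieldOnce {
    id: u32,
    polled: bool,
    log: Log,
}

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.polled {
            self.log.lock().unwrap().push(format!("job {} finished", self.id));
            return Poll::Ready(());
        }
        self.polled = true;
        self.log.lock().unwrap().push(format!("job {} started", self.id));
        // Ask to be polled again. A real IO future would hand the waker to
        // the IO driver instead of waking itself immediately.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

// Simplified stand-in for the future returned by `join_all`.
struct SimpleJoinAll {
    children: Vec<Option<YieldOnce>>,
}

impl Future for SimpleJoinAll {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut all_done = true;
        // Poll every unfinished child in turn, on the current thread.
        for slot in self.children.iter_mut() {
            let finished = match slot.as_mut() {
                Some(child) => Pin::new(child).poll(cx).is_ready(),
                None => true,
            };
            if finished {
                *slot = None;
            } else {
                all_done = false;
            }
        }
        if all_done { Poll::Ready(()) } else { Poll::Pending }
    }
}

// Waker that does nothing; the loop below simply polls again.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Tiny busy-polling "executor": returns how many polls were needed.
fn run_to_completion<F: Future<Output = ()> + Unpin>(mut fut: F) -> usize {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if Pin::new(&mut fut).poll(&mut cx).is_ready() {
            return polls;
        }
    }
}

fn interleaving_demo() -> (usize, Vec<String>) {
    let log: Log = Arc::new(Mutex::new(Vec::new()));
    let children: Vec<Option<YieldOnce>> = (1..=3)
        .map(|id| Some(YieldOnce { id, polled: false, log: Arc::clone(&log) }))
        .collect();
    let polls = run_to_completion(SimpleJoinAll { children });
    let entries = log.lock().unwrap().clone();
    (polls, entries)
}

fn main() {
    let (polls, log) = interleaving_demo();
    for line in &log {
        println!("{line}");
    }
    println!("outer future polled {polls} times");
}
```

All three jobs start during the first poll of the outer future and finish during the second one, which mirrors the "started ... started ... finished ... finished" pattern in the output above. Everything runs on a single thread, so there is concurrency but no parallelism.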
Multi-task concurrency
Quote from Alice Ryhl:
Alice Ryhl — 03/10/2024 10:19 AM — Concurrency within a single task is just inferior to spawning several tasks. It performs worse, it breaks some things such as block_in_place, prevents parallelism, doesn't show them in tokio-console, and so on.
So to achieve parallelism, it's better to spawn tasks. Here is an example:
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    println!("Starting jobs...");
    let handles = (1..=5).map(blocking_job);
    // multi-task concurrency: each job becomes its own Tokio task
    let mut set = JoinSet::new();
    for handle in handles {
        set.spawn(handle);
    }
    // join_next yields results in completion order; None means the set is empty
    while let Some(res) = set.join_next().await {
        // panics here if the task itself panicked or was cancelled
        res.unwrap();
        println!("Task finished");
    }
    println!("All jobs finished");
}

async fn blocking_job(i: i32) {
    println!("Job {i} started");
    // This will block the current thread
    std::thread::sleep(std::time::Duration::from_secs(1));
    println!("Job {i} finished");
}
Instead of join_all, we use JoinSet, which spawns each job as its own Tokio task. On the default multi-threaded runtime, those tasks can run in parallel on different worker threads. The output is:
Starting jobs...
Job 1 started
Job 2 started
Job 3 started
Job 4 started
Job 5 started
Job 2 finished
Job 1 finished
Job 5 finished
Job 4 finished
Job 3 finished
Task finished
Task finished
Task finished
Task finished
Task finished
All jobs finished
cargo run 0,76s user 0,10s system 55% cpu 1,544 total
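This achieves parallelism for both blocking and non-blocking jobs. Note that a blocking job still ties up a runtime worker thread for as long as it blocks; Tokio provides spawn_blocking for that kind of work.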