Concurrency with futures in Rust: borrowing parameters

7 minute read 24 April 2024

Following Intra-task vs multi-task concurrency with futures and tokio in rust, I wanted to migrate some code from intra-task to multi-task concurrency. Doing so, I ran into issues with borrowing parameters, which I explore in this post.

Concurrency & borrowing with futures::future::join_all

When using futures::future::join_all, I can run futures concurrently, and they can borrow the same parameters. Here is a simplified example where all hello futures borrow &who:

use futures::future::join_all;

#[tokio::main]
async fn main() {
    call_say_hello().await;
}

async fn call_say_hello() {
    println!("Starting jobs...");

    let world = "world";
    let results = say_hello(world).await;

    let length = results.len();
    let all_results = results.join(", ");
    println!("All jobs finished with {length} results: {all_results}");
}

async fn say_hello(who: &str) -> Vec<String> {
    let handles = (1..=5).map(|i| hello(i, who));

    // intra-task concurrency
    let handle = join_all(handles);
    let results = handle.await;

    results
}

async fn hello(i: i32, who: &str) -> String {
    println!("Job {i} started");
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    let result = format!("Task {i}: Hello {who}");
    println!("Job {i} finished with result {result}");
    result
}

Running this program prints out:

Starting jobs...
Job 1 started
Job 2 started
Job 3 started
Job 4 started
Job 5 started
Job 1 finished with result Task 1: Hello world
Job 2 finished with result Task 2: Hello world
Job 3 finished with result Task 3: Hello world
Job 4 finished with result Task 4: Hello world
Job 5 finished with result Task 5: Hello world
All jobs finished with 5 results: Task 1: Hello world, Task 2: Hello world, Task 3: Hello world, Task 4: Hello world, Task 5: Hello world

As we saw in Intra-task vs multi-task concurrency with futures and tokio in rust, if hello does some blocking IO, for example by using std::thread::sleep(std::time::Duration::from_secs(1)); instead of tokio::time::sleep(std::time::Duration::from_secs(1)).await;, the program runs the jobs sequentially:

Starting jobs...
Job 1 started
Job 1 finished with result Task 1: Hello world
Job 2 started
Job 2 finished with result Task 2: Hello world
Job 3 started
Job 3 finished with result Task 3: Hello world
Job 4 started
Job 4 finished with result Task 4: Hello world
Job 5 started
Job 5 finished with result Task 5: Hello world
All jobs finished with 5 results: Task 1: Hello world, Task 2: Hello world, Task 3: Hello world, Task 4: Hello world, Task 5: Hello world

Now let's try to use multi-task concurrency to achieve parallelism.

Concurrency & borrowing with tokio::spawn

When using tokio::spawn (here through tokio::task::JoinSet), I can run futures concurrently, but they cannot borrow the same parameters. say_hello becomes:

use tokio::task::JoinSet;

async fn say_hello(who: &str) -> Vec<String> {
    let handles = (1..=5).map(|i| hello(i, who));

    let mut set = JoinSet::new();
    for handle in handles {
        set.spawn(handle);
    }

    let mut results = Vec::new();
    while let Some(res) = set.join_next().await {
        results.push(res.unwrap());
    }

    results
}

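This does not compile: a spawned tokio task may outlive the function that created it, so its future must be 'static, which means it has to own its data rather than borrow who: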

error: lifetime may not live long enough
  --> src/main.rs:23:35
   |
22 | async fn say_hello(who: &str) -> Vec<String> {
   |                         - let's call the lifetime of this reference `'1`
23 |     let handles = (1..=5).map(|i| hello(i, who));
   |                                   ^^^^^^^^^^^^^ returning this value requires that `'1` must outlive `'static`
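For comparison, the usual way to satisfy a 'static bound is to stop borrowing and hand each task its own handle to the data. Below is a minimal sketch of that idea, under a few assumptions that are not part of the original code: it uses std::thread::spawn (which has the same 'static bound as tokio::spawn) so that it runs without a runtime, hello is made synchronous, and say_hello_owned is a hypothetical name.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical synchronous stand-in for the post's `hello`.
fn hello(i: i32, who: &str) -> String {
    format!("Task {i}: Hello {who}")
}

// Assumption: sharing `who` through an `Arc<str>` is acceptable.
// Each closure owns its own clone of the Arc, so nothing borrowed
// from the caller has to outlive `'static`.
fn say_hello_owned(who: &str) -> Vec<String> {
    let who: Arc<str> = Arc::from(who);

    let handles: Vec<_> = (1..=5)
        .map(|i| {
            let who = Arc::clone(&who);
            thread::spawn(move || hello(i, &who))
        })
        .collect();

    // Join in spawn order, so the results keep the order of the jobs.
    handles
        .into_iter()
        .map(|handle| handle.join().expect("worker thread panicked"))
        .collect()
}

fn main() {
    let results = say_hello_owned("world");
    println!("{}", results.join(", "));
}
```

The price is one copy of the string into the Arc and a reference-count increment per task, instead of a plain borrow. That is not what I was after here, since I wanted to keep borrowing who.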

To achieve what we want, we may be able to use the actor pattern with tokio. One actor would own the who parameter and send it to the hello actor. I've tried to implement that, but without any success.

use tokio::sync::{mpsc, oneshot};

struct MyActor<'actor> {
    receiver: mpsc::Receiver<ActorMessage>,
    i: i32,
    who: &'actor str,
}
enum ActorMessage {
    SayHello { respond_to: oneshot::Sender<String> },
}

impl<'actor> MyActor<'actor> {
    fn new(receiver: mpsc::Receiver<ActorMessage>, i: i32, who: &'actor str) -> Self {
        MyActor { receiver, i, who }
    }
    async fn handle_message(&mut self, msg: ActorMessage) {
        match msg {
            ActorMessage::SayHello { respond_to } => {
                let i = self.i;
                let who = self.who;
                println!("Job {i} started");
                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                // std::thread::sleep(std::time::Duration::from_secs(1));
                let result = format!("Task {i}: Hello {who}");
                println!("Job {i} finished with result {result}");
                let _ = respond_to.send(result);
            }
        }
    }
}

async fn run_my_actor<'a>(mut actor: MyActor<'a>) {
    while let Some(msg) = actor.receiver.recv().await {
        actor.handle_message(msg).await;
    }
}

#[derive(Clone)]
pub struct MyActorHandle<'handle> {
    sender: mpsc::Sender<ActorMessage>,
    lifetime: std::marker::PhantomData<&'handle ()>,
}

impl<'handle> MyActorHandle<'handle> {
    pub fn new<'actor: 'handle>(i: i32, who: &'actor str) -> Self {
        let (sender, receiver) = mpsc::channel(8);
        let actor = MyActor::new(receiver, i, who);
        tokio::spawn(run_my_actor(actor));

        let lifetime = std::marker::PhantomData::<&'handle ()>;

        Self { sender, lifetime }
    }

    pub async fn hello(&self) -> String {
        let (send, recv) = oneshot::channel();
        let msg = ActorMessage::SayHello { respond_to: send };

        let _ = self.sender.send(msg).await;
        recv.await.expect("Actor task has been killed")
    }
}

I could not find a way to tell the Rust compiler that the reference to who outlives the actor. Since tokio::spawn requires a 'static future, no lifetime annotation shorter than 'static can express this.

Thinking about it again, I believe actors are better suited to cases where an actor owns a value, such as a resource. I'm not sure they fit what I'm trying to achieve.

And I got confirmation from this video about tokio actors: https://youtu.be/fTXuGRP1ee4?t=754
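To make the "an actor owns its value" idea concrete, here is a minimal sketch in which the actor owns who as a String. It is only an illustration under stated assumptions: it uses a plain thread and std::sync::mpsc instead of tokio channels so that it runs on its own, and spawn_hello_actor and ask_hello are hypothetical names.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical message type, loosely mirroring the `ActorMessage` above.
enum ActorMessage {
    SayHello {
        i: i32,
        respond_to: mpsc::Sender<String>,
    },
}

// The actor owns `who`, so the spawned thread holds no borrowed data.
fn spawn_hello_actor(who: String) -> mpsc::Sender<ActorMessage> {
    let (sender, receiver) = mpsc::channel();
    thread::spawn(move || {
        // The loop ends once every sender has been dropped.
        for msg in receiver {
            match msg {
                ActorMessage::SayHello { i, respond_to } => {
                    // Ignore the error if the caller stopped waiting.
                    let _ = respond_to.send(format!("Task {i}: Hello {who}"));
                }
            }
        }
    });
    sender
}

// Sends one request and blocks until the actor replies.
fn ask_hello(actor: &mpsc::Sender<ActorMessage>, i: i32) -> String {
    let (respond_to, response) = mpsc::channel();
    actor
        .send(ActorMessage::SayHello { i, respond_to })
        .expect("actor has stopped");
    response.recv().expect("actor dropped the reply channel")
}

fn main() {
    let actor = spawn_hello_actor("world".to_string());
    for i in 1..=3 {
        println!("{}", ask_hello(&actor, i));
    }
}
```

Note that a single actor handles its messages one at a time, so this shape gives ownership but not parallelism between the jobs.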

Concurrency & borrowing with async_scoped

Another solution is to use the async_scoped crate. This crate allows us to spawn async tasks with a lifetime that is bound to the current scope. This way, we can borrow the who parameter:

async fn say_hello(who: &str) -> Vec<String> {
    let handles = (1..=5).map(|i| hello(i, who));

    let (_, results) = unsafe {
        async_scoped::TokioScope::scope_and_collect(|s| {
            for handle in handles {
                s.spawn(handle);
            }
        })
        .await
    };

    let results = results.into_iter().map(|res| res.unwrap()).collect();

    results
}

This achieves parallelism: the jobs run in parallel even when hello blocks its thread with std::thread::sleep.

But scope_and_collect is unsafe. From the documentation:

Safety

This function is not completely safe: please see cancellation_soundness in tests.rs for a test-case that suggests how this can lead to invalid memory access if not dealt with care.

The caller must ensure that the lifetime 'a is valid until the returned future is fully driven. Dropping the future is okay, but blocks the current thread until all spawned futures complete.

I've reproduced the unsoundness with tokio in the following code:

use std::time::Duration;

async fn reproduce_unsafe_behavior() {
    async fn inner() {
        let world = "world";

        let mut future = Box::pin(unsafe {
            async_scoped::TokioScope::scope_and_collect(|s| {
                s.spawn(async {
                    tokio::time::sleep(Duration::from_millis(500)).await;

                    eprintln!("Trying to read to shared ref");
                    eprintln!("hello {world}");
                });
            })
        });

        let _ = tokio::time::timeout(Duration::from_millis(10), &mut future).await;
        println!("after timeout of 10ms in inner. Time to forget the future.");

        // destructor is not called so that the spawn task can continue to run
        std::mem::forget(future);
    }

    inner().await;

    // waiting 600ms to ensure that the spawned future tried to access the 'world' variable
    tokio::time::sleep(Duration::from_millis(600)).await;
}

Running this program prints out:

after timeout of 10ms in inner. Time to forget the future.
Trying to read to shared ref
hello thread 'tokio-runtime-worker' panicked at library/std/src/io/stdio.rs:1030:9:
failed printing to stderr: Bad address (os error 14)
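The reproduction hinges on one property: std::mem::forget can be called from safe code, and it skips the destructor. Here is a small sketch of just that property, with a hypothetical ScopeGuard standing in for a scope future whose destructor is supposed to wait for the spawned tasks.

```rust
use std::cell::Cell;
use std::mem;

// Hypothetical guard: its destructor records that it ran.
struct ScopeGuard<'a> {
    dropped: &'a Cell<bool>,
}

impl Drop for ScopeGuard<'_> {
    fn drop(&mut self) {
        self.dropped.set(true);
    }
}

// Returns whether the guard's destructor ran.
fn destructor_ran(forget: bool) -> bool {
    let dropped = Cell::new(false);
    let guard = ScopeGuard { dropped: &dropped };
    if forget {
        // Safe code, yet `drop` is never called for `guard`.
        mem::forget(guard);
    } else {
        drop(guard);
    }
    dropped.get()
}

fn main() {
    println!("dropped normally: {}", destructor_ran(false)); // true
    println!("after mem::forget: {}", destructor_ran(true)); // false
}
```

Any guarantee that relies only on a destructor running can therefore be bypassed without writing unsafe.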

As far as I understand the issue, scope_and_collect is safe to use unless someone uses std::mem::forget on the returned future.
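For OS threads, the standard library sidesteps this problem with std::thread::scope: the scope is a closure that only returns once every spawned thread has been joined, so there is no value the caller could forget halfway through. Here is a minimal sketch of say_hello in that style. It is synchronous, uses hypothetical names, and is meant as a comparison rather than a replacement for async tasks.

```rust
use std::thread;

// Hypothetical synchronous stand-in for the post's `hello`.
fn hello(i: i32, who: &str) -> String {
    format!("Task {i}: Hello {who}")
}

// Every scoped thread borrows `who` directly: no Arc, no clone, no unsafe.
fn say_hello_scoped(who: &str) -> Vec<String> {
    thread::scope(|s| {
        let handles: Vec<_> = (1..=5)
            .map(|i| s.spawn(move || hello(i, who)))
            .collect();

        // Join in spawn order, so the results keep the order of the jobs.
        handles
            .into_iter()
            .map(|handle| handle.join().expect("scoped thread panicked"))
            .collect::<Vec<String>>()
    })
}

fn main() {
    let results = say_hello_scoped("world");
    println!("{}", results.join(", "));
}
```

The borrow is sound here because thread::scope blocks the calling thread until the work is done, which is exactly what an async scope cannot do without stalling the executor.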

Concurrency & borrowing with futures_concurrency

My attention was grabbed by the v7.6.0 release of futures_concurrency, which mentions "Portable Concurrent Async Iteration". This crate allows us to run futures concurrently while borrowing the same parameters. Here is how we can use it:

async fn say_hello(who: &str) -> Vec<String> {
    let input: Vec<_> = (1..=5).collect();
    let results = input
        .into_co_stream()
        .map(|i| hello(i, who))
        .collect()
        .await;

    results
}

But when running this program, it does not make any progress after the first task has finished. I've opened an issue about it; let's see how this evolves.