Concurrency with Async
Async provides different APIs and performance characteristics compared to threads, but many patterns are similar. Key differences:
- Tasks are lightweight compared to OS threads
- Fair vs unfair scheduling
- Cooperative vs preemptive multitasking
Spawning Tasks
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { trpl::spawn_task(async { for i in 1..10 { println!("hi number {i} from the first task!"); trpl::sleep(Duration::from_millis(500)).await; } }); for i in 1..5 { println!("hi number {i} from the second task!"); trpl::sleep(Duration::from_millis(500)).await; } }); }
Like thread::spawn
, trpl::spawn_task
runs code concurrently. Unlike threads:
- Tasks run on the same thread (unless using a multi-threaded runtime)
- Much lower overhead - millions of tasks are feasible
- Terminated when the runtime shuts down
Joining Tasks
Tasks return join handles that implement Future
:
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let handle = trpl::spawn_task(async { for i in 1..10 { println!("hi number {i} from the first task!"); trpl::sleep(Duration::from_millis(500)).await; } }); for i in 1..5 { println!("hi number {i} from the second task!"); trpl::sleep(Duration::from_millis(500)).await; } handle.await.unwrap(); }); }
Joining Multiple Futures
trpl::join
combines futures and waits for all to complete:
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let fut1 = async { for i in 1..10 { println!("hi number {i} from the first task!"); trpl::sleep(Duration::from_millis(500)).await; } }; let fut2 = async { for i in 1..5 { println!("hi number {i} from the second task!"); trpl::sleep(Duration::from_millis(500)).await; } }; trpl::join(fut1, fut2).await; }); }
Key points about join
:
- Fair scheduling: alternates between futures equally
- Deterministic ordering: consistent execution pattern
- Structured concurrency: all futures complete before continuing
Compare this to threads where the OS scheduler determines execution order non-deterministically.
Message Passing
Async channels work similarly to sync channels but with async methods:
extern crate trpl; // required for mdbook test fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let val = String::from("hi"); tx.send(val).unwrap(); let received = rx.recv().await.unwrap(); println!("Got: {received}"); }); }
The recv
method returns a Future
that resolves when a message arrives.
Multiple Messages with Delays
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::run(async {
let (tx, mut rx) = trpl::channel();
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
});
}
This runs serially - all sends complete before receives start. For concurrency, separate into different futures:
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::run(async {
let (tx, mut rx) = trpl::channel();
let tx_fut = async {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
};
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
trpl::join(tx_fut, rx_fut).await;
});
}
Channel Cleanup
Channels need proper cleanup to avoid infinite loops. Move the sender into an async block so it gets dropped when sending completes:
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx_fut = async move { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_millis(500)).await; } }; let rx_fut = async { while let Some(value) = rx.recv().await { println!("received '{value}'"); } }; trpl::join(tx_fut, rx_fut).await; }); }
The move
keyword transfers ownership into the async block. When the block completes, tx
is dropped, closing the channel.
Multiple Producers
Clone the sender for multiple producers:
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx1 = tx.clone(); let tx1_fut = async move { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx1.send(val).unwrap(); trpl::sleep(Duration::from_millis(500)).await; } }; let rx_fut = async { while let Some(value) = rx.recv().await { println!("received '{value}'"); } }; let tx_fut = async move { let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_millis(1500)).await; } }; trpl::join3(tx1_fut, tx_fut, rx_fut).await; }); }
Use trpl::join3
for three futures, or the join!
macro for variable numbers.
Patterns Summary
Task Spawning: spawn_task
for fire-and-forget concurrency
Joining: join
/join!
for structured concurrency waiting on all futures
Channels: Async message passing with trpl::channel
Cleanup: Use async move
to ensure proper resource cleanup