Working with Any Number of Futures
Variable-Arity Joins
The join! macro accepts any number of futures, as long as that number is known at compile time:
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx1 = tx.clone(); let tx1_fut = async move { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx1.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }; let rx_fut = async { while let Some(value) = rx.recv().await { println!("received '{value}'"); } }; let tx_fut = async move { let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }; trpl::join!(tx1_fut, tx_fut, rx_fut); }); }
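For collections of futures whose size is only known at runtime, use join_all. A first attempt that puts the futures directly into a Vec does not compile: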
extern crate trpl; // required for mdbook test
use std::time::Duration;
// First attempt at `join_all`. This listing is expected NOT to compile; the
// offending line is marked below.
fn main() {
trpl::run(async {
let (tx, mut rx) = trpl::channel();
// Second sender handle, so each sending block can own one.
let tx1 = tx.clone();
// `async move` transfers ownership of `tx1` into the block.
let tx1_fut = async move {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx1.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
// Prints every message until `recv()` stops yielding `Some`.
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
// Same shape as `tx1_fut`, but owns the original sender `tx`.
let tx_fut = async move {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
// Compile error: every async block has its own anonymous type, so these
// three futures cannot be elements of a single `Vec`.
let futures = vec![tx1_fut, rx_fut, tx_fut];
trpl::join_all(futures).await;
});
}
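This fails to compile: each async block has its own unique anonymous type, so even textually identical blocks have different types and cannot be stored together in a Vec, whose elements must all share one type.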
Trait Objects for Dynamic Collections
Use trait objects to unify future types:
extern crate trpl; // required for mdbook test
use std::time::Duration;
// Second attempt: box each future. Without a type annotation on the `Vec`
// this is still an intermediate, non-compiling step (see the marked line).
fn main() {
trpl::run(async {
let (tx, mut rx) = trpl::channel();
// Second sender handle, so each sending block can own one.
let tx1 = tx.clone();
// `async move` transfers ownership of `tx1` into the block.
let tx1_fut = async move {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx1.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
// Prints every message until `recv()` stops yielding `Some`.
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
// Same shape as `tx1_fut`, but owns the original sender `tx`.
let tx_fut = async move {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
// Boxing alone does not unify the types: each `Box::new(..)` is a box of a
// different anonymous future type, and nothing here asks for a trait
// object, so the element types still do not match.
let futures =
vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];
trpl::join_all(futures).await;
});
}
extern crate trpl; // required for mdbook test
use std::time::Duration;
// Third attempt: annotate the `Vec` with a trait-object element type. The
// element types now agree, but `join_all` is still not satisfied (see the
// marked line and the pin requirements described after this listing).
fn main() {
trpl::run(async {
let (tx, mut rx) = trpl::channel();
// Second sender handle, so each sending block can own one.
let tx1 = tx.clone();
// `async move` transfers ownership of `tx1` into the block.
let tx1_fut = async move {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx1.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
// Prints every message until `recv()` stops yielding `Some`.
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
// Same shape as `tx1_fut`, but owns the original sender `tx`.
let tx_fut = async move {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
// The annotation makes every element a `Box<dyn Future<Output = ()>>`, so
// the three boxes share one type. These boxed futures are not `Unpin`,
// which `join_all` requires, so this listing is still rejected.
let futures: Vec<Box<dyn Future<Output = ()>>> =
vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];
trpl::join_all(futures).await;
});
}
Pin Requirements: the listing above still does not compile, because join_all requires the futures it is given to implement Unpin, and most async blocks do not implement Unpin automatically.
Pinning Futures
Use Pin<Box<T>>
to satisfy the Unpin
requirement:
extern crate trpl; // required for mdbook test use std::pin::Pin; // -- snip -- use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx1 = tx.clone(); let tx1_fut = async move { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx1.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }; let rx_fut = async { while let Some(value) = rx.recv().await { println!("received '{value}'"); } }; let tx_fut = async move { let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }; let futures: Vec<Pin<Box<dyn Future<Output = ()>>>> = vec![Box::pin(tx1_fut), Box::pin(rx_fut), Box::pin(tx_fut)]; trpl::join_all(futures).await; }); }
Performance optimization: Use pin!
macro to avoid heap allocation:
extern crate trpl; // required for mdbook test use std::pin::{Pin, pin}; // -- snip -- use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx1 = tx.clone(); let tx1_fut = pin!(async move { // --snip-- let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx1.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }); let rx_fut = pin!(async { // --snip-- while let Some(value) = rx.recv().await { println!("received '{value}'"); } }); let tx_fut = pin!(async move { // --snip-- let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }); let futures: Vec<Pin<&mut dyn Future<Output = ()>>> = vec![tx1_fut, rx_fut, tx_fut]; trpl::join_all(futures).await; }); }
Tradeoffs
- Same output type + dynamic count: Use join_all with trait objects and pinning
- Different output types + known count: Use the join! macro or the specific join functions
- Performance: Stack pinning (pin!) vs heap allocation (Box::pin)
extern crate trpl; // required for mdbook test

/// Joins three futures whose outputs have three different types.
fn main() {
    trpl::run(async {
        let a = async { 1u32 };
        let b = async { "Hello!" };
        let c = async { true };

        // `join!` returns a tuple, so each result keeps its own type.
        let (a_result, b_result, c_result) = trpl::join!(a, b, c);
        println!("{a_result}, {b_result}, {c_result}");
    });
}
The join!
macro handles heterogeneous types but requires compile-time knowledge of count and types.
Racing Futures
race resolves with the output of whichever future completes first; the other future is dropped without being run to completion:
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let slow = async { println!("'slow' started."); trpl::sleep(Duration::from_millis(100)).await; println!("'slow' finished."); }; let fast = async { println!("'fast' started."); trpl::sleep(Duration::from_millis(50)).await; println!("'fast' finished."); }; trpl::race(slow, fast).await; }); }
Key point: Futures only yield at await points. CPU-intensive work without await points will block other futures (cooperative multitasking).
Yielding Control
Force yield points with yield_now()
:
extern crate trpl; // required for mdbook test use std::{thread, time::Duration}; fn main() { trpl::run(async { // We will call `slow` here later }); } fn slow(name: &str, ms: u64) { thread::sleep(Duration::from_millis(ms)); println!("'{name}' ran for {ms}ms"); }
extern crate trpl; // required for mdbook test use std::{thread, time::Duration}; fn main() { trpl::run(async { let a = async { println!("'a' started."); slow("a", 30); slow("a", 10); slow("a", 20); trpl::sleep(Duration::from_millis(50)).await; println!("'a' finished."); }; let b = async { println!("'b' started."); slow("b", 75); slow("b", 10); slow("b", 15); slow("b", 350); trpl::sleep(Duration::from_millis(50)).await; println!("'b' finished."); }; trpl::race(a, b).await; }); } fn slow(name: &str, ms: u64) { thread::sleep(Duration::from_millis(ms)); println!("'{name}' ran for {ms}ms"); }
extern crate trpl; // required for mdbook test use std::{thread, time::Duration}; fn main() { trpl::run(async { let one_ms = Duration::from_millis(1); let a = async { println!("'a' started."); slow("a", 30); trpl::sleep(one_ms).await; slow("a", 10); trpl::sleep(one_ms).await; slow("a", 20); trpl::sleep(one_ms).await; println!("'a' finished."); }; let b = async { println!("'b' started."); slow("b", 75); trpl::sleep(one_ms).await; slow("b", 10); trpl::sleep(one_ms).await; slow("b", 15); trpl::sleep(one_ms).await; slow("b", 350); trpl::sleep(one_ms).await; println!("'b' finished."); }; trpl::race(a, b).await; }); } fn slow(name: &str, ms: u64) { thread::sleep(Duration::from_millis(ms)); println!("'{name}' ran for {ms}ms"); }
extern crate trpl; // required for mdbook test use std::{thread, time::Duration}; fn main() { trpl::run(async { let a = async { println!("'a' started."); slow("a", 30); trpl::yield_now().await; slow("a", 10); trpl::yield_now().await; slow("a", 20); trpl::yield_now().await; println!("'a' finished."); }; let b = async { println!("'b' started."); slow("b", 75); trpl::yield_now().await; slow("b", 10); trpl::yield_now().await; slow("b", 15); trpl::yield_now().await; slow("b", 350); trpl::yield_now().await; println!("'b' finished."); }; trpl::race(a, b).await; }); } fn slow(name: &str, ms: u64) { thread::sleep(Duration::from_millis(ms)); println!("'{name}' ran for {ms}ms"); }
Performance comparison: yield_now()
is significantly faster than sleep()
for cooperation points.
extern crate trpl; // required for mdbook test use std::time::{Duration, Instant}; fn main() { trpl::run(async { let one_ns = Duration::from_nanos(1); let start = Instant::now(); async { for _ in 1..1000 { trpl::sleep(one_ns).await; } } .await; let time = Instant::now() - start; println!( "'sleep' version finished after {} seconds.", time.as_secs_f32() ); let start = Instant::now(); async { for _ in 1..1000 { trpl::yield_now().await; } } .await; let time = Instant::now() - start; println!( "'yield' version finished after {} seconds.", time.as_secs_f32() ); }); }
Building Async Abstractions
Compose futures to create higher-level abstractions:
extern crate trpl; // required for mdbook test

use std::time::Duration;

/// Shows the intended call site of `timeout`: a 100 ms task is given a 10 ms
/// limit and the outcome is printed.
///
/// `timeout` itself is not defined in this listing; its signature and body
/// are introduced by the listings that follow.
fn main() {
    trpl::run(async {
        let slow = async {
            trpl::sleep(Duration::from_millis(100)).await;
            "I finished!"
        };

        match timeout(slow, Duration::from_millis(10)).await {
            Ok(message) => println!("Succeeded with '{message}'"),
            Err(duration) => {
                // `as_secs()` truncates to whole seconds and would report the
                // 10 ms limit as "0 seconds"; print fractional seconds.
                println!("Failed after {} seconds", duration.as_secs_f32())
            }
        }
    });
}
extern crate trpl; // required for mdbook test

use std::time::Duration;

/// Gives a 5 s task a 10 ms limit via `timeout` and prints the outcome.
///
/// `timeout` is declared directly after this function.
fn main() {
    trpl::run(async {
        let slow = async {
            trpl::sleep(Duration::from_secs(5)).await;
            "Finally finished"
        };

        match timeout(slow, Duration::from_millis(10)).await {
            Ok(message) => println!("Succeeded with '{message}'"),
            Err(duration) => {
                // `as_secs()` truncates to whole seconds and would report the
                // 10 ms limit as "0 seconds"; print fractional seconds.
                println!("Failed after {} seconds", duration.as_secs_f32())
            }
        }
    });
}
/// Runs `future_to_try` under a time limit of `max_time`.
///
/// Per the signature, `Ok` carries the future's output and `Err` carries a
/// `Duration`.
///
/// # Panics
///
/// The body is still a placeholder, so awaiting the returned future panics.
async fn timeout<F: Future>(
    future_to_try: F,
    max_time: Duration,
) -> Result<F::Output, Duration> {
    // Here is where our implementation will go!
    // An empty body does not type-check against the `Result` return type;
    // `todo!()` lets the signature compile until the real body is written.
    todo!("race `future_to_try` against a `max_time` timer")
}
extern crate trpl; // required for mdbook test use std::time::Duration; use trpl::Either; // --snip-- fn main() { trpl::run(async { let slow = async { trpl::sleep(Duration::from_secs(5)).await; "Finally finished" }; match timeout(slow, Duration::from_secs(2)).await { Ok(message) => println!("Succeeded with '{message}'"), Err(duration) => { println!("Failed after {} seconds", duration.as_secs()) } } }); } async fn timeout<F: Future>( future_to_try: F, max_time: Duration, ) -> Result<F::Output, Duration> { match trpl::race(future_to_try, trpl::sleep(max_time)).await { Either::Left(output) => Ok(output), Either::Right(_) => Err(max_time), } }
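Pattern: Use race to implement timeout logic. race is not fair: it polls its arguments in the order they are passed, so putting future_to_try first gives it a chance to complete even when max_time is very short.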
Key Patterns
- Dynamic collections: Trait objects + pinning for runtime-determined future sets
- Static collections: join! macro for compile-time known futures
- Cooperation: Insert yield_now() in CPU-intensive loops
- Composition: Build complex async behavior from simpler primitives
- Performance: Choose between stack (pin!) and heap (Box::pin) allocation