Channel Pipeline (Rust)
Topics: channels, threads, cancellation
Problem
Feed the items of src through a sequence of Pipe stages. Each stage reads from an input channel,
transforms the items, and writes them to a new output channel. Stages run concurrently: stage N+1
starts consuming as soon as stage N starts producing.
pub struct Cancel(/* … */); // cooperative cancellation, stands in for Go's context
pub type Pipe<T> = Box<dyn Fn(Cancel, Receiver<T>) -> Receiver<T>>;
pub fn run<T: Send + 'static>(cancel: &Cancel, src: Vec<T>, pipes: Vec<Pipe<T>>) -> Vec<T>;
- Turn src into a source channel (a thread sends each item, then drops the sender).
- Chain the pipes: the output channel of stage N becomes the input of stage N+1.
- Collect and return the final channel's values in the original order.
- Pass cancel into each stage so the pipeline can stop early; a cancelled token must make run
return promptly instead of hanging.
- An empty src returns an empty vec; with no pipes, the items are returned unchanged.
src=[1,2,3], pipes=[] → [1,2,3]
src=[1,2,3], pipes=[double] → [2,4,6]
src=[1,2,3], pipes=[double, add_one] → [3,5,7]
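The double and add_one stages in the examples are not defined in the statement. One way they might be written is sketched below. The fields of Cancel are elided in the statement, so the Arc<AtomicBool> body, the new, cancel and is_cancelled methods, and the map_stage helper are all assumptions made for illustration, not the exercise's actual definitions.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver};
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for the exercise's Cancel: a shared flag.
#[derive(Clone, Default)]
pub struct Cancel(Arc<AtomicBool>);

impl Cancel {
    pub fn new() -> Self {
        Cancel::default()
    }
    pub fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }
    pub fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

pub type Pipe<T> = Box<dyn Fn(Cancel, Receiver<T>) -> Receiver<T>>;

// Hypothetical helper: wraps a per-item function as a pipeline stage.
pub fn map_stage<T, F>(f: F) -> Pipe<T>
where
    T: Send + 'static,
    F: Fn(T) -> T + Clone + Send + 'static,
{
    Box::new(move |cancel: Cancel, input: Receiver<T>| {
        let (tx, rx) = channel();
        let f = f.clone(); // each call of the stage gets its own copy of f
        thread::spawn(move || {
            // The loop ends when the upstream Sender is dropped.
            for item in input {
                // Stop early on cancellation or if nobody is listening downstream.
                if cancel.is_cancelled() || tx.send(f(item)).is_err() {
                    break;
                }
            }
            // tx is dropped here, which closes this stage's output channel.
        });
        rx
    })
}

pub fn double() -> Pipe<i32> {
    map_stage(|x| x * 2)
}

pub fn add_one() -> Pipe<i32> {
    map_stage(|x| x + 1)
}
```

The stage returns its Receiver immediately and does the work on its own thread, which is what lets the next stage start consuming before this one has finished.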
Key concepts
- Source thread: spawn a thread that sends each item; dropping its Sender closes the channel.
- Channel chaining: rx = pipe(cancel.clone(), rx) in a loop threads all stages together.
- Close discipline: a stage's thread ends when its input Receiver is exhausted and then drops its
own Sender, which cascades shutdown from the source down to the collector.
- Cancellation: the source checks the token before each send, so cancelling drains the pipeline
and run returns.
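Put together, the four concepts above could look roughly like the following. This is a minimal sketch and not a reference solution: the body of Cancel is elided in the statement, so the Arc<AtomicBool> flag and its methods are assumed here, and an unbounded std::sync::mpsc channel is assumed as well.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver};
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for the exercise's Cancel: a shared flag.
#[derive(Clone, Default)]
pub struct Cancel(Arc<AtomicBool>);

impl Cancel {
    pub fn new() -> Self {
        Cancel::default()
    }
    pub fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }
    pub fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

pub type Pipe<T> = Box<dyn Fn(Cancel, Receiver<T>) -> Receiver<T>>;

pub fn run<T: Send + 'static>(cancel: &Cancel, src: Vec<T>, pipes: Vec<Pipe<T>>) -> Vec<T> {
    // Source thread: sends each item, then drops tx, which closes the channel.
    let (tx, mut rx) = channel();
    let token = cancel.clone();
    thread::spawn(move || {
        for item in src {
            // Check the token before each send; also stop if the receiver is gone.
            if token.is_cancelled() || tx.send(item).is_err() {
                break;
            }
        }
    });

    // Channel chaining: the output of stage N becomes the input of stage N+1.
    for pipe in &pipes {
        rx = pipe(cancel.clone(), rx);
    }

    // Collector: the iterator ends once the last stage drops its Sender.
    rx.into_iter().collect()
}
```

Order is preserved here only because each stage is assumed to process its input on a single thread; a stage that fans out to several workers would need to restore the order itself.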
Grading
Your solution.rs is compiled together with a trusted tests.rs (which include!s it) using
rustc --test. Only solution.rs is yours to edit.