Channel Pipeline
Source: Custom
Topics: Channels, Goroutines, Generics, Context
Problem
Implement Run that feeds items from src through a sequence of Pipe stages.
Each Pipe reads from an input channel, transforms items, and writes to a new output channel.
Stages execute concurrently — stage N+1 begins consuming as soon as stage N starts producing.
Requirements:
- Convert src into a source channel (a goroutine sends each item then closes).
- Chain the pipes: the output channel of stage N becomes the input channel of stage N+1.
- Collect and return results from the final channel in the original order.
- Pass ctx into each Pipe so stages can stop early on cancellation.
- Return a non-nil empty slice when src is empty or no pipes are provided.
Types:
type Pipe[T any] func(ctx context.Context, in <-chan T) <-chan T
func Run[T any](ctx context.Context, src []T, pipes ...Pipe[T]) []T
Key concepts
- Source channel: launch a goroutine that sends each item then closes the channel.
- Channel chaining: ch = pipe(ctx, ch) in a loop threads all stages together.
- Close discipline: each Pipe must close its output channel when in is drained or ctx is done, so the next stage terminates.
- Concurrent stages: while stage 1 processes item 3, stage 2 is already processing item 2 — true pipeline parallelism.
Run
go test -v -bench=. -benchmem ./challenges/concurrency/pipeline/