Tutorial: Orchestration
Orchestration coordinates multiple async operations and agents.
What Is Orchestration?
Orchestration handles:
- Running multiple operations in parallel
- Racing operations for the first result
- Voting across multiple sources
- Timeout handling
- Selecting from multiple futures
Parallel Execution
Run operations concurrently:
```lumen
cell fetch_all(urls: list[String]) -> list[String] / {http}
    await parallel for url in urls
        fetch(url)
    end
end
```

Parallel with Collection
```lumen
cell analyze_sources(sources: list[String]) -> list[Analysis] / {llm}
    await parallel for source in sources
        analyze(source)
    end
end
```

Parallel Block
```lumen
cell multi_task() -> tuple[String, Int, Bool] / {http, db}
    await parallel
        result1 = fetch_from_api()
        result2 = query_database()
        result3 = check_feature_flag()
    end
    return (result1, result2, result3)
end
```

Race
Return the first completed result:
```lumen
cell fast_fetch(url1: String, url2: String) -> String / {http}
    await race
        fetch(url1)
        fetch(url2)
    end
end
```

Race with Timeout
```lumen
cell fetch_with_timeout(url: String, timeout_ms: Int) -> String | Null / {http}
    await race
        fetch(url)
        sleep(timeout_ms) |> fn(_) => null
    end
end
```

Vote
Get consensus from multiple sources:
```lumen
cell get_answer(question: String) -> String / {llm}
    await vote
        model_a(question)
        model_b(question)
        model_c(question)
    end
end
```

Voting returns the most common result.
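As a concrete illustration, suppose three classifier cells disagree; the majority value wins. The cell names and outputs below are hypothetical, purely for illustration:

```lumen
cell label_ticket(text: String) -> String / {llm}
    # Suppose: classify_a -> "billing", classify_b -> "billing",
    # classify_c -> "refund". The vote resolves to "billing",
    # the value produced by the majority of branches.
    await vote
        classify_a(text)
        classify_b(text)
        classify_c(text)
    end
end
```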
Select
Wait for the first available result from multiple channels:
```lumen
cell handle_events() -> String
    let event = await select
        channel_a.receive()
        channel_b.receive()
        sleep(5000)
    end
    match event
        e: Event1 -> handle1(e)
        e: Event2 -> handle2(e)
        _ -> "timeout"
    end
end
```

Timeout
Add timeout constraints:
```lumen
cell bounded_fetch(url: String) -> result[String, String] / {http}
    let result = await timeout(5000, fetch(url))
    match result
        data: String -> return ok(data)
        null -> return err("Timeout")
    end
end
```

Orchestration Process
Combine orchestration patterns:
```lumen
orchestration ResearchTeam
    use tool llm.chat as Chat
    use tool http.get as Fetch
    grant Chat model "gpt-4o"

    cell research(topic: String) -> Report / {llm, http}
        # Gather sources in parallel
        let sources = await parallel for engine in ["google", "bing", "duckduckgo"]
            search(engine, topic)
        end

        # Race for quick summary
        let quick = await race
            summarize_fast(sources)
            summarize_thorough(sources)
        end

        # Vote on best insights
        let insights = await vote
            extract_insights_a(sources)
            extract_insights_b(sources)
            extract_insights_c(sources)
        end

        return combine(quick, insights)
    end

    cell search(engine: String, topic: String) -> SearchResult / {http}
        # Search implementation
    end

    cell summarize_fast(sources: list[SearchResult]) -> String / {llm}
        # Fast summarization
    end

    cell summarize_thorough(sources: list[SearchResult]) -> String / {llm}
        # Thorough summarization
    end
end
```

Spawning Futures
Create background tasks:
```lumen
cell background_processing() -> Null
    let future1 = spawn(long_task_a())
    let future2 = spawn(long_task_b())

    # Do other work...

    let result1 = await future1
    let result2 = await future2
    return null
end
```

Spawn List
```lumen
cell process_all(items: list[Item]) -> list[Result]
    let futures = spawn([
        process(items[0]),
        process(items[1]),
        process(items[2])
    ])
    return await futures
end
```

Future States
Futures have explicit states:
- Pending — Not yet complete
- Completed(value) — Finished with result
- Error(message) — Failed
```lumen
cell check_future(f: Future) -> String
    match f.state
        Pending -> "Still running"
        Completed(v) -> "Got: {v}"
        Error(msg) -> "Failed: {msg}"
    end
end
```

Deterministic Scheduling
With @deterministic true:
```lumen
@deterministic true
cell ordered_execution() -> list[Int]
    # Futures execute in order, not concurrently
    let f1 = spawn(task_a())
    let f2 = spawn(task_b())
    return [await f1, await f2]  # Always [a_result, b_result]
end
```

Example: Multi-Model Analysis
```lumen
agent ModelA
    use tool llm.chat as Chat
    grant Chat model "gpt-4o"

    cell analyze(text: String) -> Analysis / {llm}
        # GPT-4o analysis
    end
end

agent ModelB
    use tool llm.chat as Chat
    grant Chat model "claude-3-opus"

    cell analyze(text: String) -> Analysis / {llm}
        # Claude analysis
    end
end

orchestration EnsembleAnalyzer
    cell analyze(text: String) -> ConsensusAnalysis / {llm}
        let a = ModelA()
        let b = ModelB()

        # Get analyses in parallel
        let analyses = await parallel
            r1 = a.analyze(text)
            r2 = b.analyze(text)
        end

        # Vote on sentiment
        let sentiment = await vote
            r1.sentiment
            r2.sentiment
        end

        return ConsensusAnalysis(
            sentiment: sentiment,
            details: combine(r1, r2)
        )
    end
end
```

Best Practices
- Use parallel for independent work — Don't serialize operations unnecessarily
- Add timeouts — Prevent hanging on slow responses
- Handle failures — A race fails if every branch fails; see the sketch after this list
- Use vote for consensus — Better than single-source decisions
- Test determinism — Ensure reproducible results when needed
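A defensive pattern combines race with the timeout idiom shown earlier, so the cell degrades to an error instead of hanging when every branch fails. This is a sketch; the cell name, endpoints, and deadline below are illustrative assumptions:

```lumen
cell resilient_fetch(primary: String, backup: String) -> result[String, String] / {http}
    # Race a primary source, a backup source, and a deadline
    let data = await race
        fetch(primary)
        fetch(backup)
        sleep(3000) |> fn(_) => null
    end
    match data
        s: String -> return ok(s)
        null -> return err("No source responded within 3s")
    end
end
```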
Next Steps
- Advanced Effects — Effect system deep dive
- Async & Futures — Async programming