§Stream Task Management Architecture
This module contains the core stream task management system that handles streaming computation actors and barrier coordination in RisingWave.
§Architecture Overview
The stream task management system consists of the following layered components:
§External Interface Layer
- Meta Service: Central coordination service
- LocalStreamManager: Public API handler for StreamService/ExchangeService
§Core Control Layer
- LocalBarrierWorker: Central event coordinator and barrier processor
  - Owns ControlStreamHandle: Bidirectional communication with Meta Service
  - Manages ManagedBarrierState: Multi-database barrier state coordinator
    - DatabaseManagedBarrierState: Per-database barrier state manager
      - Uses StreamActorManager: Actor factory and lifecycle manager
      - Manages PartialGraphManagedBarrierState: Per-partial-graph barrier coordination
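The ownership chain above can be pictured as nested containers. The following is a minimal Rust sketch under stated assumptions. Every type is a hypothetical, heavily reduced stand-in: the real control stream channel is replaced by a Vec outbox, ids are modeled as u32, and all field and method names are illustrative rather than the crate's API.

```rust
use std::collections::HashMap;

// Hypothetical identifier types; modeling them as `u32` is an assumption.
type DatabaseId = u32;
type PartialGraphId = u32;
type ActorId = u32;

/// Stand-in for `ControlStreamHandle`: records messages that would be
/// sent back to the Meta Service over the control stream.
#[derive(Default)]
struct ControlStreamHandle {
    outbox: Vec<String>,
}

/// Stand-in for `PartialGraphManagedBarrierState`: the actors of one partial graph.
#[derive(Default)]
struct PartialGraphState {
    actors: Vec<ActorId>,
}

/// Stand-in for `DatabaseManagedBarrierState`: one instance per database.
#[derive(Default)]
struct DatabaseState {
    graphs: HashMap<PartialGraphId, PartialGraphState>,
}

/// Stand-in for `ManagedBarrierState`: coordinates all databases on this node.
#[derive(Default)]
struct ManagedBarrierState {
    databases: HashMap<DatabaseId, DatabaseState>,
}

/// Stand-in for `LocalBarrierWorker`: owns the control stream and the barrier state.
#[derive(Default)]
struct LocalBarrierWorker {
    control_stream: ControlStreamHandle,
    state: ManagedBarrierState,
}

impl LocalBarrierWorker {
    /// Registers an actor, creating the database and partial-graph levels on demand.
    fn add_actor(&mut self, db: DatabaseId, graph: PartialGraphId, actor: ActorId) {
        self.state
            .databases
            .entry(db)
            .or_default()
            .graphs
            .entry(graph)
            .or_default()
            .actors
            .push(actor);
    }

    /// Counts actors across every database and partial graph.
    fn actor_count(&self) -> usize {
        self.state
            .databases
            .values()
            .flat_map(|db| db.graphs.values())
            .map(|graph| graph.actors.len())
            .sum()
    }

    /// Queues a message for the Meta Service through the owned control stream.
    fn report_to_meta(&mut self, msg: &str) {
        self.control_stream.outbox.push(msg.to_string());
    }
}
```

Keying each level by its id mirrors the multi-database and per-partial-graph roles in the list; the map-per-level layout itself is an assumption chosen for the sketch.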
§Actor Execution Layer
- Stream Actors: Individual computation units
- LocalBarrierManager: Actor-to-system event bridge
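LocalBarrierManager is described as the actor-to-system event bridge. The following is a minimal sketch of that idea, assuming a cloneable handle that wraps the sending half of a channel. std::sync::mpsc and OS threads stand in for whatever channel and task runtime the real crate uses, and ActorEvent, BarrierManagerHandle, and run_actors are hypothetical names.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

// Modeling these ids as plain integers is an assumption.
type ActorId = u32;
type Epoch = u64;

/// Hypothetical, reduced event type; the real `LocalBarrierEvent` carries more variants.
#[derive(Debug, PartialEq)]
enum ActorEvent {
    Collected { actor_id: ActorId, epoch: Epoch },
}

/// Stand-in for `LocalBarrierManager`: a cloneable handle that actors use
/// to push events toward the barrier worker side.
#[derive(Clone)]
struct BarrierManagerHandle {
    tx: Sender<ActorEvent>,
}

impl BarrierManagerHandle {
    fn new() -> (Self, Receiver<ActorEvent>) {
        let (tx, rx) = channel();
        (Self { tx }, rx)
    }

    /// Called by an actor once it has processed the barrier for `epoch`.
    fn collect(&self, actor_id: ActorId, epoch: Epoch) {
        // A send error only means the receiving side is gone (e.g., shutdown).
        let _ = self.tx.send(ActorEvent::Collected { actor_id, epoch });
    }
}

/// Runs `n` actor threads that each report collection of `epoch`, then drains
/// the receiver and returns the reports sorted by actor id.
fn run_actors(n: u32, epoch: Epoch) -> Vec<(ActorId, Epoch)> {
    let (handle, rx) = BarrierManagerHandle::new();
    let actors: Vec<_> = (0..n)
        .map(|id| {
            let h = handle.clone();
            thread::spawn(move || h.collect(id, epoch))
        })
        .collect();
    // Drop the original sender so the receiver ends once every actor's clone is gone.
    drop(handle);
    for actor in actors {
        actor.join().expect("actor thread panicked");
    }
    let mut reports: Vec<(ActorId, Epoch)> = rx
        .iter()
        .map(|event| match event {
            ActorEvent::Collected { actor_id, epoch } => (actor_id, epoch),
        })
        .collect();
    reports.sort();
    reports
}
```

Cloning the handle per actor lets many actors share one receiver. Dropping the original sender is what allows the receiving loop to terminate.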
§Key Event Types
- risingwave_pb::stream_service::streaming_control_stream_request::Request: Barrier injection events sent from Meta Service to ControlStreamHandle
- LocalActorOperation: Meta control events sent from LocalStreamManager to barrier_worker::LocalBarrierWorker
- LocalBarrierEvent: Events sent from actors via LocalBarrierManager to barrier_worker::managed_state::DatabaseManagedBarrierState
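Each of the three event families has a single consumer. The sketch below encodes that routing as an exhaustive match. The Event and Consumer enums and their payloads are hypothetical simplifications, not the real protobuf or crate types.

```rust
/// Hypothetical, reduced mirror of the three event families.
#[derive(Debug)]
enum Event {
    /// Like `streaming_control_stream_request::Request` (e.g. a barrier injection).
    ControlRequest(&'static str),
    /// Like `LocalActorOperation` (a control operation from the stream manager).
    ActorOperation(&'static str),
    /// Like `LocalBarrierEvent` (e.g. an actor reporting collection).
    BarrierEvent { actor_id: u32 },
}

/// The component that consumes each event family.
#[derive(Debug, PartialEq)]
enum Consumer {
    ControlStreamHandle,
    LocalBarrierWorker,
    DatabaseManagedBarrierState,
}

/// Maps an event to its consumer. The match is exhaustive, so a new
/// event family cannot be added without choosing where it is handled.
fn route(event: &Event) -> Consumer {
    match event {
        Event::ControlRequest(_) => Consumer::ControlStreamHandle,
        Event::ActorOperation(_) => Consumer::LocalBarrierWorker,
        Event::BarrierEvent { .. } => Consumer::DatabaseManagedBarrierState,
    }
}

/// Describes the hop an event takes, including its sender.
fn describe(event: &Event) -> String {
    match event {
        Event::ControlRequest(kind) => format!("Meta Service -> ControlStreamHandle: {kind}"),
        Event::ActorOperation(op) => format!("LocalStreamManager -> LocalBarrierWorker: {op}"),
        Event::BarrierEvent { actor_id } => {
            format!("actor {actor_id} via LocalBarrierManager -> DatabaseManagedBarrierState")
        }
    }
}
```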
§Data Flow and Event Processing
§1. Barrier Flow
This is the primary coordination mechanism for checkpoints and barriers:
- Meta Service sends risingwave_pb::stream_service::InjectBarrierRequest via streaming_control_stream
- barrier_worker::ControlStreamHandle (owned by barrier_worker::LocalBarrierWorker) receives the request
- barrier_worker::LocalBarrierWorker processes the request and calls barrier_worker::LocalBarrierWorker::send_barrier()
- barrier_worker::managed_state::DatabaseManagedBarrierState::transform_to_issued creates new actors if needed
- barrier_worker::managed_state::PartialGraphManagedBarrierState::transform_to_issued transitions to the Issued state
- barrier_worker::managed_state::InflightActorState::issue_barrier sends barriers to individual actors
- Stream Actors receive barriers and process them
- Stream Actors finish processing barriers and send LocalBarrierEvent::ReportActorCollected via LocalBarrierManager::collect
- barrier_worker::managed_state::DatabaseManagedBarrierState::poll_next_event processes the collection via DatabaseManagedBarrierState::collect
- barrier_worker::LocalBarrierWorker::complete_barrier initiates state store sync if needed
- barrier_worker::LocalBarrierWorker::on_epoch_completed sends risingwave_pb::stream_service::BarrierCompleteResponse to Meta Service
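The collection half of the barrier flow is essentially per-epoch bookkeeping. An issued barrier waits for a known set of actors, each ReportActorCollected removes one actor from that set, and the barrier becomes completable once the set is empty. The following is a minimal sketch that assumes epochs complete in ascending order and tracks one partial graph in isolation. PartialGraphBarriers, BarrierStatus, and their methods are hypothetical names. Their comments only indicate which step of the flow each one loosely corresponds to.

```rust
use std::collections::{BTreeMap, HashSet};

// Modeling ids and epochs as plain integers is an assumption.
type ActorId = u32;
type Epoch = u64;

/// Hypothetical, reduced per-epoch barrier status.
#[derive(Debug, PartialEq)]
enum BarrierStatus {
    /// Issued to actors; waiting for the listed actors to report collection.
    Issued { remaining: HashSet<ActorId> },
    /// Every actor has reported; ready for state store sync and reporting.
    AllCollected,
}

/// Stand-in for the barrier bookkeeping of one partial graph.
#[derive(Default)]
struct PartialGraphBarriers {
    epochs: BTreeMap<Epoch, BarrierStatus>,
}

impl PartialGraphBarriers {
    /// Loosely like `transform_to_issued` + `issue_barrier`: records which
    /// actors must collect the barrier of `epoch`.
    fn issue(&mut self, epoch: Epoch, actors: &[ActorId]) {
        let remaining: HashSet<ActorId> = actors.iter().copied().collect();
        let status = if remaining.is_empty() {
            BarrierStatus::AllCollected
        } else {
            BarrierStatus::Issued { remaining }
        };
        self.epochs.insert(epoch, status);
    }

    /// Loosely like handling `ReportActorCollected` from one actor.
    fn collect(&mut self, actor: ActorId, epoch: Epoch) {
        if let Some(status) = self.epochs.get_mut(&epoch) {
            if let BarrierStatus::Issued { remaining } = status {
                remaining.remove(&actor);
                if remaining.is_empty() {
                    *status = BarrierStatus::AllCollected;
                }
            }
        }
    }

    /// Loosely like `complete_barrier`: pops fully collected epochs in
    /// ascending order, stopping at the first epoch still waiting on actors.
    fn pop_completed(&mut self) -> Vec<Epoch> {
        let mut completed = Vec::new();
        while let Some(entry) = self.epochs.first_entry() {
            if !matches!(entry.get(), BarrierStatus::AllCollected) {
                break;
            }
            completed.push(entry.remove_entry().0);
        }
        completed
    }
}
```

In this sketch, a later epoch that is fully collected still waits behind an earlier incomplete one. That ordering is the assumption named above.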
§2. Actor Lifecycle Management
How actors are created, managed, and destroyed:
- Meta Service sends risingwave_pb::stream_service::InjectBarrierRequest with actors_to_build
- barrier_worker::managed_state::DatabaseManagedBarrierState::transform_to_issued processes new actors
- StreamActorManager::spawn_actor creates and starts actor tasks
- barrier_worker::managed_state::InflightActorState::start tracks the actor in the system
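The following is a minimal sketch of the lifecycle steps, assuming OS threads in place of the async tasks a real actor would run on. ActorToBuild, InflightActors, and the helper functions are hypothetical stand-ins for actors_to_build, StreamActorManager::spawn_actor, and InflightActorState::start, and ids are modeled as u32.

```rust
use std::collections::HashMap;
use std::thread::{self, JoinHandle};

// Modeling ids as plain integers is an assumption.
type ActorId = u32;
type FragmentId = u32;

/// Hypothetical, reduced stand-in for one entry of `actors_to_build`.
struct ActorToBuild {
    actor_id: ActorId,
    fragment_id: FragmentId,
}

/// Loosely like `StreamActorManager::spawn_actor`: starts the actor's task.
/// A thread stands in for the async task a real actor would run on.
fn spawn_actor(actor_id: ActorId) -> JoinHandle<ActorId> {
    thread::spawn(move || {
        // A real actor would loop over its executor's messages here.
        actor_id
    })
}

/// A tracked actor: its fragment and the handle of its running task.
struct InflightActor {
    fragment_id: FragmentId,
    handle: JoinHandle<ActorId>,
}

/// Loosely like the tracking done by `InflightActorState::start`.
#[derive(Default)]
struct InflightActors {
    actors: HashMap<ActorId, InflightActor>,
}

impl InflightActors {
    /// Spawns every requested actor and records it; building an id twice is a bug.
    fn start_all(&mut self, to_build: &[ActorToBuild]) {
        for desc in to_build {
            let handle = spawn_actor(desc.actor_id);
            let previous = self.actors.insert(
                desc.actor_id,
                InflightActor { fragment_id: desc.fragment_id, handle },
            );
            assert!(previous.is_none(), "actor {} started twice", desc.actor_id);
        }
    }

    /// Number of tracked actors belonging to `fragment_id`.
    fn actors_in_fragment(&self, fragment_id: FragmentId) -> usize {
        self.actors
            .values()
            .filter(|actor| actor.fragment_id == fragment_id)
            .count()
    }

    /// Waits for every actor task to finish and returns their ids, sorted.
    fn join_all(self) -> Vec<ActorId> {
        let mut ids: Vec<ActorId> = self
            .actors
            .into_values()
            .map(|actor| actor.handle.join().expect("actor task panicked"))
            .collect();
        ids.sort();
        ids
    }
}
```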
§3. Error Handling Flow
How errors propagate through the system:
- Stream Actors encounter errors and call LocalBarrierManager::notify_failure
- LocalBarrierManager sends the error through the actor_failure_rx channel
- barrier_worker::managed_state::DatabaseManagedBarrierState::poll_next_event emits an ActorError event
- barrier_worker::LocalBarrierWorker::on_database_failure suspends the database and reports to Meta Service
- Meta Service responds with risingwave_pb::stream_service::streaming_control_stream_request::ResetDatabaseRequest
- barrier_worker::LocalBarrierWorker::reset_database starts the database reset process
- barrier_worker::managed_state::SuspendedDatabaseState::reset cleans up actors and state
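The error path amounts to a small per-database state machine: the database runs, is suspended after the first failure, and is then reset on request from the Meta Service. The following is a minimal sketch that assumes only a suspended database can be reset and that later errors on an already suspended database are ignored. DatabaseStatus, Database, and the method names are hypothetical.

```rust
use std::collections::HashSet;

// Modeling actor ids as plain integers is an assumption.
type ActorId = u32;

/// Hypothetical, reduced lifecycle of one database on a compute node.
#[derive(Debug, PartialEq)]
enum DatabaseStatus {
    Running { actors: HashSet<ActorId> },
    /// Suspended after a failure; actors are kept until Meta requests a reset.
    Suspended { actors: HashSet<ActorId>, failed_actor: ActorId },
    /// Actors and state cleaned up.
    Reset,
}

struct Database {
    status: DatabaseStatus,
    /// Messages that would be reported to the Meta Service.
    reports_to_meta: Vec<String>,
}

impl Database {
    fn new(actors: &[ActorId]) -> Self {
        Database {
            status: DatabaseStatus::Running { actors: actors.iter().copied().collect() },
            reports_to_meta: Vec::new(),
        }
    }

    /// Loosely like `on_database_failure`: the first actor error suspends
    /// the database and is reported; later errors while suspended are ignored.
    fn on_actor_error(&mut self, actor: ActorId) {
        if let DatabaseStatus::Running { actors } = &mut self.status {
            let actors = std::mem::take(actors);
            self.status = DatabaseStatus::Suspended { actors, failed_actor: actor };
            self.reports_to_meta.push(format!("database suspended: actor {actor} failed"));
        }
    }

    /// Loosely like `reset_database` + `SuspendedDatabaseState::reset`.
    /// Only a suspended database is reset; returns the cleaned-up actors, sorted.
    fn reset(&mut self) -> Option<Vec<ActorId>> {
        match std::mem::replace(&mut self.status, DatabaseStatus::Reset) {
            DatabaseStatus::Suspended { actors, .. } => {
                let mut stopped: Vec<ActorId> = actors.into_iter().collect();
                stopped.sort();
                Some(stopped)
            }
            other => {
                // Not suspended: restore the previous status unchanged.
                self.status = other;
                None
            }
        }
    }
}
```

Keeping the actors alive while suspended, rather than tearing them down on the first error, matches the split in the flow above. There, cleanup happens only in the reset step.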
§Modules
- actor_manager 🔒
- await_tree_key
- barrier_manager 🔒
- barrier_worker 🔒
- env 🔒
- managed_state
- progress
- stream_manager 🔒
§Structs
- ActorEvalErrorReport: Reports expression evaluation errors to the actor context.
- BarrierCompleteResult: The collection result of a barrier on the current compute node. It is reported to the meta service in LocalBarrierWorker::on_epoch_completed.
- CreateMviewProgressReporter: The progress held by the backfill executors, reported to the local barrier manager.
- ExecutorParams: Parameters for constructing executors.
- LocalBarrierManager: Sends LocalBarrierEvent to super::barrier_worker::managed_state::DatabaseManagedBarrierState::poll_next_event.
- LocalStreamManager: Directly handles the public streaming APIs, e.g., StreamService and ExchangeService.
- PartialGraphId 🔒
- StreamEnvironment: The global environment for task execution. A single instance is shared by every task.
§Constants
- ENABLE_BARRIER_AGGREGATION: If enabled, all actors will be grouped in the same tracing span within one epoch. Note that this option will significantly increase the overhead of tracing.
§Type Aliases
- ActorHandle
- ActorId
- AtomicU64Ref
- ConsumableChannelPair
- DispatcherId
- FragmentId
- UpDownActorIds: (upstream_actor_id, downstream_actor_id)
- UpDownFragmentIds