§Stream Task Management Architecture
This module contains the core stream task management system that handles streaming computation actors and barrier coordination in RisingWave.
§Architecture Overview
The stream task management system consists of the following layered components:
§External Interface Layer
- Meta Service: Central coordination service
- LocalStreamManager: Public API handler for StreamService/ExchangeService
§Core Control Layer
- LocalBarrierWorker: Central event coordinator and barrier processor
  - Owns ControlStreamHandle: Bidirectional communication with Meta Service
  - Manages ManagedBarrierState: Multi-database barrier state coordinator
- DatabaseManagedBarrierState: Per-database barrier state manager (one per database, owned by ManagedBarrierState)
  - Uses StreamActorManager: Actor factory and lifecycle manager
  - Manages PartialGraphManagedBarrierState: Per-partial-graph barrier coordination
§Actor Execution Layer
- Stream Actors: Individual computation units
- LocalBarrierManager: Actor-to-system event bridge, used by Stream Actors
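The ownership relations above can be pictured with plain structs. The sketch below is hypothetical and heavily simplified: it reuses the component names but invents every field and id type, and it only models who owns, manages, or shares what. For example, a single StreamActorManager is shared through an Arc by every DatabaseManagedBarrierState. It is not the RisingWave definition of these types.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical, heavily simplified stand-ins for the components listed above.
// Only the ownership shape is modelled; every field name and id type is invented.
pub type DatabaseId = u32;
pub type PartialGraphId = u32;

/// Actor factory shared by all per-database states.
pub struct StreamActorManager;

/// Control stream to the Meta Service, modelled as an outbox of messages.
#[derive(Default)]
pub struct ControlStreamHandle {
    pub outbox: Vec<String>,
}

/// Barrier bookkeeping for one partial graph.
#[derive(Default)]
pub struct PartialGraphManagedBarrierState {
    pub inflight_epochs: Vec<u64>,
}

/// Per-database state: uses the shared actor manager, manages partial graphs.
pub struct DatabaseManagedBarrierState {
    pub actor_manager: Arc<StreamActorManager>,
    pub graph_states: HashMap<PartialGraphId, PartialGraphManagedBarrierState>,
}

/// Multi-database coordinator: one DatabaseManagedBarrierState per database.
#[derive(Default)]
pub struct ManagedBarrierState {
    pub databases: HashMap<DatabaseId, DatabaseManagedBarrierState>,
}

/// Central coordinator: owns the control stream and manages all barrier state.
pub struct LocalBarrierWorker {
    pub control_stream: ControlStreamHandle,
    pub state: ManagedBarrierState,
    pub actor_manager: Arc<StreamActorManager>,
}

impl LocalBarrierWorker {
    pub fn new() -> Self {
        Self {
            control_stream: ControlStreamHandle::default(),
            state: ManagedBarrierState::default(),
            actor_manager: Arc::new(StreamActorManager),
        }
    }

    /// Registers a database; its state shares the worker's actor manager.
    pub fn add_database(&mut self, db: DatabaseId, graphs: &[PartialGraphId]) {
        let graph_states = graphs
            .iter()
            .map(|&g| (g, PartialGraphManagedBarrierState::default()))
            .collect();
        self.state.databases.insert(
            db,
            DatabaseManagedBarrierState {
                actor_manager: Arc::clone(&self.actor_manager),
                graph_states,
            },
        );
    }
}
```

After registering two databases, the actor manager has three strong references: one held by the worker and one per database state.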
§Key Event Types
- risingwave_pb::stream_service::streaming_control_stream_request::Request: Barrier injection events sent from Meta Service to ControlStreamHandle
- LocalActorOperation: Meta control events sent from LocalStreamManager to barrier_worker::LocalBarrierWorker
- LocalBarrierEvent: Events sent from actors via LocalBarrierManager to barrier_worker::managed_state::DatabaseManagedBarrierState
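As a rough illustration of which component receives each event kind, the sketch below merges the three event types into one hypothetical Event enum carried on a single std::sync::mpsc channel. This merge is an assumption made for brevity: the module described here uses distinct types and channels, and the variant names and fields are invented.

```rust
use std::sync::mpsc;

// Hypothetical, simplified event type. The real module keeps these as three
// separate types (a protobuf Request, LocalActorOperation, LocalBarrierEvent)
// on separate channels; they are merged into one enum here for brevity.
#[derive(Debug, Clone, PartialEq)]
pub enum Event {
    /// Meta Service -> ControlStreamHandle (e.g. an injected barrier).
    ControlRequest { epoch: u64 },
    /// LocalStreamManager -> LocalBarrierWorker (a forwarded public-API call).
    ActorOperation { description: String },
    /// Actor (via LocalBarrierManager) -> DatabaseManagedBarrierState.
    ActorCollected { actor_id: u32, epoch: u64 },
}

/// Returns the name of the component that receives each kind of event.
pub fn receiver_of(event: &Event) -> &'static str {
    match event {
        Event::ControlRequest { .. } => "ControlStreamHandle",
        Event::ActorOperation { .. } => "LocalBarrierWorker",
        Event::ActorCollected { .. } => "DatabaseManagedBarrierState",
    }
}

/// Drains every queued event without blocking and lists who handles each one.
pub fn route_pending(rx: &mpsc::Receiver<Event>) -> Vec<&'static str> {
    rx.try_iter().map(|event| receiver_of(&event)).collect()
}
```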
§Data Flow and Event Processing
§1. Barrier Flow
This is the primary coordination mechanism for checkpoints and barriers:
- Meta Service sends risingwave_pb::stream_service::InjectBarrierRequest via streaming_control_stream
- barrier_worker::ControlStreamHandle (owned by barrier_worker::LocalBarrierWorker) receives the request
- barrier_worker::LocalBarrierWorker processes the request and calls barrier_worker::LocalBarrierWorker::send_barrier()
- barrier_worker::managed_state::DatabaseManagedBarrierState::transform_to_issued creates new actors if needed
- barrier_worker::managed_state::PartialGraphManagedBarrierState::transform_to_issued transitions to the Issued state
- barrier_worker::managed_state::InflightActorState::issue_barrier sends barriers to individual actors
- Stream Actors receive barriers and process them
- Stream Actors finish processing barriers and send LocalBarrierEvent::ReportActorCollected via LocalBarrierManager::collect
- barrier_worker::managed_state::DatabaseManagedBarrierState::poll_next_event processes the collection via DatabaseManagedBarrierState::collect
- barrier_worker::LocalBarrierWorker::complete_barrier initiates state store sync if needed
- barrier_worker::LocalBarrierWorker::on_epoch_completed sends risingwave_pb::stream_service::BarrierCompleteResponse to Meta Service
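The barrier flow above boils down to: issue the barrier to every actor, wait until all of them report it collected, then complete the epoch. The following minimal sketch models that loop with threads and std::sync::mpsc channels, assuming strictly increasing epochs. Barriers and collection reports are reduced to bare epochs and (actor_id, epoch) pairs; run_barriers is a hypothetical helper, not part of the RisingWave API, and real completion also involves state store sync and sending a BarrierCompleteResponse.

```rust
use std::collections::{BTreeMap, HashSet};
use std::sync::mpsc;
use std::thread;

/// Simplified barrier flow: each actor is a thread that receives barrier epochs
/// on its own channel and reports (actor_id, epoch) once it has processed one.
/// An epoch completes only after every actor has collected it, and epochs
/// complete strictly in order.
pub fn run_barriers(actor_ids: &[u32], epochs: &[u64]) -> Vec<u64> {
    // Shared channel standing in for ReportActorCollected events.
    let (collect_tx, collect_rx) = mpsc::channel::<(u32, u64)>();
    let mut barrier_txs = Vec::new();
    let mut handles = Vec::new();
    for &actor_id in actor_ids {
        let (barrier_tx, barrier_rx) = mpsc::channel::<u64>();
        barrier_txs.push(barrier_tx);
        let collect_tx = collect_tx.clone();
        handles.push(thread::spawn(move || {
            for epoch in barrier_rx {
                // A real actor would process its data up to `epoch` first.
                collect_tx.send((actor_id, epoch)).unwrap();
            }
        }));
    }
    drop(collect_tx); // only the actor threads hold senders now

    // Issue each barrier to every actor, remembering who still has to collect it.
    let mut pending: BTreeMap<u64, HashSet<u32>> = BTreeMap::new();
    for &epoch in epochs {
        pending.insert(epoch, actor_ids.iter().copied().collect());
        for tx in &barrier_txs {
            tx.send(epoch).unwrap();
        }
    }
    drop(barrier_txs); // actors exit after draining their queues

    // Collect reports; complete the oldest epoch(s) once fully collected.
    let mut completed = Vec::new();
    for (actor_id, epoch) in collect_rx {
        if let Some(waiting) = pending.get_mut(&epoch) {
            waiting.remove(&actor_id);
        }
        loop {
            let ready = match pending.first_key_value() {
                Some((&oldest, waiting)) if waiting.is_empty() => oldest,
                _ => break,
            };
            pending.remove(&ready);
            completed.push(ready); // here the worker would sync and notify Meta
        }
    }
    for handle in handles {
        handle.join().unwrap();
    }
    completed
}
```

Even if actors report a later epoch before an earlier one finishes, the earlier epoch is always completed first, because only the oldest pending epoch is ever popped.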
§2. Actor Lifecycle Management
How actors are created, managed, and destroyed:
- Meta Service sends risingwave_pb::stream_service::InjectBarrierRequest with actors_to_build
- barrier_worker::managed_state::DatabaseManagedBarrierState::transform_to_issued processes the new actors
- StreamActorManager::spawn_actor creates and starts the actor tasks
- barrier_worker::managed_state::InflightActorState::start tracks each actor in the system
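The lifecycle steps above can be sketched as a small state object that builds actors on demand when a barrier names them in actors_to_build, then issues the barrier to every tracked actor. Everything here is hypothetical: InjectBarrier is a two-field stand-in for the protobuf request, each "actor" is a thread that just records the epochs it sees, and the comments mark which step each line loosely corresponds to.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical subset of an InjectBarrierRequest: the epoch and the actors to build.
pub struct InjectBarrier {
    pub epoch: u64,
    pub actors_to_build: Vec<u32>,
}

/// Hypothetical bookkeeping for one running actor (loosely InflightActorState).
struct InflightActor {
    barrier_tx: Sender<u64>,
    // The actor task returns the epochs it observed, so the sketch can be checked.
    handle: JoinHandle<Vec<u64>>,
}

#[derive(Default)]
pub struct DatabaseState {
    actors: HashMap<u32, InflightActor>,
}

impl DatabaseState {
    /// Builds newly requested actors, then issues the barrier to every tracked actor.
    pub fn transform_to_issued(&mut self, req: &InjectBarrier) {
        for &actor_id in &req.actors_to_build {
            // "spawn_actor": start the actor task on its own thread.
            let (barrier_tx, barrier_rx) = mpsc::channel::<u64>();
            let handle = thread::spawn(move || barrier_rx.into_iter().collect::<Vec<u64>>());
            // "start": begin tracking the actor so it receives future barriers.
            self.actors.insert(actor_id, InflightActor { barrier_tx, handle });
        }
        for actor in self.actors.values() {
            actor.barrier_tx.send(req.epoch).unwrap();
        }
    }

    /// Closes every actor's barrier channel and returns the epochs each observed.
    pub fn shutdown(self) -> HashMap<u32, Vec<u64>> {
        self.actors
            .into_iter()
            .map(|(actor_id, actor)| {
                drop(actor.barrier_tx);
                (actor_id, actor.handle.join().unwrap())
            })
            .collect()
    }
}
```

In this model a newly built actor first sees the barrier that created it, and every later barrier after that.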
§3. Error Handling Flow
How errors propagate through the system:
- Stream Actors encounter errors and call LocalBarrierManager::notify_failure
- LocalBarrierManager forwards the error over the actor failure channel, whose receiving end is actor_failure_rx
- barrier_worker::managed_state::DatabaseManagedBarrierState::poll_next_event emits an ActorError event
- barrier_worker::LocalBarrierWorker::on_database_failure suspends the database and reports to Meta Service
- Meta Service responds with risingwave_pb::stream_service::streaming_control_stream_request::ResetDatabaseRequest
- barrier_worker::LocalBarrierWorker::reset_database starts the database reset process
- barrier_worker::managed_state::SuspendedDatabaseState::reset cleans up actors and state
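A minimal sketch of the error path, assuming a single database: actors hold the sending half of a failure channel, the database polls the receiving half, suspends itself on the first failure and records a report for Meta, then waits for a reset. All names (ActorFailure, DatabaseWorker, DatabaseStatus) are hypothetical, and modelling the post-reset state as Running with no actors (awaiting a rebuild) is an assumption.

```rust
use std::sync::mpsc::{self, Receiver, Sender};

/// Hypothetical failure message an actor sends (cf. notify_failure).
pub struct ActorFailure {
    pub actor_id: u32,
    pub reason: String,
}

#[derive(Debug, PartialEq)]
pub enum DatabaseStatus {
    Running,
    Suspended { failed_actor: u32 },
}

/// Simplified per-database state that reacts to actor failures.
pub struct DatabaseWorker {
    pub status: DatabaseStatus,
    pub actors: Vec<u32>,
    pub reports_to_meta: Vec<String>,
    actor_failure_rx: Receiver<ActorFailure>,
}

impl DatabaseWorker {
    /// Returns the worker plus the sender half that actors would clone.
    pub fn new(actors: Vec<u32>) -> (Self, Sender<ActorFailure>) {
        let (tx, rx) = mpsc::channel();
        let worker = Self {
            status: DatabaseStatus::Running,
            actors,
            reports_to_meta: Vec::new(),
            actor_failure_rx: rx,
        };
        (worker, tx)
    }

    /// Polls for one failure; the first one suspends the database and is reported.
    pub fn poll_next_event(&mut self) {
        if self.status != DatabaseStatus::Running {
            return; // already suspended: wait for Meta to request a reset
        }
        if let Ok(failure) = self.actor_failure_rx.try_recv() {
            self.status = DatabaseStatus::Suspended { failed_actor: failure.actor_id };
            self.reports_to_meta
                .push(format!("actor {} failed: {}", failure.actor_id, failure.reason));
        }
    }

    /// Handles a reset request from Meta: drops actors and discards stale failures.
    pub fn reset(&mut self) {
        if let DatabaseStatus::Suspended { .. } = self.status {
            self.actors.clear();
            while self.actor_failure_rx.try_recv().is_ok() {}
            self.status = DatabaseStatus::Running;
        }
    }
}
```

Further failures that arrive while the database is suspended are not reported again; the reset discards them so they cannot affect the rebuilt database.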
§Modules
- actor_manager 🔒
- await_tree_key
- barrier_manager 🔒
- barrier_worker 🔒
- cdc_progress
- env 🔒
- managed_state
- progress
- stream_manager 🔒
§Structs
- ActorEvalErrorReport: Reports expression evaluation errors to the actor context.
- BarrierCompleteResult: The collection result of a barrier on the current compute node. It is reported to the meta service in LocalBarrierWorker::on_epoch_completed.
- CreateMviewProgressReporter: The progress held by backfill executors and reported to the local barrier manager.
- ExecutorParams: Parameters used to construct executors.
- LocalBarrierManager: Sends LocalBarrierEvent to super::barrier_worker::managed_state::DatabaseManagedBarrierState::poll_next_event.
- LocalStreamManager: Directly handles the public streaming APIs, e.g., StreamService and ExchangeService.
- PartialGraphId 🔒
- StreamEnvironment: The global environment for task execution. A single instance is shared by every task.
§Constants
- ENABLE_BARRIER_AGGREGATION: If enabled, all actors are grouped in the same tracing span within one epoch. Note that this option significantly increases tracing overhead.
§Type Aliases
- ActorHandle
- ActorId
- AtomicU64Ref
- ConsumableChannelPair
- DispatcherId
- FragmentId
- UpDownActorIds: (upstream_actor_id, downstream_actor_id)
- UpDownFragmentIds