Module task

§Stream Task Management Architecture

This module contains the core stream task management system that handles streaming computation actors and barrier coordination in RisingWave.

§Architecture Overview

The stream task management system consists of the following layered components:

§External Interface Layer

  • Meta Service: Central coordination service
  • LocalStreamManager: Public API handler for StreamService/ExchangeService

§Core Control Layer

§Actor Execution Layer

  • Stream Actors: Individual computation units
  • LocalBarrierManager: Actor-to-system event bridge

§Key Event Types

§Data Flow and Event Processing

§1. Barrier Flow

Barriers are the primary coordination mechanism for checkpointing. A barrier moves through the system as follows:

  1. Meta Service sends risingwave_pb::stream_service::InjectBarrierRequest via streaming_control_stream
  2. barrier_worker::ControlStreamHandle (owned by barrier_worker::LocalBarrierWorker) receives the request
  3. barrier_worker::LocalBarrierWorker processes the request and calls barrier_worker::LocalBarrierWorker::send_barrier()
  4. Stream Actors receive barriers and process them
  5. Stream Actors finish processing barriers and send LocalBarrierEvent::ReportActorCollected via LocalBarrierManager::collect
  6. barrier_worker::managed_state::DatabaseManagedBarrierState::poll_next_event processes the collection via DatabaseManagedBarrierState::collect
  7. barrier_worker::LocalBarrierWorker::complete_barrier initiates state store sync if needed
  8. barrier_worker::LocalBarrierWorker::on_epoch_completed sends risingwave_pb::stream_service::BarrierCompleteResponse to Meta Service
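
The collection part of this flow (steps 3, 5, 6 and 7) can be sketched as a small bookkeeping structure. This is a simplified model, not the actual RisingWave implementation: `ToyBarrierState`, its `pending` map, and the shape of the event payload are hypothetical, and the sketch assumes a barrier completes as soon as every actor it was sent to has reported collection. It ignores ordering between epochs and state store sync.

```rust
use std::collections::{BTreeMap, HashSet};

/// Hypothetical stand-in for an actor identifier.
type ActorId = u32;

/// Simplified stand-in for the event sent in step 5.
/// The field layout here is an assumption made for illustration.
#[derive(Debug)]
enum LocalBarrierEvent {
    ReportActorCollected { actor_id: ActorId, epoch: u64 },
}

/// Toy model of per-database barrier bookkeeping (hypothetical type).
#[derive(Default)]
struct ToyBarrierState {
    /// For each in-flight epoch, the actors that have not yet collected its barrier.
    pending: BTreeMap<u64, HashSet<ActorId>>,
}

impl ToyBarrierState {
    /// Step 3: record which actors must collect the barrier of `epoch`.
    fn send_barrier(&mut self, epoch: u64, actors: Vec<ActorId>) {
        self.pending.insert(epoch, actors.into_iter().collect());
    }

    /// Step 6: apply one collection event.
    /// Returns `Some(epoch)` once every actor has collected that barrier,
    /// which is the point where step 7 would start completing it.
    fn collect(&mut self, event: LocalBarrierEvent) -> Option<u64> {
        let LocalBarrierEvent::ReportActorCollected { actor_id, epoch } = event;
        let remaining = self.pending.get_mut(&epoch)?;
        remaining.remove(&actor_id);
        if remaining.is_empty() {
            self.pending.remove(&epoch);
            Some(epoch)
        } else {
            None
        }
    }
}
```

In this model, sending a barrier for epoch 1 to actors 10 and 11 and then collecting from actor 10 returns `None`; collecting from actor 11 returns `Some(1)`, at which point the worker could report completion to the Meta Service.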

§2. Actor Lifecycle Management

How actors are created, managed, and destroyed:

  1. Meta Service sends risingwave_pb::stream_service::InjectBarrierRequest with actors_to_build
  2. barrier_worker::managed_state::DatabaseManagedBarrierState::transform_to_issued processes new actors
  3. StreamActorManager::spawn_actor creates and starts actor tasks
  4. barrier_worker::managed_state::InflightActorState::start tracks actor in system
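
The lifecycle steps above can be illustrated with a minimal sketch that spawns one task per requested actor and keeps a handle to it. Everything here is hypothetical: `ToyActorManager`, `InflightActor`, and `stop_actor` are invented for illustration, OS threads and `std::sync::mpsc` channels stand in for the asynchronous tasks and channels a real streaming runtime would use, and the "actor" only records the epochs it receives.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for an actor identifier.
type ActorId = u32;

/// What the toy manager tracks for each running actor (compare step 4).
struct InflightActor {
    /// Channel used to deliver barrier epochs to the actor.
    barrier_tx: Sender<u64>,
    /// Handle to the actor's thread; yields the epochs the actor observed.
    handle: JoinHandle<Vec<u64>>,
}

#[derive(Default)]
struct ToyActorManager {
    actors: HashMap<ActorId, InflightActor>,
}

impl ToyActorManager {
    /// Steps 2 to 4: spawn a task for every actor listed in the request
    /// and start tracking it.
    fn build_actors(&mut self, actors_to_build: &[ActorId]) {
        for &actor_id in actors_to_build {
            let (barrier_tx, barrier_rx) = channel::<u64>();
            // The toy actor collects every epoch until its channel is closed.
            let handle = thread::spawn(move || barrier_rx.iter().collect::<Vec<u64>>());
            self.actors.insert(actor_id, InflightActor { barrier_tx, handle });
        }
    }

    /// Deliver a barrier epoch to every tracked actor.
    fn send_barrier(&self, epoch: u64) {
        for actor in self.actors.values() {
            // A send error only means the actor has already exited.
            let _ = actor.barrier_tx.send(epoch);
        }
    }

    /// Stop tracking an actor, close its channel, and wait for it to exit.
    /// Returns the epochs it observed, or `None` if the actor is unknown.
    fn stop_actor(&mut self, actor_id: ActorId) -> Option<Vec<u64>> {
        let InflightActor { barrier_tx, handle } = self.actors.remove(&actor_id)?;
        drop(barrier_tx); // closing the channel ends the actor's receive loop
        handle.join().ok()
    }
}
```

Keeping the sender and the join handle together in one tracked entry means that removing the entry is enough to both signal shutdown and wait for the task to finish.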

§3. Error Handling Flow

How errors propagate through the system:

  1. Stream Actors encounter errors and call LocalBarrierManager::notify_failure
  2. LocalBarrierManager sends error via actor_failure_rx
  3. barrier_worker::managed_state::DatabaseManagedBarrierState::poll_next_event sends ActorError event
  4. barrier_worker::LocalBarrierWorker::on_database_failure suspends database and reports to Meta Service
  5. Meta Service responds with risingwave_pb::stream_service::streaming_control_stream_request::ResetDatabaseRequest
  6. barrier_worker::LocalBarrierWorker::reset_database starts database reset process
  7. barrier_worker::managed_state::SuspendedDatabaseState::reset cleans up actors and state
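
The error path above can be read as a small state machine: a running database is suspended on the first actor failure and is reset only after the Meta Service asks for it. The sketch below models that reading. The types `ToyBarrierManager`, `ToyWorker`, and `DatabaseStatus` are hypothetical, the failure payload is reduced to an actor id and a message, and the rule that a reset is only accepted while suspended is an assumption of this sketch rather than a documented guarantee.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for an actor identifier.
type ActorId = u32;

/// Toy lifecycle of one database on this compute node.
#[derive(Debug, PartialEq)]
enum DatabaseStatus {
    Running,
    Suspended { failed_actor: ActorId, error: String },
    Resetting,
}

/// Actor-side handle (steps 1 and 2): forwards failures to the worker.
struct ToyBarrierManager {
    actor_failure_tx: Sender<(ActorId, String)>,
}

impl ToyBarrierManager {
    fn notify_failure(&self, actor_id: ActorId, error: &str) {
        // A send error only means the worker is gone, so it is ignored here.
        let _ = self.actor_failure_tx.send((actor_id, error.to_string()));
    }
}

/// Worker-side state (steps 3 to 7).
struct ToyWorker {
    actor_failure_rx: Receiver<(ActorId, String)>,
    status: DatabaseStatus,
}

impl ToyWorker {
    fn new() -> (ToyBarrierManager, ToyWorker) {
        let (actor_failure_tx, actor_failure_rx) = channel();
        let worker = ToyWorker { actor_failure_rx, status: DatabaseStatus::Running };
        (ToyBarrierManager { actor_failure_tx }, worker)
    }

    /// Steps 3 and 4: check for a failure without blocking. On the first
    /// failure, suspend the database and return the error to report upstream.
    fn poll_failure(&mut self) -> Option<String> {
        if self.status != DatabaseStatus::Running {
            return None;
        }
        let (failed_actor, error) = self.actor_failure_rx.try_recv().ok()?;
        self.status = DatabaseStatus::Suspended { failed_actor, error: error.clone() };
        Some(error)
    }

    /// Steps 6 and 7: begin a reset. Accepted only while suspended
    /// (an assumption of this sketch). Returns whether the reset started.
    fn reset_database(&mut self) -> bool {
        if matches!(self.status, DatabaseStatus::Suspended { .. }) {
            self.status = DatabaseStatus::Resetting;
            true
        } else {
            false
        }
    }
}
```

Routing failures through a channel keeps actors decoupled from the worker: an actor only needs the sending handle, and the worker decides when to observe the failure and how to change the database status.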

Modules§

actor_manager 🔒
await_tree_key
barrier_manager 🔒
barrier_worker 🔒
env 🔒
managed_state
progress
stream_manager 🔒

Structs§

ActorEvalErrorReport
Report expression evaluation errors to the actor context.
BarrierCompleteResult
The collected result of a barrier on the current compute node. It is reported to the Meta Service in LocalBarrierWorker::on_epoch_completed.
CreateMviewProgressReporter
The progress held by the backfill executors to report to the local barrier manager.
ExecutorParams
Parameters to construct executors.
LocalBarrierManager
A handle that sends LocalBarrierEvent messages, which are processed in super::barrier_worker::managed_state::DatabaseManagedBarrierState::poll_next_event.
LocalStreamManager
LocalStreamManager directly handles public API for streaming, e.g., StreamService, ExchangeService.
PartialGraphId 🔒
StreamEnvironment
The global environment for task execution. The instance will be shared by every task.

Constants§

ENABLE_BARRIER_AGGREGATION
If enabled, all actors will be grouped in the same tracing span within one epoch. Note that this option will significantly increase the overhead of tracing.

Type Aliases§

ActorHandle
ActorId
AtomicU64Ref
ConsumableChannelPair
DispatcherId
FragmentId
UpDownActorIds
(upstream_actor_id, downstream_actor_id)
UpDownFragmentIds