risingwave_stream/task/mod.rs
1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! # Stream Task Management Architecture
16//!
17//! This module contains the core stream task management system that handles streaming
18//! computation actors and barrier coordination in RisingWave.
19//!
20//! ## Architecture Overview
21//!
22//! The stream task management system consists of the following layered components:
23//!
24//! ### External Interface Layer
25//! - Meta Service: Central coordination service
26//! - [`LocalStreamManager`]: Public API handler for StreamService/StreamExchangeService
27//!
28//! ### Core Control Layer
//! - [`LocalBarrierWorker`]: Central event coordinator and barrier processor
//!   - Owns [`ControlStreamHandle`]: Bidirectional communication with Meta Service
//!   - Manages [`ManagedBarrierState`]: Multi-partial-graph barrier state coordinator
//!     - [`PartialGraphState`]: Per-partial-graph barrier state manager
//!       - Uses [`StreamActorManager`]: Actor factory and lifecycle manager
//!       - Manages [`PartialGraphManagedBarrierState`]: Per-partial-graph barrier coordination
35//!
36//! ### Actor Execution Layer
37//! - Stream Actors: Individual computation units
38//! - [`LocalBarrierManager`]: Actor-to-system event bridge
39//!
40//! ## Key Event Types
41//!
42//! - [`risingwave_pb::stream_service::streaming_control_stream_request::Request`]: Barrier injection events sent from Meta Service to [`ControlStreamHandle`]
43//! - [`LocalActorOperation`]: Meta control events sent from [`LocalStreamManager`] to [`barrier_worker::LocalBarrierWorker`]
44//! - [`LocalBarrierEvent`]: Events sent from actors via [`LocalBarrierManager`] to [`barrier_worker::managed_state::PartialGraphState`]
45//!
46//! ## Data Flow and Event Processing
47//!
48//! ### 1. Barrier Flow
49//! This is the primary coordination mechanism for checkpoints and barriers:
50//!
51//! 1. Meta Service sends [`risingwave_pb::stream_service::InjectBarrierRequest`] via `streaming_control_stream`
52//! 2. [`barrier_worker::ControlStreamHandle`] (owned by [`barrier_worker::LocalBarrierWorker`]) receives the request
//! 3. [`barrier_worker::LocalBarrierWorker`] processes the request and calls [`barrier_worker::LocalBarrierWorker::send_barrier()`]
//!    - [`barrier_worker::managed_state::PartialGraphState::transform_to_issued`] creates new actors if needed
//!    - [`barrier_worker::managed_state::PartialGraphManagedBarrierState::transform_to_issued`] transitions to the `Issued` state
//!    - [`barrier_worker::managed_state::InflightActorState::issue_barrier`] sends barriers to individual actors
57//! 4. Stream Actors receive barriers and process them
58//! 5. Stream Actors finish processing barriers and send [`LocalBarrierEvent::ReportActorCollected`] via [`LocalBarrierManager::collect`]
59//! 6. [`barrier_worker::managed_state::PartialGraphState::poll_next_event`] processes the collection via [`PartialGraphState::collect`]
//! 7. [`barrier_worker::LocalBarrierWorker::complete_barrier`] initiates state store sync if needed
//! 8. [`barrier_worker::LocalBarrierWorker::on_epoch_completed`] sends [`risingwave_pb::stream_service::BarrierCompleteResponse`] to Meta Service
62//!
63//! ### 2. Actor Lifecycle Management
64//! How actors are created, managed, and destroyed:
65//!
66//! 1. Meta Service sends [`risingwave_pb::stream_service::InjectBarrierRequest`] with `actors_to_build`
67//! 2. [`barrier_worker::managed_state::PartialGraphState::transform_to_issued`] processes new actors
68//! 3. [`StreamActorManager::spawn_actor`] creates and starts actor tasks
69//! 4. [`barrier_worker::managed_state::InflightActorState::start`] tracks actor in system
70//!
71//! ### 3. Error Handling Flow
72//! How errors propagate through the system:
73//!
74//! 1. Stream Actors encounter errors and call [`LocalBarrierManager::notify_failure`]
//! 2. [`LocalBarrierManager`] sends the error through the `actor_failure_rx` channel
//! 3. [`barrier_worker::managed_state::PartialGraphState::poll_next_event`] sends an `ActorError` event
//! 4. [`barrier_worker::LocalBarrierWorker::on_partial_graph_failure`] suspends the partial graph and reports the failure to Meta Service
//! 5. Meta Service responds with [`risingwave_pb::stream_service::streaming_control_stream_request::ResetPartialGraphsRequest`]
//! 6. [`barrier_worker::LocalBarrierWorker::reset_partial_graphs`] starts the partial graph reset process
80//! 7. [`barrier_worker::managed_state::SuspendedPartialGraphState::reset`] cleans up actors and state
81
82#[expect(unused_imports, reason = "used for doc-link")]
83use barrier_worker::managed_state::{
84 ManagedBarrierState, PartialGraphManagedBarrierState, PartialGraphState,
85};
86
87use crate::executor::exchange::permit::{Receiver, Sender};
88mod actor_manager;
89mod barrier_manager;
90mod barrier_worker;
91mod env;
92mod stream_manager;
93
94pub use actor_manager::*;
95pub use barrier_manager::*;
96pub use barrier_worker::*;
97pub use env::*;
98pub use risingwave_common::id::{ActorId, FragmentId};
99use risingwave_pb::id::PartialGraphId;
100pub use stream_manager::*;
101
/// A sender/receiver pair built from the permit-based channel in
/// `crate::executor::exchange::permit`.
///
/// Each half is wrapped in an `Option`; presumably a half becomes `None` once it has been
/// taken by its consumer (hence "consumable") — TODO confirm against callers.
pub type ConsumableChannelPair = (Option<Sender>, Option<Receiver>);
/// Identifier of a dispatcher, represented with the same type as [`FragmentId`].
// NOTE(review): presumably the id of the downstream fragment being dispatched to — confirm.
pub type DispatcherId = FragmentId;
/// A pair of actor ids ordered as (`upstream_actor_id`, `downstream_actor_id`).
pub type UpDownActorIds = (ActorId, ActorId);
/// A pair of fragment ids, presumably ordered as (`upstream_fragment_id`,
/// `downstream_fragment_id`) to mirror [`UpDownActorIds`] — TODO confirm.
pub type UpDownFragmentIds = (FragmentId, FragmentId);

/// Fixed partial graph id (`233`) shared by unit tests; only compiled under `cfg(test)`.
#[cfg(test)]
pub(crate) const TEST_PARTIAL_GRAPH_ID: PartialGraphId = PartialGraphId::new(233);