risingwave_stream/task/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! # Stream Task Management Architecture
16//!
17//! This module contains the core stream task management system that handles streaming
18//! computation actors and barrier coordination in RisingWave.
19//!
20//! ## Architecture Overview
21//!
22//! The stream task management system consists of the following layered components:
23//!
24//! ### External Interface Layer
25//! - Meta Service: Central coordination service
26//! - [`LocalStreamManager`]: Public API handler for StreamService/ExchangeService
27//!
28//! ### Core Control Layer
29//! - [`LocalBarrierWorker`]: Central event coordinator and barrier processor
30//!   - Owns [`ControlStreamHandle`]: Bidirectional communication with Meta Service
31//!   - Manages [`ManagedBarrierState`]: Multi-database barrier state coordinator
32//!     + [`DatabaseManagedBarrierState`]: Per-database barrier state manager
33//!       - Uses [`StreamActorManager`]: Actor factory and lifecycle manager
34//!       - Manages [`PartialGraphManagedBarrierState`]: Per-partial-graph barrier coordination
35//!
36//! ### Actor Execution Layer
37//! - Stream Actors: Individual computation units
38//! - [`LocalBarrierManager`]: Actor-to-system event bridge
39//!
40//! ## Key Event Types
41//!
42//! - [`risingwave_pb::stream_service::streaming_control_stream_request::Request`]: Barrier injection events sent from Meta Service to [`ControlStreamHandle`]
43//! - [`LocalActorOperation`]: Meta control events sent from [`LocalStreamManager`] to [`barrier_worker::LocalBarrierWorker`]
44//! - [`LocalBarrierEvent`]: Events sent from actors via [`LocalBarrierManager`] to [`barrier_worker::managed_state::DatabaseManagedBarrierState`]
45//!
46//! ## Data Flow and Event Processing
47//!
48//! ### 1. Barrier Flow
49//! This is the primary coordination mechanism for checkpoints and barriers:
50//!
51//! 1. Meta Service sends [`risingwave_pb::stream_service::InjectBarrierRequest`] via `streaming_control_stream`
52//! 2. [`barrier_worker::ControlStreamHandle`] (owned by [`barrier_worker::LocalBarrierWorker`]) receives the request
53//! 3. [`barrier_worker::LocalBarrierWorker`] processes the request and calls [`barrier_worker::LocalBarrierWorker::send_barrier()`]
54//!    - [`barrier_worker::managed_state::DatabaseManagedBarrierState::transform_to_issued`] creates new actors if needed
55//!    - [`barrier_worker::managed_state::PartialGraphManagedBarrierState::transform_to_issued`] transitions to `Issued` state
56//!    - [`barrier_worker::managed_state::InflightActorState::issue_barrier`] sends barriers to individual actors
57//! 4. Stream Actors receive barriers and process them
58//! 5. Stream Actors finish processing barriers and send [`LocalBarrierEvent::ReportActorCollected`] via [`LocalBarrierManager::collect`]
59//! 6. [`barrier_worker::managed_state::DatabaseManagedBarrierState::poll_next_event`] processes the collection via [`DatabaseManagedBarrierState::collect`]
//! 7. [`barrier_worker::LocalBarrierWorker::complete_barrier`] initiates state store sync if needed
//! 8. [`barrier_worker::LocalBarrierWorker::on_epoch_completed`] sends [`risingwave_pb::stream_service::BarrierCompleteResponse`] to Meta Service
62//!
63//! ### 2. Actor Lifecycle Management
64//! How actors are created, managed, and destroyed:
65//!
66//! 1. Meta Service sends [`risingwave_pb::stream_service::InjectBarrierRequest`] with `actors_to_build`
67//! 2. [`barrier_worker::managed_state::DatabaseManagedBarrierState::transform_to_issued`] processes new actors
68//! 3. [`StreamActorManager::spawn_actor`] creates and starts actor tasks
69//! 4. [`barrier_worker::managed_state::InflightActorState::start`] tracks actor in system
70//!
71//! ### 3. Error Handling Flow
72//! How errors propagate through the system:
73//!
74//! 1. Stream Actors encounter errors and call [`LocalBarrierManager::notify_failure`]
//! 2. [`LocalBarrierManager`] sends the error over the actor-failure channel, whose receiving end is `actor_failure_rx`
//! 3. [`barrier_worker::managed_state::DatabaseManagedBarrierState::poll_next_event`] receives the error and emits an `ActorError` event
77//! 4. [`barrier_worker::LocalBarrierWorker::on_database_failure`] suspends database and reports to Meta Service
78//! 5. Meta Service responds with [`risingwave_pb::stream_service::streaming_control_stream_request::ResetDatabaseRequest`]
79//! 6. [`barrier_worker::LocalBarrierWorker::reset_database`] starts database reset process
80//! 7. [`barrier_worker::managed_state::SuspendedDatabaseState::reset`] cleans up actors and state
81
82#[expect(unused_imports, reason = "used for doc-link")]
83use barrier_worker::managed_state::{
84    DatabaseManagedBarrierState, ManagedBarrierState, PartialGraphManagedBarrierState,
85};
86
87use crate::executor::exchange::permit::{Receiver, Sender};
88mod actor_manager;
89mod barrier_manager;
90mod barrier_worker;
91mod env;
92mod stream_manager;
93
94pub use actor_manager::*;
95pub use barrier_manager::*;
96pub use barrier_worker::*;
97pub use env::*;
98pub use stream_manager::*;
99
100pub type ConsumableChannelPair = (Option<Sender>, Option<Receiver>);
101pub type ActorId = u32;
102pub type FragmentId = u32;
103pub type DispatcherId = u64;
104/// (`upstream_actor_id`, `downstream_actor_id`)
105pub type UpDownActorIds = (ActorId, ActorId);
106pub type UpDownFragmentIds = (FragmentId, FragmentId);
107
/// Identifier of a partial graph, the unit whose barriers are coordinated by
/// [`PartialGraphManagedBarrierState`].
///
/// Wraps a raw `u32` in a private field; convert back to the raw value with `u32::from`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub(crate) struct PartialGraphId(u32);
110
/// Fixed partial-graph id used by unit tests; set to `u32::MAX`.
#[cfg(test)]
pub(crate) const TEST_PARTIAL_GRAPH_ID: PartialGraphId = PartialGraphId(u32::MAX);

/// Fixed database id used by unit tests; set to `u32::MAX`.
#[cfg(test)]
pub(crate) const TEST_DATABASE_ID: risingwave_common::catalog::DatabaseId =
    risingwave_common::catalog::DatabaseId::new(u32::MAX);
117
118impl PartialGraphId {
119    fn new(id: u32) -> Self {
120        Self(id)
121    }
122}
123
124impl From<PartialGraphId> for u32 {
125    fn from(val: PartialGraphId) -> u32 {
126        val.0
127    }
128}