risingwave_stream/task/
stream_manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::sync::atomic::AtomicU64;
17use std::time::Instant;
18
19use futures::FutureExt;
20use futures::stream::BoxStream;
21use risingwave_common::catalog::DatabaseId;
22use risingwave_pb::stream_service::streaming_control_stream_request::InitRequest;
23use risingwave_pb::stream_service::{
24    StreamingControlStreamRequest, StreamingControlStreamResponse,
25};
26use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
27use tokio::task::JoinHandle;
28use tonic::Status;
29
30use crate::error::StreamResult;
31use crate::executor::ActorContextRef;
32use crate::executor::exchange::permit::Receiver;
33use crate::executor::monitor::StreamingMetrics;
34use crate::task::barrier_worker::{
35    ControlStreamHandle, EventSender, LocalActorOperation, LocalBarrierWorker,
36};
37use crate::task::{StreamEnvironment, UpDownActorIds};
38
/// Fixed loopback address (`127.0.0.1:2333`) used by the test helpers in this file.
///
/// The string is parsed on first access; the item only exists in test builds.
#[cfg(test)]
pub static LOCAL_TEST_ADDR: std::sync::LazyLock<risingwave_common::util::addr::HostAddr> =
    std::sync::LazyLock::new(|| {
        let literal = "127.0.0.1:2333";
        literal.parse().unwrap()
    });
42
43pub type ActorHandle = JoinHandle<()>;
44
/// Reference-counted, shareable [`AtomicU64`].
pub type AtomicU64Ref = Arc<AtomicU64>;
46
47pub mod await_tree_key {
48    /// Await-tree key type for actors.
49    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
50    pub struct Actor(pub crate::task::ActorId);
51
52    /// Await-tree key type for barriers.
53    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
54    pub struct BarrierAwait {
55        pub prev_epoch: u64,
56    }
57}
58
59/// `LocalStreamManager` directly handles public API for streaming, e.g., `StreamService`, `ExchangeService`.
60///
61/// Interacts with meta, and sends [`LocalActorOperation`] events to [`LocalBarrierWorker`].
62/// Note: barriers are handled by [`ControlStreamHandle`]. The control stream is established in [`Self::handle_new_control_stream`],
63/// but the handle lives in [`LocalBarrierWorker`].
64///
65/// See [`crate::task`] for architecture overview.
66#[derive(Clone)]
67pub struct LocalStreamManager {
68    await_tree_reg: Option<await_tree::Registry>,
69
70    pub env: StreamEnvironment,
71
72    actor_op_tx: EventSender<LocalActorOperation>,
73}
74
75/// Report expression evaluation errors to the actor context.
76///
77/// The struct can be cheaply cloned.
78#[derive(Clone)]
79pub struct ActorEvalErrorReport {
80    pub actor_context: ActorContextRef,
81    pub identity: Arc<str>,
82}
83
84impl risingwave_expr::expr::EvalErrorReport for ActorEvalErrorReport {
85    fn report(&self, err: risingwave_expr::ExprError) {
86        self.actor_context.on_compute_error(err, &self.identity);
87    }
88}
89
90impl LocalStreamManager {
91    pub fn new(
92        env: StreamEnvironment,
93        streaming_metrics: Arc<StreamingMetrics>,
94        await_tree_config: Option<await_tree::Config>,
95        watermark_epoch: AtomicU64Ref,
96    ) -> Self {
97        if !env.config().unsafe_enable_strict_consistency {
98            // If strict consistency is disabled, should disable storage sanity check.
99            // Since this is a special config, we have to check it here.
100            risingwave_storage::hummock::utils::disable_sanity_check();
101        }
102
103        let await_tree_reg = await_tree_config.clone().map(await_tree::Registry::new);
104
105        let (actor_op_tx, actor_op_rx) = unbounded_channel();
106
107        let _join_handle = LocalBarrierWorker::spawn(
108            env.clone(),
109            streaming_metrics,
110            await_tree_reg.clone(),
111            watermark_epoch,
112            actor_op_rx,
113        );
114        Self {
115            await_tree_reg,
116            env,
117            actor_op_tx: EventSender(actor_op_tx),
118        }
119    }
120
121    /// Get the registry of await-trees.
122    pub fn await_tree_reg(&self) -> Option<&await_tree::Registry> {
123        self.await_tree_reg.as_ref()
124    }
125
126    /// Receive a new control stream request from meta. Notify the barrier worker to reset the CN and use the new control stream
127    /// to receive control message from meta
128    pub fn handle_new_control_stream(
129        &self,
130        sender: UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
131        request_stream: BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
132        init_request: InitRequest,
133    ) {
134        self.actor_op_tx
135            .send_event(LocalActorOperation::NewControlStream {
136                handle: ControlStreamHandle::new(sender, request_stream),
137                init_request,
138            })
139    }
140
141    pub async fn take_receiver(
142        &self,
143        database_id: DatabaseId,
144        term_id: String,
145        ids: UpDownActorIds,
146    ) -> StreamResult<Receiver> {
147        self.actor_op_tx
148            .send_and_await(|result_sender| LocalActorOperation::TakeReceiver {
149                database_id,
150                term_id,
151                ids,
152                result_sender,
153            })
154            .await?
155    }
156
157    pub async fn inspect_barrier_state(&self) -> StreamResult<String> {
158        info!("start inspecting barrier state");
159        let start = Instant::now();
160        self.actor_op_tx
161            .send_and_await(|result_sender| LocalActorOperation::InspectState { result_sender })
162            .inspect(|result| {
163                info!(
164                    ok = result.is_ok(),
165                    time = ?start.elapsed(),
166                    "finish inspecting barrier state"
167                );
168            })
169            .await
170    }
171
172    pub async fn shutdown(&self) -> StreamResult<()> {
173        self.actor_op_tx
174            .send_and_await(|result_sender| LocalActorOperation::Shutdown { result_sender })
175            .await
176    }
177}
178
/// Helpers shared by unit tests.
#[cfg(test)]
pub mod test_utils {
    use risingwave_pb::common::{ActorInfo, HostAddress};

    use super::*;

    /// Builds an [`ActorInfo`] for `actor_id` whose host is [`LOCAL_TEST_ADDR`].
    pub fn helper_make_local_actor(actor_id: u32) -> ActorInfo {
        let local_host = HostAddress {
            host: LOCAL_TEST_ADDR.host.clone(),
            port: LOCAL_TEST_ADDR.port as i32,
        };

        ActorInfo {
            actor_id,
            host: Some(local_host),
        }
    }
}