// risingwave_stream/task/stream_manager.rs
use std::sync::Arc;
16use std::sync::atomic::AtomicU64;
17use std::time::Instant;
18
19use futures::FutureExt;
20use futures::stream::BoxStream;
21use risingwave_common::catalog::DatabaseId;
22use risingwave_pb::stream_service::streaming_control_stream_request::InitRequest;
23use risingwave_pb::stream_service::{
24 StreamingControlStreamRequest, StreamingControlStreamResponse,
25};
26use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
27use tokio::task::JoinHandle;
28use tonic::Status;
29
30use crate::error::StreamResult;
31use crate::executor::ActorContextRef;
32use crate::executor::exchange::permit::Receiver;
33use crate::executor::monitor::StreamingMetrics;
34use crate::task::barrier_worker::{
35 ControlStreamHandle, EventSender, LocalActorOperation, LocalBarrierWorker,
36};
37use crate::task::{StreamEnvironment, UpDownActorIds};
38
/// Host address used by tests in this crate: `127.0.0.1:2333`.
///
/// The literal is parsed on first access; the `unwrap` can only fail if the
/// hard-coded literal stops being a valid address.
#[cfg(test)]
pub static LOCAL_TEST_ADDR: std::sync::LazyLock<risingwave_common::util::addr::HostAddr> =
    std::sync::LazyLock::new(|| {
        "127.0.0.1:2333"
            .parse::<risingwave_common::util::addr::HostAddr>()
            .unwrap()
    });
42
43pub type ActorHandle = JoinHandle<()>;
44
/// Shared, reference-counted handle to an [`AtomicU64`].
///
/// Cloning the handle yields another reference to the same counter.
pub type AtomicU64Ref = Arc<AtomicU64>;
46
47pub mod await_tree_key {
48 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
50 pub struct Actor(pub crate::task::ActorId);
51
52 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
54 pub struct BarrierAwait {
55 pub prev_epoch: u64,
56 }
57}
58
59#[derive(Clone)]
67pub struct LocalStreamManager {
68 await_tree_reg: Option<await_tree::Registry>,
69
70 pub env: StreamEnvironment,
71
72 actor_op_tx: EventSender<LocalActorOperation>,
73}
74
75#[derive(Clone)]
79pub struct ActorEvalErrorReport {
80 pub actor_context: ActorContextRef,
81 pub identity: Arc<str>,
82}
83
84impl risingwave_expr::expr::EvalErrorReport for ActorEvalErrorReport {
85 fn report(&self, err: risingwave_expr::ExprError) {
86 self.actor_context.on_compute_error(err, &self.identity);
87 }
88}
89
90impl LocalStreamManager {
91 pub fn new(
92 env: StreamEnvironment,
93 streaming_metrics: Arc<StreamingMetrics>,
94 await_tree_config: Option<await_tree::Config>,
95 watermark_epoch: AtomicU64Ref,
96 ) -> Self {
97 if !env.config().unsafe_enable_strict_consistency {
98 risingwave_storage::hummock::utils::disable_sanity_check();
101 }
102
103 let await_tree_reg = await_tree_config.clone().map(await_tree::Registry::new);
104
105 let (actor_op_tx, actor_op_rx) = unbounded_channel();
106
107 let _join_handle = LocalBarrierWorker::spawn(
108 env.clone(),
109 streaming_metrics,
110 await_tree_reg.clone(),
111 watermark_epoch,
112 actor_op_rx,
113 );
114 Self {
115 await_tree_reg,
116 env,
117 actor_op_tx: EventSender(actor_op_tx),
118 }
119 }
120
121 pub fn await_tree_reg(&self) -> Option<&await_tree::Registry> {
123 self.await_tree_reg.as_ref()
124 }
125
126 pub fn handle_new_control_stream(
129 &self,
130 sender: UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
131 request_stream: BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
132 init_request: InitRequest,
133 ) {
134 self.actor_op_tx
135 .send_event(LocalActorOperation::NewControlStream {
136 handle: ControlStreamHandle::new(sender, request_stream),
137 init_request,
138 })
139 }
140
141 pub async fn take_receiver(
142 &self,
143 database_id: DatabaseId,
144 term_id: String,
145 ids: UpDownActorIds,
146 ) -> StreamResult<Receiver> {
147 self.actor_op_tx
148 .send_and_await(|result_sender| LocalActorOperation::TakeReceiver {
149 database_id,
150 term_id,
151 ids,
152 result_sender,
153 })
154 .await?
155 }
156
157 pub async fn inspect_barrier_state(&self) -> StreamResult<String> {
158 info!("start inspecting barrier state");
159 let start = Instant::now();
160 self.actor_op_tx
161 .send_and_await(|result_sender| LocalActorOperation::InspectState { result_sender })
162 .inspect(|result| {
163 info!(
164 ok = result.is_ok(),
165 time = ?start.elapsed(),
166 "finish inspecting barrier state"
167 );
168 })
169 .await
170 }
171
172 pub async fn shutdown(&self) -> StreamResult<()> {
173 self.actor_op_tx
174 .send_and_await(|result_sender| LocalActorOperation::Shutdown { result_sender })
175 .await
176 }
177}
178
/// Helpers shared by unit tests.
#[cfg(test)]
pub mod test_utils {
    use risingwave_pb::common::{ActorInfo, HostAddress};

    use super::*;

    /// Builds an [`ActorInfo`] for `actor_id` whose host is [`LOCAL_TEST_ADDR`].
    pub fn helper_make_local_actor(actor_id: u32) -> ActorInfo {
        let local_host = HostAddress {
            host: LOCAL_TEST_ADDR.host.clone(),
            port: LOCAL_TEST_ADDR.port as i32,
        };

        ActorInfo {
            actor_id,
            host: Some(local_host),
        }
    }
}