risingwave_stream/task/
stream_manager.rs1use std::sync::Arc;
16use std::sync::atomic::AtomicU64;
17use std::time::Instant;
18
19use futures::FutureExt;
20use futures::stream::BoxStream;
21use risingwave_pb::id::PartialGraphId;
22use risingwave_pb::stream_service::streaming_control_stream_request::InitRequest;
23use risingwave_pb::stream_service::{
24 StreamingControlStreamRequest, StreamingControlStreamResponse,
25};
26use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
27use tokio::task::JoinHandle;
28use tonic::Status;
29
30use crate::error::StreamResult;
31use crate::executor::ActorContextRef;
32use crate::executor::exchange::permit::Receiver;
33use crate::executor::monitor::StreamingMetrics;
34use crate::task::barrier_worker::{
35 ControlStreamHandle, EventSender, LocalActorOperation, LocalBarrierWorker, TakeReceiverRequest,
36};
37use crate::task::{FragmentId, StreamEnvironment, UpDownActorIds};
38
/// Fixed loopback address (`127.0.0.1:2333`) available in test builds only.
///
/// The string is parsed on first access; a parse failure panics.
#[cfg(test)]
pub static LOCAL_TEST_ADDR: std::sync::LazyLock<risingwave_common::util::addr::HostAddr> =
    std::sync::LazyLock::new(|| {
        use risingwave_common::util::addr::HostAddr;

        "127.0.0.1:2333".parse::<HostAddr>().unwrap()
    });
42
43pub type ActorHandle = JoinHandle<()>;
44
/// Reference-counted, shareable `AtomicU64`.
///
/// Cloning the alias clones the `Arc`, so every clone observes the same counter.
pub type AtomicU64Ref = Arc<AtomicU64>;
46
47pub mod await_tree_key {
48 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
50 pub struct Actor(pub crate::task::ActorId);
51
52 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
54 pub struct BarrierAwait {
55 pub prev_epoch: u64,
56 }
57}
58
59#[derive(Clone)]
67pub struct LocalStreamManager {
68 await_tree_reg: Option<await_tree::Registry>,
69
70 pub env: StreamEnvironment,
71
72 actor_op_tx: EventSender<LocalActorOperation>,
73}
74
75#[derive(Clone)]
79pub struct ActorEvalErrorReport {
80 pub actor_context: ActorContextRef,
81 pub identity: Arc<str>,
82}
83
84impl risingwave_expr::expr::EvalErrorReport for ActorEvalErrorReport {
85 fn report(&self, err: risingwave_expr::ExprError) {
86 self.actor_context.on_compute_error(err, &self.identity);
87 }
88}
89
90impl LocalStreamManager {
91 pub fn new(
92 env: StreamEnvironment,
93 streaming_metrics: Arc<StreamingMetrics>,
94 await_tree_config: Option<await_tree::Config>,
95 watermark_epoch: AtomicU64Ref,
96 ) -> Self {
97 if env.global_config().unsafe_disable_strict_consistency {
98 risingwave_storage::hummock::utils::disable_sanity_check();
101 }
102
103 let await_tree_reg = await_tree_config.map(await_tree::Registry::new);
104
105 let (actor_op_tx, actor_op_rx) = unbounded_channel();
106
107 let _join_handle = LocalBarrierWorker::spawn(
108 env.clone(),
109 streaming_metrics,
110 await_tree_reg.clone(),
111 watermark_epoch,
112 actor_op_rx,
113 );
114 Self {
115 await_tree_reg,
116 env,
117 actor_op_tx: EventSender(actor_op_tx),
118 }
119 }
120
121 pub fn await_tree_reg(&self) -> Option<&await_tree::Registry> {
123 self.await_tree_reg.as_ref()
124 }
125
126 pub fn handle_new_control_stream(
129 &self,
130 sender: UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
131 request_stream: BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
132 init_request: InitRequest,
133 ) {
134 self.actor_op_tx
135 .send_event(LocalActorOperation::NewControlStream {
136 handle: ControlStreamHandle::new(sender, request_stream),
137 init_request,
138 })
139 }
140
141 pub async fn take_receiver(
142 &self,
143 partial_graph_id: PartialGraphId,
144 term_id: String,
145 ids: UpDownActorIds,
146 upstream_fragment_id: FragmentId,
147 ) -> StreamResult<Receiver> {
148 self.actor_op_tx
149 .send_and_await(|result_sender| LocalActorOperation::TakeReceiver {
150 partial_graph_id,
151 term_id,
152 ids,
153 request: TakeReceiverRequest::Remote {
154 result_sender,
155 upstream_fragment_id,
156 },
157 })
158 .await?
159 }
160
161 pub async fn inspect_barrier_state(&self) -> StreamResult<String> {
162 info!("start inspecting barrier state");
163 let start = Instant::now();
164 self.actor_op_tx
165 .send_and_await(|result_sender| LocalActorOperation::InspectState { result_sender })
166 .inspect(|result| {
167 info!(
168 ok = result.is_ok(),
169 time = ?start.elapsed(),
170 "finish inspecting barrier state"
171 );
172 })
173 .await
174 }
175
176 pub async fn shutdown(&self) -> StreamResult<()> {
177 self.actor_op_tx
178 .send_and_await(|result_sender| LocalActorOperation::Shutdown { result_sender })
179 .await
180 }
181}
182
/// Helpers shared by unit tests.
#[cfg(test)]
pub mod test_utils {
    use risingwave_common::id::ActorId;
    use risingwave_pb::common::{ActorInfo, HostAddress};

    use super::*;
    use crate::task::TEST_PARTIAL_GRAPH_ID;

    /// Builds an [`ActorInfo`] for `actor_id` hosted at [`LOCAL_TEST_ADDR`] and belonging to
    /// `TEST_PARTIAL_GRAPH_ID`.
    pub fn helper_make_local_actor(actor_id: ActorId) -> ActorInfo {
        let local_host = HostAddress {
            host: LOCAL_TEST_ADDR.host.clone(),
            port: LOCAL_TEST_ADDR.port as i32,
        };

        ActorInfo {
            actor_id,
            host: Some(local_host),
            partial_graph_id: TEST_PARTIAL_GRAPH_ID,
        }
    }
}