use core::time::Duration;
use std::fmt::Debug;
use std::sync::Arc;

use async_recursion::async_recursion;
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnId, Field, Schema};
use risingwave_common::config::{MetricLevel, StreamingConfig};
use risingwave_common::must_match;
use risingwave_common::operator::{unique_executor_id, unique_operator_id};
use risingwave_common::util::runtime::BackgroundShutdownRuntime;
use risingwave_pb::plan_common::StorageTableDesc;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{self, StreamNode, StreamScanNode, StreamScanType};
use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo;
use risingwave_storage::monitor::HummockTraceFutureExt;
use risingwave_storage::table::batch_table::BatchTable;
use risingwave_storage::{StateStore, dispatch_state_store};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::task::JoinHandle;

use crate::common::table::state_table::StateTableBuilder;
use crate::error::StreamResult;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::subtask::SubtaskHandle;
use crate::executor::{
    Actor, ActorContext, ActorContextRef, DispatchExecutor, Execute, Executor, ExecutorInfo,
    MergeExecutorInput, SnapshotBackfillExecutor, TroublemakerExecutor, WrapperExecutor,
};
use crate::from_proto::{MergeExecutorBuilder, create_executor};
use crate::task::{
    ActorEvalErrorReport, ActorId, AtomicU64Ref, FragmentId, LocalBarrierManager, NewOutputRequest,
    StreamEnvironment, await_tree_key,
};

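/// Manages the creation of actors: builds the executor tree for each actor and spawns it
/// on the shared runtime.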
pub(crate) struct StreamActorManager {
    pub(super) env: StreamEnvironment,
    pub(super) streaming_metrics: Arc<StreamingMetrics>,

    /// Watermark epoch number, passed to executors via `ExecutorParams`.
    pub(super) watermark_epoch: AtomicU64Ref,

    /// Registry for await-tree tracing of actors, if enabled.
    pub(super) await_tree_reg: Option<await_tree::Registry>,

    /// The runtime on which actor futures are spawned.
    pub(super) runtime: BackgroundShutdownRuntime,
}

impl StreamActorManager {
    fn get_executor_id(actor_context: &ActorContext, node: &StreamNode) -> u64 {
        unique_executor_id(actor_context.id, node.operator_id)
    }

    fn get_executor_info(node: &StreamNode, executor_id: u64) -> ExecutorInfo {
        let schema: Schema = node.fields.iter().map(Field::from).collect();

        let stream_key = node
            .get_stream_key()
            .iter()
            .map(|idx| *idx as usize)
            .collect::<Vec<_>>();

        let stream_kind = node.stream_kind();

        let identity = format!("{} {:X}", node.get_node_body().unwrap(), executor_id);

        ExecutorInfo {
            schema,
            stream_key,
            stream_kind,
            identity,
            id: executor_id,
        }
    }

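    /// Create the upstream [`MergeExecutorInput`] for a snapshot backfill executor from the
    /// upstream merge node.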
    async fn create_snapshot_backfill_input(
        &self,
        upstream_node: &StreamNode,
        actor_context: &ActorContextRef,
        local_barrier_manager: &LocalBarrierManager,
        chunk_size: usize,
    ) -> StreamResult<MergeExecutorInput> {
        let info = Self::get_executor_info(
            upstream_node,
            Self::get_executor_id(actor_context, upstream_node),
        );

        let upstream_merge = must_match!(upstream_node.get_node_body().unwrap(), NodeBody::Merge(upstream_merge) => {
            upstream_merge
        });

        MergeExecutorBuilder::new_input(
            local_barrier_manager.clone(),
            self.streaming_metrics.clone(),
            actor_context.clone(),
            info,
            upstream_merge,
            chunk_size,
        )
        .await
    }

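    /// Create a [`SnapshotBackfillExecutor`] for a `StreamScan` node whose scan type is
    /// `SnapshotBackfill`.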
    async fn create_snapshot_backfill_node(
        &self,
        stream_node: &StreamNode,
        node: &StreamScanNode,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        local_barrier_manager: &LocalBarrierManager,
        state_store: impl StateStore,
    ) -> StreamResult<Executor> {
        let [upstream_node, _]: &[_; 2] = stream_node.input.as_slice().try_into().unwrap();
        let chunk_size = actor_context.config.developer.chunk_size;
        let upstream = self
            .create_snapshot_backfill_input(
                upstream_node,
                actor_context,
                local_barrier_manager,
                chunk_size,
            )
            .await?;

        let table_desc: &StorageTableDesc = node.get_table_desc()?;

        let output_indices = node
            .output_indices
            .iter()
            .map(|&i| i as usize)
            .collect_vec();

        let column_ids = node
            .upstream_column_ids
            .iter()
            .map(ColumnId::from)
            .collect_vec();

        let progress = local_barrier_manager.register_create_mview_progress(actor_context);

        let vnodes = vnode_bitmap.map(Arc::new);
        let barrier_rx = local_barrier_manager.subscribe_barrier(actor_context.id);

        let upstream_table =
            BatchTable::new_partial(state_store.clone(), column_ids, vnodes.clone(), table_desc);

        let state_table = node.get_state_table()?;
        let state_table = StateTableBuilder::new(state_table, state_store.clone(), vnodes)
            .enable_preload_all_rows_by_config(&actor_context.config)
            .build()
            .await;

        let executor = SnapshotBackfillExecutor::new(
            upstream_table,
            state_table,
            upstream,
            output_indices,
            actor_context.clone(),
            progress,
            chunk_size,
            node.rate_limit.into(),
            barrier_rx,
            self.streaming_metrics.clone(),
            node.snapshot_backfill_epoch,
        )
        .boxed();

        let info = Self::get_executor_info(
            stream_node,
            Self::get_executor_id(actor_context, stream_node),
        );

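        // In "insane" consistency mode, wrap the executor with a `TroublemakerExecutor`,
        // marking its identity as "(troubled)".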
        if crate::consistency::insane() {
            let mut troubled_info = info.clone();
            troubled_info.identity = format!("{} (troubled)", info.identity);
            Ok((
                info,
                TroublemakerExecutor::new((troubled_info, executor).into(), chunk_size),
            )
                .into())
        } else {
            Ok((info, executor).into())
        }
    }

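    /// Create the executor for the given stream node with the given state store, recursively
    /// creating its input executors first.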
    #[expect(clippy::too_many_arguments)]
    #[async_recursion]
    async fn create_nodes_inner(
        &self,
        fragment_id: FragmentId,
        node: &stream_plan::StreamNode,
        env: StreamEnvironment,
        store: impl StateStore,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        has_stateful: bool,
        subtasks: &mut Vec<SubtaskHandle>,
        local_barrier_manager: &LocalBarrierManager,
    ) -> StreamResult<Executor> {
        if let NodeBody::StreamScan(stream_scan) = node.get_node_body().unwrap()
            && let Ok(StreamScanType::SnapshotBackfill) = stream_scan.get_stream_scan_type()
        {
            return dispatch_state_store!(env.state_store(), store, {
                self.create_snapshot_backfill_node(
                    node,
                    stream_scan,
                    actor_context,
                    vnode_bitmap,
                    local_barrier_manager,
                    store,
                )
                .await
            });
        }

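        // A node is considered "stateful" if its executor maintains state tables
        // (e.g. joins, aggregations, top-n, backfill scans).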
        fn is_stateful_executor(stream_node: &StreamNode) -> bool {
            matches!(
                stream_node.get_node_body().unwrap(),
                NodeBody::HashAgg(_)
                    | NodeBody::HashJoin(_)
                    | NodeBody::DeltaIndexJoin(_)
                    | NodeBody::Lookup(_)
                    | NodeBody::StreamScan(_)
                    | NodeBody::StreamCdcScan(_)
                    | NodeBody::DynamicFilter(_)
                    | NodeBody::GroupTopN(_)
                    | NodeBody::Now(_)
            )
        }
        let is_stateful = is_stateful_executor(node);

        let mut input = Vec::with_capacity(node.input.len());
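        // Recursively create the executors for all input nodes first.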
        for input_stream_node in &node.input {
            input.push(
                self.create_nodes_inner(
                    fragment_id,
                    input_stream_node,
                    env.clone(),
                    store.clone(),
                    actor_context,
                    vnode_bitmap.clone(),
                    has_stateful || is_stateful,
                    subtasks,
                    local_barrier_manager,
                )
                .await?,
            );
        }

        self.generate_executor_from_inputs(
            fragment_id,
            node,
            env,
            store,
            actor_context,
            vnode_bitmap,
            has_stateful || is_stateful,
            subtasks,
            local_barrier_manager,
            input,
        )
        .await
    }

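    /// Build the executor for `node` itself from its already-created `input` executors.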
    #[expect(clippy::too_many_arguments)]
    async fn generate_executor_from_inputs(
        &self,
        fragment_id: FragmentId,
        node: &stream_plan::StreamNode,
        env: StreamEnvironment,
        store: impl StateStore,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        has_stateful: bool,
        subtasks: &mut Vec<SubtaskHandle>,
        local_barrier_manager: &LocalBarrierManager,
        input: Vec<Executor>,
    ) -> StreamResult<Executor> {
        let op_info = node.get_identity().clone();

        let executor_id = Self::get_executor_id(actor_context, node);
        let operator_id = unique_operator_id(fragment_id, node.operator_id);

        let info = Self::get_executor_info(node, executor_id);

        let eval_error_report = ActorEvalErrorReport {
            actor_context: actor_context.clone(),
            identity: info.identity.clone().into(),
        };

        let executor_params = ExecutorParams {
            env: env.clone(),

            info: info.clone(),
            executor_id,
            operator_id,
            op_info,
            input,
            fragment_id,
            executor_stats: self.streaming_metrics.clone(),
            actor_context: actor_context.clone(),
            vnode_bitmap,
            eval_error_report,
            watermark_epoch: self.watermark_epoch.clone(),
            local_barrier_manager: local_barrier_manager.clone(),
            config: actor_context.config.clone(),
        };

        let executor = create_executor(executor_params, node, store).await?;

        let wrapped = WrapperExecutor::new(executor, actor_context.clone());
        let executor = (info, wrapped).into();

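        // Note: `has_stateful` currently does not change the result here; both branches
        // return the executor unchanged and `subtasks` is deliberately left unused.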
        let executor = if has_stateful {
            let _ = subtasks;
            executor
        } else {
            executor
        };

        Ok(executor)
    }

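    /// Create the executor tree for an actor, dispatching on the configured state store
    /// backend and collecting any subtask handles.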
    async fn create_nodes(
        &self,
        fragment_id: FragmentId,
        node: &stream_plan::StreamNode,
        env: StreamEnvironment,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        local_barrier_manager: &LocalBarrierManager,
    ) -> StreamResult<(Executor, Vec<SubtaskHandle>)> {
        let mut subtasks = vec![];

        let executor = dispatch_state_store!(env.state_store(), store, {
            self.create_nodes_inner(
                fragment_id,
                node,
                env,
                store,
                actor_context,
                vnode_bitmap,
                false,
                &mut subtasks,
                local_barrier_manager,
            )
            .await
        })?;

        Ok((executor, subtasks))
    }

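    /// Create an [`Actor`] from the given build info: set up its context, build its executor
    /// tree, and wrap the root executor with a [`DispatchExecutor`].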
    async fn create_actor(
        self: Arc<Self>,
        actor: BuildActorInfo,
        fragment_id: FragmentId,
        node: Arc<StreamNode>,
        local_barrier_manager: LocalBarrierManager,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    ) -> StreamResult<Actor<DispatchExecutor>> {
        let actor_id = actor.actor_id;
        let actor_context = ActorContext::create(
            &actor,
            fragment_id,
            self.env.total_mem_usage(),
            self.streaming_metrics.clone(),
            self.env.meta_client(),
            self.env.global_config().clone(),
            self.env.clone(),
        );
        let vnode_bitmap = actor.vnode_bitmap.as_ref().map(|b| b.into());
        let expr_context = actor.expr_context.clone().unwrap();

        let (executor, subtasks) = self
            .create_nodes(
                fragment_id,
                &node,
                self.env.clone(),
                &actor_context,
                vnode_bitmap,
                &local_barrier_manager,
            )
            .await?;

        let dispatcher = DispatchExecutor::new(
            executor,
            new_output_request_rx,
            actor.dispatchers,
            actor_id,
            fragment_id,
            local_barrier_manager.clone(),
            self.streaming_metrics.clone(),
        )
        .await?;
        let actor = Actor::new(
            dispatcher,
            subtasks,
            self.streaming_metrics.clone(),
            actor_context.clone(),
            expr_context,
            local_barrier_manager,
        );
        Ok(actor)
    }

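    /// Create and spawn the actor on the shared runtime. Returns the join handle of the actor
    /// task and, if Tokio task metrics are enabled, the join handle of the metrics-reporting
    /// task.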
    pub(super) fn spawn_actor(
        self: &Arc<Self>,
        actor: BuildActorInfo,
        fragment_id: FragmentId,
        node: Arc<StreamNode>,
        local_barrier_manager: LocalBarrierManager,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    ) -> (JoinHandle<()>, Option<JoinHandle<()>>) {
        let monitor = tokio_metrics::TaskMonitor::new();
        let stream_actor_ref = &actor;
        let actor_id = stream_actor_ref.actor_id;
        let handle = {
            let trace_span = format!("Actor {actor_id}: `{}`", stream_actor_ref.mview_definition);
            let barrier_manager = local_barrier_manager;
            let actor = self
                .clone()
                .create_actor(
                    actor,
                    fragment_id,
                    node,
                    barrier_manager.clone(),
                    new_output_request_rx,
                )
                .boxed()
                .and_then(|actor| actor.run())
                .map(move |result| {
                    if let Err(err) = result {
                        tracing::error!(%actor_id, error = ?err.as_report(), "actor exit with error");
                        barrier_manager.notify_failure(actor_id, err);
                    }
                });
            let traced = match &self.await_tree_reg {
                Some(m) => m
                    .register(await_tree_key::Actor(actor_id), trace_span)
                    .instrument(actor)
                    .left_future(),
                None => actor.right_future(),
            };
            let instrumented = monitor.instrument(traced);
            // Optionally wrap the future with Hummock trace collection (see `HummockTraceFutureExt`).
            let may_track_hummock = instrumented.may_trace_hummock();

            self.runtime.spawn(may_track_hummock)
        };

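        // When debug-level metrics or per-actor Tokio metrics are enabled, spawn a task that
        // reports this actor's Tokio task metrics every second.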
        let monitor_handle = if self.streaming_metrics.level >= MetricLevel::Debug
            || self
                .env
                .global_config()
                .developer
                .enable_actor_tokio_metrics
        {
            tracing::info!("Tokio metrics are enabled.");
            let streaming_metrics = self.streaming_metrics.clone();
            let actor_monitor_task = self.runtime.spawn(async move {
                let metrics = streaming_metrics.new_actor_metrics(actor_id);
                loop {
                    let task_metrics = monitor.cumulative();
                    metrics
                        .actor_execution_time
                        .set(task_metrics.total_poll_duration.as_secs_f64());
                    metrics
                        .actor_fast_poll_duration
                        .set(task_metrics.total_fast_poll_duration.as_secs_f64());
                    metrics
                        .actor_fast_poll_cnt
                        .set(task_metrics.total_fast_poll_count as i64);
                    metrics
                        .actor_slow_poll_duration
                        .set(task_metrics.total_slow_poll_duration.as_secs_f64());
                    metrics
                        .actor_slow_poll_cnt
                        .set(task_metrics.total_slow_poll_count as i64);
                    metrics
                        .actor_poll_duration
                        .set(task_metrics.total_poll_duration.as_secs_f64());
                    metrics
                        .actor_poll_cnt
                        .set(task_metrics.total_poll_count as i64);
                    metrics
                        .actor_idle_duration
                        .set(task_metrics.total_idle_duration.as_secs_f64());
                    metrics
                        .actor_idle_cnt
                        .set(task_metrics.total_idled_count as i64);
                    metrics
                        .actor_scheduled_duration
                        .set(task_metrics.total_scheduled_duration.as_secs_f64());
                    metrics
                        .actor_scheduled_cnt
                        .set(task_metrics.total_scheduled_count as i64);
                    tokio::time::sleep(Duration::from_secs(1)).await;
                }
            });
            Some(actor_monitor_task)
        } else {
            None
        };
        (handle, monitor_handle)
    }
}

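/// Parameters passed to [`create_executor`] when building the executor for a stream node.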
pub struct ExecutorParams {
    /// The shared environment of the streaming worker.
    pub env: StreamEnvironment,

    /// Basic information of the executor: schema, stream key, kind and identity.
    pub info: ExecutorInfo,

    /// Executor id, derived from the actor id and the operator id in the plan.
    pub executor_id: u64,

    /// Operator id, derived from the fragment id and the operator id in the plan.
    pub operator_id: u64,

    /// Identity string of the plan node.
    pub op_info: String,

    /// The input executors of this executor.
    pub input: Vec<Executor>,

    /// Id of the fragment this executor belongs to.
    pub fragment_id: FragmentId,

    /// Metrics of the streaming executors.
    pub executor_stats: Arc<StreamingMetrics>,

    /// Context of the actor this executor belongs to.
    pub actor_context: ActorContextRef,

    /// Vnode bitmap of the actor, if any.
    pub vnode_bitmap: Option<Bitmap>,

    /// Used to report expression evaluation errors.
    pub eval_error_report: ActorEvalErrorReport,

    /// Watermark epoch number, shared from the `StreamActorManager`.
    pub watermark_epoch: AtomicU64Ref,

    /// The local barrier manager of the worker.
    pub local_barrier_manager: LocalBarrierManager,

    /// The streaming config of the actor.
    pub config: Arc<StreamingConfig>,
}

impl Debug for ExecutorParams {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ExecutorParams")
            .field("info", &self.info)
            .field("executor_id", &self.executor_id)
            .field("operator_id", &self.operator_id)
            .field("op_info", &self.op_info)
            .field("input", &self.input.len())
            .field("actor_id", &self.actor_context.id)
            .finish_non_exhaustive()
    }
}