use core::time::Duration;
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::sync::Arc;

use async_recursion::async_recursion;
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnId, Field, Schema, TableId};
use risingwave_common::config::MetricLevel;
use risingwave_common::must_match;
use risingwave_common::operator::{unique_executor_id, unique_operator_id};
use risingwave_common::util::runtime::BackgroundShutdownRuntime;
use risingwave_expr::expr::build_non_strict_from_prost;
use risingwave_pb::plan_common::StorageTableDesc;
use risingwave_pb::stream_plan;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{StreamNode, StreamScanNode, StreamScanType};
use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo;
use risingwave_storage::monitor::HummockTraceFutureExt;
use risingwave_storage::table::batch_table::BatchTable;
use risingwave_storage::{StateStore, dispatch_state_store};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::task::JoinHandle;

use crate::common::table::state_table::StateTable;
use crate::error::StreamResult;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::subtask::SubtaskHandle;
use crate::executor::{
    Actor, ActorContext, ActorContextRef, DispatchExecutor, Execute, Executor, ExecutorInfo,
    MergeExecutorInput, SnapshotBackfillExecutor, TroublemakerExecutor, UpstreamSinkUnionExecutor,
    WrapperExecutor,
};
use crate::from_proto::{MergeExecutorBuilder, create_executor};
use crate::task::{
    ActorEvalErrorReport, ActorId, AtomicU64Ref, FragmentId, LocalBarrierManager, NewOutputRequest,
    StreamEnvironment, await_tree_key,
};

pub(crate) struct StreamActorManager {
    pub(super) env: StreamEnvironment,
    pub(super) streaming_metrics: Arc<StreamingMetrics>,

    /// Watermark epoch number.
    pub(super) watermark_epoch: AtomicU64Ref,

    /// Manages the await-trees of all actors; `None` if await-tree is disabled.
    pub(super) await_tree_reg: Option<await_tree::Registry>,

    /// Runtime for the streaming actors.
    pub(super) runtime: BackgroundShutdownRuntime,
}

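/// A `Union` plan node recognized as the "sink into table" pattern: its trailing inputs are
/// each a trivial `Project` on top of a hash-dispatched `Merge` from an upstream (sink)
/// fragment. Detected by `StreamActorManager::as_sink_into_table_union`.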
struct SinkIntoTableUnion<'a> {
    /// Union inputs that precede the `(Merge, Project)` pairs, in their original order.
    prefix_nodes: Vec<&'a stream_plan::StreamNode>,
    /// The `(Merge, Project)` node pairs of the sink-into-table inputs.
    merge_projects: Vec<(&'a stream_plan::StreamNode, &'a stream_plan::StreamNode)>,
    /// The `Union` node itself.
    union_node: &'a stream_plan::StreamNode,
}

impl StreamActorManager {
    fn get_executor_id(actor_context: &ActorContext, node: &StreamNode) -> u64 {
        unique_executor_id(actor_context.id, node.operator_id)
    }

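    /// Build the [`ExecutorInfo`] for `node`: the schema and stream key come from the proto
    /// fields, and the identity embeds the node body name and the unique executor id.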
    fn get_executor_info(node: &StreamNode, executor_id: u64) -> ExecutorInfo {
        let schema: Schema = node.fields.iter().map(Field::from).collect();

        let pk_indices = node
            .get_stream_key()
            .iter()
            .map(|idx| *idx as usize)
            .collect::<Vec<_>>();

        let identity = format!("{} {:X}", node.get_node_body().unwrap(), executor_id);
        ExecutorInfo {
            schema,
            pk_indices,
            identity,
            id: executor_id,
        }
    }

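    /// Build the [`MergeExecutorInput`] that feeds a snapshot-backfill executor with upstream
    /// changes. `upstream_node` must be a `Merge` node.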
    async fn create_snapshot_backfill_input(
        &self,
        upstream_node: &StreamNode,
        actor_context: &ActorContextRef,
        local_barrier_manager: &LocalBarrierManager,
        chunk_size: usize,
    ) -> StreamResult<MergeExecutorInput> {
        let info = Self::get_executor_info(
            upstream_node,
            Self::get_executor_id(actor_context, upstream_node),
        );

        let upstream_merge = must_match!(upstream_node.get_node_body().unwrap(), NodeBody::Merge(upstream_merge) => {
            upstream_merge
        });

        MergeExecutorBuilder::new_input(
            local_barrier_manager.clone(),
            self.streaming_metrics.clone(),
            actor_context.clone(),
            info,
            upstream_merge,
            chunk_size,
        )
        .await
    }

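    /// Build a [`SnapshotBackfillExecutor`] for a `StreamScan` node of type
    /// `SnapshotBackfill`. The node is expected to have exactly two inputs, of which only
    /// the first (the upstream `Merge`) is used here.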
    #[expect(clippy::too_many_arguments)]
    async fn create_snapshot_backfill_node(
        &self,
        stream_node: &StreamNode,
        node: &StreamScanNode,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        env: StreamEnvironment,
        local_barrier_manager: &LocalBarrierManager,
        state_store: impl StateStore,
    ) -> StreamResult<Executor> {
        let [upstream_node, _]: &[_; 2] = stream_node.input.as_slice().try_into().unwrap();
        let chunk_size = env.config().developer.chunk_size;
        let upstream = self
            .create_snapshot_backfill_input(
                upstream_node,
                actor_context,
                local_barrier_manager,
                chunk_size,
            )
            .await?;

        let table_desc: &StorageTableDesc = node.get_table_desc()?;

        let output_indices = node
            .output_indices
            .iter()
            .map(|&i| i as usize)
            .collect_vec();

        let column_ids = node
            .upstream_column_ids
            .iter()
            .map(ColumnId::from)
            .collect_vec();

        let progress = local_barrier_manager.register_create_mview_progress(actor_context.id);

        let vnodes = vnode_bitmap.map(Arc::new);
        let barrier_rx = local_barrier_manager.subscribe_barrier(actor_context.id);

        let upstream_table =
            BatchTable::new_partial(state_store.clone(), column_ids, vnodes.clone(), table_desc);

        let state_table = node.get_state_table()?;
        let state_table =
            StateTable::from_table_catalog(state_table, state_store.clone(), vnodes).await;

        let executor = SnapshotBackfillExecutor::new(
            upstream_table,
            state_table,
            upstream,
            output_indices,
            actor_context.clone(),
            progress,
            chunk_size,
            node.rate_limit.into(),
            barrier_rx,
            self.streaming_metrics.clone(),
            node.snapshot_backfill_epoch,
        )
        .boxed();

        let info = Self::get_executor_info(
            stream_node,
            Self::get_executor_id(actor_context, stream_node),
        );

        if crate::consistency::insane() {
            // In insane mode, wrap the executor with a troublemaker that injects
            // inconsistency into the stream.
            let mut troubled_info = info.clone();
            troubled_info.identity = format!("{} (troubled)", info.identity);
            Ok((
                info,
                TroublemakerExecutor::new((troubled_info, executor).into(), chunk_size),
            )
                .into())
        } else {
            Ok((info, executor).into())
        }
    }

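    /// Build the executors for a union recognized by [`Self::as_sink_into_table_union`]:
    /// the prefix inputs are built recursively as usual, while all `(Merge, Project)` pairs
    /// are fused into a single [`UpstreamSinkUnionExecutor`] appended as the last input.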
    #[expect(clippy::too_many_arguments)]
    async fn create_sink_into_table_union(
        &self,
        fragment_id: FragmentId,
        union_node: &stream_plan::StreamNode,
        env: StreamEnvironment,
        store: impl StateStore,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        has_stateful: bool,
        subtasks: &mut Vec<SubtaskHandle>,
        local_barrier_manager: &LocalBarrierManager,
        prefix_nodes: Vec<&stream_plan::StreamNode>,
        merge_projects: Vec<(&stream_plan::StreamNode, &stream_plan::StreamNode)>,
    ) -> StreamResult<Executor> {
        let mut input = Vec::with_capacity(union_node.get_input().len());

        // Build the non-sink inputs of the union as usual.
        for input_stream_node in prefix_nodes {
            input.push(
                self.create_nodes_inner(
                    fragment_id,
                    input_stream_node,
                    env.clone(),
                    store.clone(),
                    actor_context,
                    vnode_bitmap.clone(),
                    has_stateful,
                    subtasks,
                    local_barrier_manager,
                )
                .await?,
            );
        }

        // Derive the executor info from the first merge node, but give it a dedicated identity.
        let first_merge = merge_projects.first().unwrap().0;
        let executor_id = Self::get_executor_id(actor_context, first_merge);
        let mut info = Self::get_executor_info(first_merge, executor_id);
        info.identity = format!("UpstreamSinkUnion {:X}", executor_id);
        let eval_error_report = ActorEvalErrorReport {
            actor_context: actor_context.clone(),
            identity: info.identity.clone().into(),
        };

        let upstream_infos = merge_projects
            .into_iter()
            .map(|(merge_node, project_node)| {
                let upstream_fragment_id = merge_node
                    .get_node_body()
                    .unwrap()
                    .as_merge()
                    .unwrap()
                    .upstream_fragment_id;
                let merge_schema: Schema =
                    merge_node.get_fields().iter().map(Field::from).collect();
                let project_exprs = project_node
                    .get_node_body()
                    .unwrap()
                    .as_project()
                    .unwrap()
                    .get_select_list()
                    .iter()
                    .map(|e| build_non_strict_from_prost(e, eval_error_report.clone()))
                    .try_collect()
                    .unwrap();
                (upstream_fragment_id, merge_schema, project_exprs)
            })
            .collect();

        let upstream_sink_union_executor = UpstreamSinkUnionExecutor::new(
            actor_context.clone(),
            local_barrier_manager.clone(),
            self.streaming_metrics.clone(),
            env.config().developer.chunk_size,
            upstream_infos,
        );
        let executor = (info, upstream_sink_union_executor).into();
        input.push(executor);

        self.generate_executor_from_inputs(
            fragment_id,
            union_node,
            env,
            store,
            actor_context,
            vnode_bitmap,
            has_stateful,
            subtasks,
            local_barrier_manager,
            input,
        )
        .await
    }

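    /// Check whether `node` is a `Union` whose trailing inputs are sink-into-table pairs: a
    /// trivial `Project` (no watermark columns, no non-decreasing expressions, no no-op
    /// update hint) on top of a `Merge` with hash dispatching. Returns `None` otherwise.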
    fn as_sink_into_table_union(node: &StreamNode) -> Option<SinkIntoTableUnion<'_>> {
        let NodeBody::Union(_) = node.get_node_body().unwrap() else {
            return None;
        };

        let mut merge_projects = Vec::new();
        let mut remaining_nodes = Vec::new();

        // Scan the union inputs from the back, collecting trailing `(Merge, Project)` pairs.
        let mut rev_iter = node.get_input().iter().rev();
        for union_input in rev_iter.by_ref() {
            let mut is_sink_into = false;
            if let NodeBody::Project(project) = union_input.get_node_body().unwrap() {
                let project_input = union_input.get_input().first().unwrap();
                let project_check = project.get_watermark_input_cols().is_empty()
                    && project.get_watermark_output_cols().is_empty()
                    && project.get_nondecreasing_exprs().is_empty()
                    && !project.noop_update_hint;
                if project_check
                    && let NodeBody::Merge(merge) = project_input.get_node_body().unwrap()
                {
                    let merge_check = merge.upstream_dispatcher_type()
                        == risingwave_pb::stream_plan::DispatcherType::Hash;
                    if merge_check {
                        is_sink_into = true;
                        tracing::debug!(
                            "replace sink into table union, merge: {:?}, project: {:?}",
                            merge,
                            project
                        );
                        merge_projects.push((project_input, union_input));
                    }
                }
            }
            if !is_sink_into {
                remaining_nodes.push(union_input);
                break;
            }
        }

        if merge_projects.is_empty() {
            return None;
        }

        remaining_nodes.extend(rev_iter);

        // Restore the original input order (we iterated in reverse).
        merge_projects.reverse();
        remaining_nodes.reverse();

        Some(SinkIntoTableUnion {
            prefix_nodes: remaining_nodes,
            merge_projects,
            union_node: node,
        })
    }

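    /// Recursively build the executor tree rooted at `node` with the given state `store`.
    /// `has_stateful` tells whether any ancestor executor is stateful, and is combined with
    /// this node's own statefulness when recursing into the inputs.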
    #[expect(clippy::too_many_arguments)]
    #[async_recursion]
    async fn create_nodes_inner(
        &self,
        fragment_id: FragmentId,
        node: &stream_plan::StreamNode,
        env: StreamEnvironment,
        store: impl StateStore,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        has_stateful: bool,
        subtasks: &mut Vec<SubtaskHandle>,
        local_barrier_manager: &LocalBarrierManager,
    ) -> StreamResult<Executor> {
        // Snapshot-backfill stream scans take a dedicated build path.
        if let NodeBody::StreamScan(stream_scan) = node.get_node_body().unwrap()
            && let Ok(StreamScanType::SnapshotBackfill) = stream_scan.get_stream_scan_type()
        {
            return dispatch_state_store!(env.state_store(), store, {
                self.create_snapshot_backfill_node(
                    node,
                    stream_scan,
                    actor_context,
                    vnode_bitmap,
                    env,
                    local_barrier_manager,
                    store,
                )
                .await
            });
        }

        // So do unions that merge sinks into this table.
        if let Some(SinkIntoTableUnion {
            prefix_nodes: remaining_nodes,
            merge_projects,
            union_node,
        }) = Self::as_sink_into_table_union(node)
        {
            return self
                .create_sink_into_table_union(
                    fragment_id,
                    union_node,
                    env,
                    store,
                    actor_context,
                    vnode_bitmap,
                    has_stateful,
                    subtasks,
                    local_barrier_manager,
                    remaining_nodes,
                    merge_projects,
                )
                .await;
        }

        /// Whether the executor for this node is stateful, i.e. maintains state tables.
        fn is_stateful_executor(stream_node: &StreamNode) -> bool {
            matches!(
                stream_node.get_node_body().unwrap(),
                NodeBody::HashAgg(_)
                    | NodeBody::HashJoin(_)
                    | NodeBody::DeltaIndexJoin(_)
                    | NodeBody::Lookup(_)
                    | NodeBody::StreamScan(_)
                    | NodeBody::StreamCdcScan(_)
                    | NodeBody::DynamicFilter(_)
                    | NodeBody::GroupTopN(_)
                    | NodeBody::Now(_)
            )
        }
        let is_stateful = is_stateful_executor(node);

        // Build the input executors first, depth-first.
        let mut input = Vec::with_capacity(node.input.len());
        for input_stream_node in &node.input {
            input.push(
                self.create_nodes_inner(
                    fragment_id,
                    input_stream_node,
                    env.clone(),
                    store.clone(),
                    actor_context,
                    vnode_bitmap.clone(),
                    has_stateful || is_stateful,
                    subtasks,
                    local_barrier_manager,
                )
                .await?,
            );
        }

        self.generate_executor_from_inputs(
            fragment_id,
            node,
            env,
            store,
            actor_context,
            vnode_bitmap,
            has_stateful || is_stateful,
            subtasks,
            local_barrier_manager,
            input,
        )
        .await
    }

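    /// Create the executor for `node` itself from its already-built `input` executors, and
    /// wrap it with a [`WrapperExecutor`] for consistency checks and instrumentation.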
    #[expect(clippy::too_many_arguments)]
    async fn generate_executor_from_inputs(
        &self,
        fragment_id: FragmentId,
        node: &stream_plan::StreamNode,
        env: StreamEnvironment,
        store: impl StateStore,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        has_stateful: bool,
        subtasks: &mut Vec<SubtaskHandle>,
        local_barrier_manager: &LocalBarrierManager,
        input: Vec<Executor>,
    ) -> StreamResult<Executor> {
        let op_info = node.get_identity().clone();

        let executor_id = Self::get_executor_id(actor_context, node);
        // We assume that the operator_id of different instances from the same `RelNode` will
        // be the same.
        let operator_id = unique_operator_id(fragment_id, node.operator_id);

        let info = Self::get_executor_info(node, executor_id);

        let eval_error_report = ActorEvalErrorReport {
            actor_context: actor_context.clone(),
            identity: info.identity.clone().into(),
        };

        // Build the executor with params.
        let executor_params = ExecutorParams {
            env: env.clone(),

            info: info.clone(),
            executor_id,
            operator_id,
            op_info,
            input,
            fragment_id,
            executor_stats: self.streaming_metrics.clone(),
            actor_context: actor_context.clone(),
            vnode_bitmap,
            eval_error_report,
            watermark_epoch: self.watermark_epoch.clone(),
            local_barrier_manager: local_barrier_manager.clone(),
        };

        let executor = create_executor(executor_params, node, store).await?;

        // Wrap the executor for debug purposes.
        let wrapped = WrapperExecutor::new(
            executor,
            actor_context.clone(),
            env.config().developer.enable_executor_row_count,
            env.config().developer.enable_explain_analyze_stats,
        );
        let executor = (info, wrapped).into();

        // Subtask wrapping for stateful executors is currently disabled: both branches
        // return the executor unchanged, and `subtasks` is only read to silence the unused
        // parameter warning.
        let executor = if has_stateful {
            let _ = subtasks;
            executor
        } else {
            executor
        };

        Ok(executor)
    }

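    /// Create a chain(tree) of nodes and return the head executor, together with the
    /// handles of any subtasks spawned during the build.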
    async fn create_nodes(
        &self,
        fragment_id: FragmentId,
        node: &stream_plan::StreamNode,
        env: StreamEnvironment,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        local_barrier_manager: &LocalBarrierManager,
    ) -> StreamResult<(Executor, Vec<SubtaskHandle>)> {
        let mut subtasks = vec![];

        let executor = dispatch_state_store!(env.state_store(), store, {
            self.create_nodes_inner(
                fragment_id,
                node,
                env,
                store,
                actor_context,
                vnode_bitmap,
                false,
                &mut subtasks,
                local_barrier_manager,
            )
            .await
        })?;

        Ok((executor, subtasks))
    }

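    /// Build a complete [`Actor`]: the executor tree, the [`DispatchExecutor`] on top of it,
    /// and the actor wrapper that drives them.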
    async fn create_actor(
        self: Arc<Self>,
        actor: BuildActorInfo,
        fragment_id: FragmentId,
        node: Arc<StreamNode>,
        related_subscriptions: Arc<HashMap<TableId, HashSet<u32>>>,
        local_barrier_manager: LocalBarrierManager,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    ) -> StreamResult<Actor<DispatchExecutor>> {
        {
            let actor_id = actor.actor_id;
            let streaming_config = self.env.config().clone();
            let actor_context = ActorContext::create(
                &actor,
                fragment_id,
                self.env.total_mem_usage(),
                self.streaming_metrics.clone(),
                related_subscriptions,
                self.env.meta_client().clone(),
                streaming_config,
            );
            let vnode_bitmap = actor.vnode_bitmap.as_ref().map(|b| b.into());
            let expr_context = actor.expr_context.clone().unwrap();

            let (executor, subtasks) = self
                .create_nodes(
                    fragment_id,
                    &node,
                    self.env.clone(),
                    &actor_context,
                    vnode_bitmap,
                    &local_barrier_manager,
                )
                .await?;

            let dispatcher = DispatchExecutor::new(
                executor,
                new_output_request_rx,
                actor.dispatchers,
                actor_id,
                fragment_id,
                local_barrier_manager.clone(),
                self.streaming_metrics.clone(),
            )
            .await?;
            let actor = Actor::new(
                dispatcher,
                subtasks,
                self.streaming_metrics.clone(),
                actor_context.clone(),
                expr_context,
                local_barrier_manager,
            );
            Ok(actor)
        }
    }

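    /// Spawn the actor on the streaming runtime, returning its join handle together with the
    /// join handle of an optional Tokio-metrics monitor task (enabled under debug metric
    /// level or by configuration).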
    pub(super) fn spawn_actor(
        self: &Arc<Self>,
        actor: BuildActorInfo,
        fragment_id: FragmentId,
        node: Arc<StreamNode>,
        related_subscriptions: Arc<HashMap<TableId, HashSet<u32>>>,
        local_barrier_manager: LocalBarrierManager,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    ) -> (JoinHandle<()>, Option<JoinHandle<()>>) {
        {
            let monitor = tokio_metrics::TaskMonitor::new();
            let stream_actor_ref = &actor;
            let actor_id = stream_actor_ref.actor_id;
            let handle = {
                let trace_span =
                    format!("Actor {actor_id}: `{}`", stream_actor_ref.mview_definition);
                let barrier_manager = local_barrier_manager.clone();
                // Box the future of `create_actor` to avoid a deep recursive future type.
                let actor = self
                    .clone()
                    .create_actor(
                        actor,
                        fragment_id,
                        node,
                        related_subscriptions,
                        barrier_manager.clone(),
                        new_output_request_rx,
                    )
                    .boxed()
                    .and_then(|actor| actor.run())
                    .map(move |result| {
                        if let Err(err) = result {
                            // Report the failure to the barrier manager so it can be handled
                            // by the barrier collection mechanism.
                            tracing::error!(actor_id, error = ?err.as_report(), "actor exit with error");
                            barrier_manager.notify_failure(actor_id, err);
                        }
                    });
                let traced = match &self.await_tree_reg {
                    Some(m) => m
                        .register(await_tree_key::Actor(actor_id), trace_span)
                        .instrument(actor)
                        .left_future(),
                    None => actor.right_future(),
                };
                let instrumented = monitor.instrument(traced);
                let with_config = crate::CONFIG.scope(self.env.config().clone(), instrumented);
                // If hummock tracing is not enabled, this directly returns the wrapped future.
                let may_track_hummock = with_config.may_trace_hummock();

                self.runtime.spawn(may_track_hummock)
            };

            let monitor_handle = if self.streaming_metrics.level >= MetricLevel::Debug
                || self.env.config().developer.enable_actor_tokio_metrics
            {
                tracing::info!("Tokio metrics are enabled.");
                let streaming_metrics = self.streaming_metrics.clone();
                let actor_monitor_task = self.runtime.spawn(async move {
                    let metrics = streaming_metrics.new_actor_metrics(actor_id);
                    // Periodically report cumulative tokio task metrics for this actor.
                    loop {
                        let task_metrics = monitor.cumulative();
                        metrics
                            .actor_execution_time
                            .set(task_metrics.total_poll_duration.as_secs_f64());
                        metrics
                            .actor_fast_poll_duration
                            .set(task_metrics.total_fast_poll_duration.as_secs_f64());
                        metrics
                            .actor_fast_poll_cnt
                            .set(task_metrics.total_fast_poll_count as i64);
                        metrics
                            .actor_slow_poll_duration
                            .set(task_metrics.total_slow_poll_duration.as_secs_f64());
                        metrics
                            .actor_slow_poll_cnt
                            .set(task_metrics.total_slow_poll_count as i64);
                        metrics
                            .actor_poll_duration
                            .set(task_metrics.total_poll_duration.as_secs_f64());
                        metrics
                            .actor_poll_cnt
                            .set(task_metrics.total_poll_count as i64);
                        metrics
                            .actor_idle_duration
                            .set(task_metrics.total_idle_duration.as_secs_f64());
                        metrics
                            .actor_idle_cnt
                            .set(task_metrics.total_idled_count as i64);
                        metrics
                            .actor_scheduled_duration
                            .set(task_metrics.total_scheduled_duration.as_secs_f64());
                        metrics
                            .actor_scheduled_cnt
                            .set(task_metrics.total_scheduled_count as i64);
                        tokio::time::sleep(Duration::from_secs(1)).await;
                    }
                });
                Some(actor_monitor_task)
            } else {
                None
            };
            (handle, monitor_handle)
        }
    }
}
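/// Parameters passed to [`create_executor`] when building the executor for a plan node.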
pub struct ExecutorParams {
    pub env: StreamEnvironment,

    /// Basic information about the executor.
    pub info: ExecutorInfo,

    /// Executor id, unique across all actors.
    pub executor_id: u64,

    /// Operator id, unique for each operator in a fragment.
    pub operator_id: u64,

    /// Information of the operator from the plan node, e.g. `StreamHashJoin { .. }`.
    pub op_info: String,

    /// The input executors.
    pub input: Vec<Executor>,

    /// Id of the fragment this executor belongs to.
    pub fragment_id: FragmentId,

    /// Streaming metrics.
    pub executor_stats: Arc<StreamingMetrics>,

    /// Context of the actor.
    pub actor_context: ActorContextRef,

    /// Vnodes owned by this executor, represented as a bitmap.
    pub vnode_bitmap: Option<Bitmap>,

    /// Used for reporting expression evaluation errors.
    pub eval_error_report: ActorEvalErrorReport,

    /// Watermark epoch number.
    pub watermark_epoch: AtomicU64Ref,

    pub local_barrier_manager: LocalBarrierManager,
}

impl Debug for ExecutorParams {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ExecutorParams")
            .field("info", &self.info)
            .field("executor_id", &self.executor_id)
            .field("operator_id", &self.operator_id)
            .field("op_info", &self.op_info)
            .field("input", &self.input.len())
            .field("actor_id", &self.actor_context.id)
            .finish_non_exhaustive()
    }
}