1use core::time::Duration;
16use std::fmt::Debug;
17use std::sync::Arc;
18
19use async_recursion::async_recursion;
20use futures::{FutureExt, TryFutureExt};
21use itertools::Itertools;
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{ColumnId, Field, Schema};
25use risingwave_common::config::{MetricLevel, StreamingConfig, merge_streaming_config_section};
26use risingwave_common::operator::{unique_executor_id, unique_operator_id};
27use risingwave_common::util::runtime::BackgroundShutdownRuntime;
28use risingwave_pb::id::{ExecutorId, GlobalOperatorId};
29use risingwave_pb::stream_plan::stream_node::NodeBody;
30use risingwave_pb::stream_plan::{
31 self, StreamNode, StreamScanNode, StreamScanType, SyncLogStoreNode,
32};
33use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo;
34use risingwave_storage::monitor::HummockTraceFutureExt;
35use risingwave_storage::table::batch_table::BatchTable;
36use risingwave_storage::{StateStore, dispatch_state_store};
37use thiserror_ext::AsReport;
38use tokio::sync::mpsc::UnboundedReceiver;
39use tokio::task::JoinHandle;
40
41use crate::common::table::state_table::StateTableBuilder;
42use crate::error::StreamResult;
43use crate::executor::monitor::StreamingMetrics;
44use crate::executor::subtask::SubtaskHandle;
45use crate::executor::{
46 Actor, ActorContext, ActorContextRef, DispatchExecutor, Execute, Executor, ExecutorInfo,
47 SnapshotBackfillExecutor, SyncLogStoreDispatchExecutor, TroublemakerExecutor, WrapperExecutor,
48};
49use crate::from_proto::{MergeExecutorBuilder, create_executor};
50use crate::task::{
51 ActorEvalErrorReport, ActorId, AtomicU64Ref, FragmentId, LocalBarrierManager, NewOutputRequest,
52 StreamEnvironment, await_tree_key,
53};
54
/// Default capacity for a [`ConfigOverrideCache`].
// NOTE(review): presumably counted in entries (no weigher is visible here) — confirm at the
// place where the cache is constructed.
pub const CONFIG_OVERRIDE_CACHE_DEFAULT_CAPACITY: u64 = 256;
/// Cache mapping an actor's raw config-override string to the merged [`StreamingConfig`]
/// produced from it, so actors with the same override share one `Arc<StreamingConfig>`.
pub type ConfigOverrideCache = moka::sync::Cache<String, Arc<StreamingConfig>>;
59
/// Shared state used to build stream actors and spawn them onto a runtime.
pub(crate) struct StreamActorManager {
    /// Stream environment; provides the state store, global streaming config, meta client and
    /// memory-usage counter used while building actors.
    pub(super) env: StreamEnvironment,
    /// Streaming metrics shared by every executor and actor built by this manager.
    pub(super) streaming_metrics: Arc<StreamingMetrics>,

    /// Watermark epoch handed to every executor through `ExecutorParams::watermark_epoch`.
    pub(super) watermark_epoch: AtomicU64Ref,

    /// Optional await-tree registry. When present, each spawned actor future is registered under
    /// `await_tree_key::Actor(actor_id)`.
    pub(super) await_tree_reg: Option<await_tree::Registry>,

    /// Runtime on which actor futures and their optional metrics-monitor tasks are spawned.
    pub(super) runtime: BackgroundShutdownRuntime,

    /// Memoized per-override streaming configs, keyed by the raw override string.
    pub(super) config_override_cache: ConfigOverrideCache,
}
82
83impl StreamActorManager {
84 fn get_executor_id(actor_context: &ActorContext, node: &StreamNode) -> ExecutorId {
85 unique_executor_id(actor_context.id, node.operator_id)
88 }
89
90 fn get_executor_info(node: &StreamNode, executor_id: ExecutorId) -> ExecutorInfo {
91 let schema: Schema = node.fields.iter().map(Field::from).collect();
92
93 let stream_key = node
94 .get_stream_key()
95 .iter()
96 .map(|idx| *idx as usize)
97 .collect::<Vec<_>>();
98
99 let stream_kind = node.stream_kind();
100
101 let identity = format!("{} {:X}", node.get_node_body().unwrap(), executor_id);
102
103 ExecutorInfo {
104 schema,
105 stream_key,
106 stream_kind,
107 identity,
108 id: executor_id,
109 }
110 }
111
112 async fn create_snapshot_backfill_node(
113 &self,
114 stream_node: &StreamNode,
115 node: &StreamScanNode,
116 actor_context: &ActorContextRef,
117 vnode_bitmap: Option<Bitmap>,
118 local_barrier_manager: &LocalBarrierManager,
119 state_store: impl StateStore,
120 ) -> StreamResult<Executor> {
121 let [upstream_node, _]: &[_; 2] = stream_node.input.as_slice().try_into().unwrap();
122 let chunk_size = actor_context.config.developer.chunk_size;
123
124 let upstream_info = Self::get_executor_info(
125 upstream_node,
126 Self::get_executor_id(actor_context, upstream_node),
127 );
128
129 let NodeBody::Merge(upstream_merge) = upstream_node.get_node_body()? else {
130 bail!("expect Merge as input of SnapshotBackfill");
131 };
132
133 let upstream = MergeExecutorBuilder::new_input(
134 local_barrier_manager.clone(),
135 self.streaming_metrics.clone(),
136 actor_context.clone(),
137 upstream_info,
138 upstream_merge,
139 chunk_size,
140 )
141 .await?;
142
143 let table_desc = node.get_table_desc()?;
144
145 let output_indices = node
146 .output_indices
147 .iter()
148 .map(|&i| i as usize)
149 .collect_vec();
150 let info = Self::get_executor_info(
151 stream_node,
152 Self::get_executor_id(actor_context, stream_node),
153 );
154 let stream_key = info.stream_key.clone();
155
156 let column_ids = node
157 .upstream_column_ids
158 .iter()
159 .map(ColumnId::from)
160 .collect_vec();
161
162 let progress = local_barrier_manager.register_create_mview_progress(actor_context);
163
164 let vnodes = vnode_bitmap.map(Arc::new);
165 let barrier_rx = local_barrier_manager.subscribe_barrier(actor_context.id);
166
167 let upstream_table =
168 BatchTable::new_partial(state_store.clone(), column_ids, vnodes.clone(), table_desc);
169
170 let state_table = node.get_state_table()?;
171 let state_table = StateTableBuilder::new(state_table, state_store.clone(), vnodes)
172 .enable_preload_all_rows_by_config(&actor_context.config)
173 .build()
174 .await;
175
176 let executor = SnapshotBackfillExecutor::new(
177 upstream_table,
178 state_table,
179 upstream,
180 node.pk_scan_range.as_ref(),
181 output_indices,
182 stream_key,
183 actor_context.clone(),
184 progress,
185 chunk_size,
186 node.rate_limit.into(),
187 barrier_rx,
188 self.streaming_metrics.clone(),
189 node.snapshot_backfill_epoch,
190 )?
191 .boxed();
192
193 if crate::consistency::insane() {
194 let mut troubled_info = info.clone();
195 troubled_info.identity = format!("{} (troubled)", info.identity);
196 Ok((
197 info,
198 TroublemakerExecutor::new((troubled_info, executor).into(), chunk_size),
199 )
200 .into())
201 } else {
202 Ok((info, executor).into())
203 }
204 }
205
206 #[expect(clippy::too_many_arguments)]
208 #[async_recursion]
209 async fn create_nodes_inner(
210 &self,
211 fragment_id: FragmentId,
212 node: &stream_plan::StreamNode,
213 env: StreamEnvironment,
214 store: impl StateStore,
215 actor_context: &ActorContextRef,
216 vnode_bitmap: Option<Bitmap>,
217 has_stateful: bool,
218 subtasks: &mut Vec<SubtaskHandle>,
219 local_barrier_manager: &LocalBarrierManager,
220 ) -> StreamResult<Executor> {
221 fn is_stateful_executor(stream_node: &StreamNode) -> bool {
224 matches!(
225 stream_node.get_node_body().unwrap(),
226 NodeBody::HashAgg(_)
227 | NodeBody::HashJoin(_)
228 | NodeBody::DeltaIndexJoin(_)
229 | NodeBody::Lookup(_)
230 | NodeBody::StreamScan(_)
231 | NodeBody::StreamCdcScan(_)
232 | NodeBody::DynamicFilter(_)
233 | NodeBody::GroupTopN(_)
234 | NodeBody::Now(_)
235 )
236 }
237 let is_stateful = is_stateful_executor(node);
238
239 let executor = if let NodeBody::StreamScan(stream_scan) = node.get_node_body().unwrap()
240 && let Ok(StreamScanType::SnapshotBackfill) = stream_scan.get_stream_scan_type()
241 {
242 dispatch_state_store!(env.state_store(), store, {
243 self.create_snapshot_backfill_node(
244 node,
245 stream_scan,
246 actor_context,
247 vnode_bitmap,
248 local_barrier_manager,
249 store,
250 )
251 .await
252 })?
253 } else {
254 let mut input = Vec::with_capacity(node.input.iter().len());
256 for input_stream_node in &node.input {
257 input.push(
258 self.create_nodes_inner(
259 fragment_id,
260 input_stream_node,
261 env.clone(),
262 store.clone(),
263 actor_context,
264 vnode_bitmap.clone(),
265 has_stateful || is_stateful,
266 subtasks,
267 local_barrier_manager,
268 )
269 .await?,
270 );
271 }
272
273 self.generate_executor_from_inputs(
274 fragment_id,
275 node,
276 env,
277 store,
278 actor_context,
279 vnode_bitmap,
280 local_barrier_manager,
281 input,
282 )
283 .await?
284 };
285 Ok(Self::wrap_executor(
286 executor,
287 actor_context,
288 has_stateful || is_stateful,
289 subtasks,
290 ))
291 }
292
293 #[expect(clippy::too_many_arguments)]
294 async fn generate_executor_from_inputs(
295 &self,
296 fragment_id: FragmentId,
297 node: &stream_plan::StreamNode,
298 env: StreamEnvironment,
299 store: impl StateStore,
300 actor_context: &ActorContextRef,
301 vnode_bitmap: Option<Bitmap>,
302 local_barrier_manager: &LocalBarrierManager,
303 input: Vec<Executor>,
304 ) -> StreamResult<Executor> {
305 let op_info = node.get_identity().clone();
306
307 let executor_id = Self::get_executor_id(actor_context, node);
310 let operator_id = unique_operator_id(fragment_id, node.operator_id);
311
312 let info = Self::get_executor_info(node, executor_id);
313
314 let eval_error_report = ActorEvalErrorReport {
315 actor_context: actor_context.clone(),
316 identity: info.identity.clone().into(),
317 };
318
319 let executor_params = ExecutorParams {
321 env: env.clone(),
322
323 info: info.clone(),
324 executor_id,
325 operator_id,
326 op_info,
327 input,
328 fragment_id,
329 executor_stats: self.streaming_metrics.clone(),
330 actor_context: actor_context.clone(),
331 vnode_bitmap,
332 eval_error_report,
333 watermark_epoch: self.watermark_epoch.clone(),
334 local_barrier_manager: local_barrier_manager.clone(),
335 config: actor_context.config.clone(),
336 };
337
338 create_executor(executor_params, node, store).await
339 }
340
341 fn wrap_executor(
342 executor: Executor,
343 actor_context: &ActorContextRef,
344 has_stateful: bool,
345 subtasks: &mut Vec<SubtaskHandle>,
346 ) -> Executor {
347 let info = executor.info().clone();
348 let wrapped = WrapperExecutor::new(executor, actor_context.clone());
350 let executor = (info, wrapped).into();
351
352 if has_stateful {
354 let _ = subtasks;
360 }
361 executor
362 }
363
364 async fn create_nodes(
366 &self,
367 fragment_id: FragmentId,
368 node: &stream_plan::StreamNode,
369 env: StreamEnvironment,
370 actor_context: &ActorContextRef,
371 vnode_bitmap: Option<Bitmap>,
372 local_barrier_manager: &LocalBarrierManager,
373 ) -> StreamResult<(Executor, Vec<SubtaskHandle>)> {
374 let mut subtasks = vec![];
375
376 let executor = dispatch_state_store!(env.state_store(), store, {
377 self.create_nodes_inner(
378 fragment_id,
379 node,
380 env,
381 store,
382 actor_context,
383 vnode_bitmap,
384 false,
385 &mut subtasks,
386 local_barrier_manager,
387 )
388 .await
389 })?;
390
391 Ok((executor, subtasks))
392 }
393
394 fn get_overridden_config(
396 &self,
397 config_override: &str,
398 actor_id: ActorId,
399 ) -> Arc<StreamingConfig> {
400 self.config_override_cache
401 .get_with_by_ref(config_override, || {
402 let global = self.env.global_config();
403 match merge_streaming_config_section(global.as_ref(), config_override) {
404 Ok(Some(config)) => {
405 tracing::info!(%actor_id, "applied configuration override");
406 Arc::new(config)
407 }
408 Ok(None) => global.clone(), Err(e) => {
410 tracing::error!(
414 error = %e.as_report(),
415 %actor_id,
416 "failed to apply configuration override, use global config instead",
417 );
418 global.clone()
419 }
420 }
421 })
422 }
423
424 async fn create_actor(
425 self: Arc<Self>,
426 actor: BuildActorInfo,
427 fragment_id: FragmentId,
428 node: Arc<StreamNode>,
429 local_barrier_manager: LocalBarrierManager,
430 new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
431 actor_config: Arc<StreamingConfig>,
432 ) -> StreamResult<Actor<DispatchExecutor>> {
433 let actor_context = ActorContext::create(
434 &actor,
435 fragment_id,
436 self.env.total_mem_usage(),
437 self.streaming_metrics.clone(),
438 self.env.meta_client(),
439 actor_config,
440 self.env.clone(),
441 );
442 let vnode_bitmap = actor.vnode_bitmap.as_ref().map(|b| b.into());
443 let expr_context = actor.expr_context.clone().unwrap();
444
445 let (executor, subtasks) = self
446 .create_nodes(
447 fragment_id,
448 &node,
449 self.env.clone(),
450 &actor_context,
451 vnode_bitmap,
452 &local_barrier_manager,
453 )
454 .await?;
455
456 let dispatcher = DispatchExecutor::new(
457 executor,
458 new_output_request_rx,
459 actor.dispatchers,
460 &actor_context,
461 )
462 .await?;
463
464 let actor = Actor::new(
465 dispatcher,
466 subtasks,
467 self.streaming_metrics.clone(),
468 actor_context.clone(),
469 expr_context,
470 local_barrier_manager,
471 );
472 Ok(actor)
473 }
474
475 #[expect(clippy::too_many_arguments)]
476 async fn create_actor_with_log_store_dispatcher<S: StateStore>(
477 self: Arc<Self>,
478 actor: BuildActorInfo,
479 fragment_id: FragmentId,
480 node: Arc<StreamNode>,
481 local_barrier_manager: LocalBarrierManager,
482 new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
483 actor_config: Arc<StreamingConfig>,
484 sync: Box<SyncLogStoreNode>,
485 state_store: S,
486 ) -> StreamResult<Actor<SyncLogStoreDispatchExecutor<S>>> {
487 let actor_context = ActorContext::create(
488 &actor,
489 fragment_id,
490 self.env.total_mem_usage(),
491 self.streaming_metrics.clone(),
492 self.env.meta_client(),
493 actor_config,
494 self.env.clone(),
495 );
496 let vnode_bitmap = actor.vnode_bitmap.as_ref().map(|b| b.into());
497 let expr_context = actor.expr_context.clone().unwrap();
498
499 let [input] = node.input.as_slice() else {
500 bail!("SyncLogStoreNode should have exactly one input");
501 };
502
503 let (executor, subtasks) = self
504 .create_nodes(
505 fragment_id,
506 input,
507 self.env.clone(),
508 &actor_context,
509 vnode_bitmap.clone(),
510 &local_barrier_manager,
511 )
512 .await?;
513
514 let dispatcher = SyncLogStoreDispatchExecutor::new(
515 executor,
516 new_output_request_rx,
517 actor.dispatchers,
518 &actor_context,
519 sync.as_ref(),
520 vnode_bitmap,
521 state_store,
522 )
523 .await?;
524
525 let actor = Actor::new(
526 dispatcher,
527 subtasks,
528 self.streaming_metrics.clone(),
529 actor_context.clone(),
530 expr_context,
531 local_barrier_manager,
532 );
533 Ok(actor)
534 }
535
/// Spawns the actor described by `actor` (rooted at `node`) onto `self.runtime`.
///
/// The actor is built with its effective config: the global config merged with
/// `actor.config_override`. If building or running the actor fails, the error is logged and
/// reported through `LocalBarrierManager::notify_failure`.
///
/// Returns the join handle of the actor task. The second element is `Some` only when
/// `developer.enable_actor_tokio_metrics` is set; it is the join handle of a companion task that
/// periodically exports the actor's tokio task metrics.
///
/// # Panics
/// Panics if `node` has no body.
pub(super) fn spawn_actor(
    self: &Arc<Self>,
    actor: BuildActorInfo,
    fragment_id: FragmentId,
    node: Arc<StreamNode>,
    local_barrier_manager: LocalBarrierManager,
    new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
) -> (JoinHandle<()>, Option<JoinHandle<()>>) {
    let stream_actor_ref = &actor;
    let actor_id = stream_actor_ref.actor_id;
    // Effective config for this actor: the global config merged with its override (memoized).
    let actor_config = self.get_overridden_config(&actor.config_override, actor_id);

    // A tokio task monitor is created only when actor-level tokio metrics are enabled.
    let monitor = actor_config
        .developer
        .enable_actor_tokio_metrics
        .then(tokio_metrics::TaskMonitor::new);

    let handle = {
        let trace_span = format!("Actor {actor_id}: `{}`", stream_actor_ref.mview_definition);
        let barrier_manager = local_barrier_manager;
        let node_body = node.get_node_body().unwrap().clone();
        let use_sync_log_store_dispatcher =
            !actor_config.developer.disable_sync_log_store_dispatcher;
        // Both arms build the actor, run it, and on failure (while building or running) log the
        // error and report it to the barrier manager. They differ only in the dispatcher
        // placed on top of the executor tree.
        let actor = match node_body {
            // A `SyncLogStore` root uses `SyncLogStoreDispatchExecutor`, unless that is disabled
            // via `developer.disable_sync_log_store_dispatcher`.
            NodeBody::SyncLogStore(sync) if use_sync_log_store_dispatcher => {
                tracing::info!(
                    "SyncLogStoreDispatchExecutor is created for fragment {}",
                    fragment_id
                );
                // The log-store dispatcher is generic over the state store, so dispatch on the
                // concrete store type here.
                dispatch_state_store!(self.env.state_store(), store, {
                    self
                        .clone()
                        .create_actor_with_log_store_dispatcher(
                            actor,
                            fragment_id,
                            node,
                            barrier_manager.clone(),
                            new_output_request_rx,
                            actor_config,
                            sync,
                            store,
                        )
                        .and_then(|actor| actor.run())
                        .map(move |result| {
                            if let Err(err) = result {
                                tracing::error!(%actor_id, error = ?err.as_report(), "actor exit with error");
                                barrier_manager.notify_failure(actor_id, err);
                            }
                        })
                        .boxed()
                })
            }
            // Any other root, or a `SyncLogStore` root with the log-store dispatcher disabled,
            // gets a plain `DispatchExecutor`.
            _ => {
                self
                    .clone()
                    .create_actor(
                        actor,
                        fragment_id,
                        node,
                        barrier_manager.clone(),
                        new_output_request_rx,
                        actor_config,
                    )
                    .and_then(|actor| actor.run())
                    .map(move |result| {
                        if let Err(err) = result {
                            tracing::error!(%actor_id, error = ?err.as_report(), "actor exit with error");
                            barrier_manager.notify_failure(actor_id, err);
                        }
                    })
                    .boxed()
            }
        };
        // The actor future is layered in three steps:
        // 1. Optionally register it in the await-tree registry.
        // 2. Optionally instrument it with the tokio task monitor.
        // 3. Wrap it for hummock tracing.
        // `left_future`/`right_future` give both arms of each `match` the same type.
        let traced = match &self.await_tree_reg {
            Some(m) => m
                .register(await_tree_key::Actor(actor_id), trace_span)
                .instrument(actor)
                .left_future(),
            None => actor.right_future(),
        };
        let instrumented = match &monitor {
            Some(m) => m.instrument(traced).left_future(),
            None => traced.right_future(),
        };
        let may_track_hummock = instrumented.may_trace_hummock();

        self.runtime.spawn(may_track_hummock)
    };

    // Poll/idle/scheduled *count* metrics are recorded only at `MetricLevel::Debug` or above.
    let enable_count_metrics = self.streaming_metrics.level >= MetricLevel::Debug;
    let monitor_handle = if let Some(monitor) = monitor {
        let streaming_metrics = self.streaming_metrics.clone();
        let actor_monitor_task = self.runtime.spawn(async move {
            let metrics = streaming_metrics.new_actor_metrics(actor_id, fragment_id);
            // Sample roughly every 15 seconds. The first `tick` of a tokio `interval` completes
            // immediately.
            let mut interval = tokio::time::interval(Duration::from_secs(15));
            // NOTE(review): `TaskMonitor::intervals()` is documented as an unending iterator, so
            // this loop presumably runs until the returned handle is aborted — confirm with the
            // caller.
            for task_metrics in monitor.intervals() {
                interval.tick().await; metrics
                    .actor_poll_duration
                    .inc_by(task_metrics.total_poll_duration.as_nanos() as u64);
                metrics
                    .actor_idle_duration
                    .inc_by(task_metrics.total_idle_duration.as_nanos() as u64);
                metrics
                    .actor_scheduled_duration
                    .inc_by(task_metrics.total_scheduled_duration.as_nanos() as u64);

                if enable_count_metrics {
                    metrics.actor_poll_cnt.inc_by(task_metrics.total_poll_count);
                    metrics
                        .actor_idle_cnt
                        .inc_by(task_metrics.total_idled_count);
                    metrics
                        .actor_scheduled_cnt
                        .inc_by(task_metrics.total_scheduled_count);
                }
            }
        });
        Some(actor_monitor_task)
    } else {
        None
    };
    (handle, monitor_handle)
}
664}
665
/// Everything an executor builder receives in order to construct the executor for one
/// stream node.
pub struct ExecutorParams {
    /// Stream environment of the actor's manager.
    pub env: StreamEnvironment,

    /// Schema, stream key, stream kind and identity of the executor being built.
    pub info: ExecutorInfo,

    /// Executor id, derived from the owning actor's id and the node's operator id.
    pub executor_id: ExecutorId,

    /// Operator id, derived from the fragment id and the node's operator id.
    pub operator_id: GlobalOperatorId,

    /// The stream node's `identity` string.
    pub op_info: String,

    /// Already-built executors for the node's inputs, in the order of the node's inputs.
    pub input: Vec<Executor>,

    /// Fragment the executor belongs to.
    pub fragment_id: FragmentId,

    /// Shared streaming metrics.
    pub executor_stats: Arc<StreamingMetrics>,

    /// Context of the actor that owns this executor.
    pub actor_context: ActorContextRef,

    /// Vnode bitmap of the owning actor, if any.
    pub vnode_bitmap: Option<Bitmap>,

    /// Reporter for expression-evaluation errors, tagged with this executor's identity.
    pub eval_error_report: ActorEvalErrorReport,

    /// Shared watermark epoch.
    pub watermark_epoch: AtomicU64Ref,

    /// Local barrier manager passed through from the actor being built.
    pub local_barrier_manager: LocalBarrierManager,

    /// Effective streaming config of the owning actor (a clone of `actor_context.config`).
    pub config: Arc<StreamingConfig>,
}
713
714impl Debug for ExecutorParams {
715 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
716 f.debug_struct("ExecutorParams")
717 .field("info", &self.info)
718 .field("executor_id", &self.executor_id)
719 .field("operator_id", &self.operator_id)
720 .field("op_info", &self.op_info)
721 .field("input", &self.input.len())
722 .field("actor_id", &self.actor_context.id)
723 .finish_non_exhaustive()
724 }
725}