use core::time::Duration;
use std::fmt::Debug;
use std::sync::Arc;

use async_recursion::async_recursion;
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnId, Field, Schema};
use risingwave_common::config::{MetricLevel, StreamingConfig, merge_streaming_config_section};
use risingwave_common::must_match;
use risingwave_common::operator::{unique_executor_id, unique_operator_id};
use risingwave_common::util::runtime::BackgroundShutdownRuntime;
use risingwave_pb::plan_common::StorageTableDesc;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{self, StreamNode, StreamScanNode, StreamScanType};
use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo;
use risingwave_storage::monitor::HummockTraceFutureExt;
use risingwave_storage::table::batch_table::BatchTable;
use risingwave_storage::{StateStore, dispatch_state_store};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::task::JoinHandle;

use crate::common::table::state_table::StateTableBuilder;
use crate::error::StreamResult;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::subtask::SubtaskHandle;
use crate::executor::{
    Actor, ActorContext, ActorContextRef, DispatchExecutor, Execute, Executor, ExecutorInfo,
    MergeExecutorInput, SnapshotBackfillExecutor, TroublemakerExecutor, WrapperExecutor,
};
use crate::from_proto::{MergeExecutorBuilder, create_executor};
use crate::task::{
    ActorEvalErrorReport, ActorId, AtomicU64Ref, FragmentId, LocalBarrierManager, NewOutputRequest,
    StreamEnvironment, await_tree_key,
};
pub const CONFIG_OVERRIDE_CACHE_DEFAULT_CAPACITY: u64 = 256;
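/// Cache from a per-actor config override string to the merged [`StreamingConfig`], so that
/// identical overrides are parsed and merged only once.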
pub type ConfigOverrideCache = moka::sync::Cache<String, Arc<StreamingConfig>>;

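/// Shared context for building executors and spawning actors onto the dedicated actor runtime.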
pub(crate) struct StreamActorManager {
    pub(super) env: StreamEnvironment,
    pub(super) streaming_metrics: Arc<StreamingMetrics>,

    /// Watermark epoch number, passed to executors through [`ExecutorParams`].
    pub(super) watermark_epoch: AtomicU64Ref,

    /// Registry for `await-tree` spans of actors, if enabled.
    pub(super) await_tree_reg: Option<await_tree::Registry>,

    /// Runtime that actors and their monitor tasks are spawned on.
    pub(super) runtime: BackgroundShutdownRuntime,

    /// Cache of merged per-actor configuration overrides.
    pub(super) config_override_cache: ConfigOverrideCache,
}

impl StreamActorManager {
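    /// Derive a unique executor id from the actor id and the operator id of the plan node.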
    fn get_executor_id(actor_context: &ActorContext, node: &StreamNode) -> u64 {
        unique_executor_id(actor_context.id, node.operator_id)
    }

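    /// Assemble the [`ExecutorInfo`] (schema, stream key, stream kind and identity) for a plan
    /// node and executor id.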
    fn get_executor_info(node: &StreamNode, executor_id: u64) -> ExecutorInfo {
        let schema: Schema = node.fields.iter().map(Field::from).collect();

        let stream_key = node
            .get_stream_key()
            .iter()
            .map(|idx| *idx as usize)
            .collect::<Vec<_>>();

        let stream_kind = node.stream_kind();

        let identity = format!("{} {:X}", node.get_node_body().unwrap(), executor_id);

        ExecutorInfo {
            schema,
            stream_key,
            stream_kind,
            identity,
            id: executor_id,
        }
    }

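    /// Build the upstream [`MergeExecutorInput`] for a snapshot backfill executor. The upstream
    /// node must be a `Merge` node.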
    async fn create_snapshot_backfill_input(
        &self,
        upstream_node: &StreamNode,
        actor_context: &ActorContextRef,
        local_barrier_manager: &LocalBarrierManager,
        chunk_size: usize,
    ) -> StreamResult<MergeExecutorInput> {
        let info = Self::get_executor_info(
            upstream_node,
            Self::get_executor_id(actor_context, upstream_node),
        );

        let upstream_merge = must_match!(upstream_node.get_node_body().unwrap(), NodeBody::Merge(upstream_merge) => {
            upstream_merge
        });

        MergeExecutorBuilder::new_input(
            local_barrier_manager.clone(),
            self.streaming_metrics.clone(),
            actor_context.clone(),
            info,
            upstream_merge,
            chunk_size,
        )
        .await
    }

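    /// Build a [`SnapshotBackfillExecutor`] for a `StreamScan` node of `SnapshotBackfill` type,
    /// reading the upstream table through a [`BatchTable`] and reporting backfill progress to the
    /// barrier manager.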
    async fn create_snapshot_backfill_node(
        &self,
        stream_node: &StreamNode,
        node: &StreamScanNode,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        local_barrier_manager: &LocalBarrierManager,
        state_store: impl StateStore,
    ) -> StreamResult<Executor> {
        let [upstream_node, _]: &[_; 2] = stream_node.input.as_slice().try_into().unwrap();
        let chunk_size = actor_context.config.developer.chunk_size;
        let upstream = self
            .create_snapshot_backfill_input(
                upstream_node,
                actor_context,
                local_barrier_manager,
                chunk_size,
            )
            .await?;

        let table_desc: &StorageTableDesc = node.get_table_desc()?;

        let output_indices = node
            .output_indices
            .iter()
            .map(|&i| i as usize)
            .collect_vec();

        let column_ids = node
            .upstream_column_ids
            .iter()
            .map(ColumnId::from)
            .collect_vec();

        let progress = local_barrier_manager.register_create_mview_progress(actor_context);

        let vnodes = vnode_bitmap.map(Arc::new);
        let barrier_rx = local_barrier_manager.subscribe_barrier(actor_context.id);

        let upstream_table =
            BatchTable::new_partial(state_store.clone(), column_ids, vnodes.clone(), table_desc);

        let state_table = node.get_state_table()?;
        let state_table = StateTableBuilder::new(state_table, state_store.clone(), vnodes)
            .enable_preload_all_rows_by_config(&actor_context.config)
            .build()
            .await;

        let executor = SnapshotBackfillExecutor::new(
            upstream_table,
            state_table,
            upstream,
            output_indices,
            actor_context.clone(),
            progress,
            chunk_size,
            node.rate_limit.into(),
            barrier_rx,
            self.streaming_metrics.clone(),
            node.snapshot_backfill_epoch,
        )
        .boxed();

        let info = Self::get_executor_info(
            stream_node,
            Self::get_executor_id(actor_context, stream_node),
        );

        if crate::consistency::insane() {
            let mut troubled_info = info.clone();
            troubled_info.identity = format!("{} (troubled)", info.identity);
            Ok((
                info,
                TroublemakerExecutor::new((troubled_info, executor).into(), chunk_size),
            )
                .into())
        } else {
            Ok((info, executor).into())
        }
    }

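    /// Recursively build the executor tree rooted at `node`, creating the inputs of each node
    /// before the node itself. Stream scans of `SnapshotBackfill` type are handled separately by
    /// [`Self::create_snapshot_backfill_node`].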
    #[expect(clippy::too_many_arguments)]
    #[async_recursion]
    async fn create_nodes_inner(
        &self,
        fragment_id: FragmentId,
        node: &stream_plan::StreamNode,
        env: StreamEnvironment,
        store: impl StateStore,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        has_stateful: bool,
        subtasks: &mut Vec<SubtaskHandle>,
        local_barrier_manager: &LocalBarrierManager,
    ) -> StreamResult<Executor> {
        if let NodeBody::StreamScan(stream_scan) = node.get_node_body().unwrap()
            && let Ok(StreamScanType::SnapshotBackfill) = stream_scan.get_stream_scan_type()
        {
            return dispatch_state_store!(env.state_store(), store, {
                self.create_snapshot_backfill_node(
                    node,
                    stream_scan,
                    actor_context,
                    vnode_bitmap,
                    local_barrier_manager,
                    store,
                )
                .await
            });
        }

        // Whether the plan node is built into a stateful executor.
        fn is_stateful_executor(stream_node: &StreamNode) -> bool {
            matches!(
                stream_node.get_node_body().unwrap(),
                NodeBody::HashAgg(_)
                    | NodeBody::HashJoin(_)
                    | NodeBody::DeltaIndexJoin(_)
                    | NodeBody::Lookup(_)
                    | NodeBody::StreamScan(_)
                    | NodeBody::StreamCdcScan(_)
                    | NodeBody::DynamicFilter(_)
                    | NodeBody::GroupTopN(_)
                    | NodeBody::Now(_)
            )
        }
        let is_stateful = is_stateful_executor(node);

        // Create the input executors before creating the node itself.
        let mut input = Vec::with_capacity(node.input.iter().len());
        for input_stream_node in &node.input {
            input.push(
                self.create_nodes_inner(
                    fragment_id,
                    input_stream_node,
                    env.clone(),
                    store.clone(),
                    actor_context,
                    vnode_bitmap.clone(),
                    has_stateful || is_stateful,
                    subtasks,
                    local_barrier_manager,
                )
                .await?,
            );
        }

        self.generate_executor_from_inputs(
            fragment_id,
            node,
            env,
            store,
            actor_context,
            vnode_bitmap,
            has_stateful || is_stateful,
            subtasks,
            local_barrier_manager,
            input,
        )
        .await
    }

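    /// Build the executor for `node` from its already-created `input` executors, then wrap it
    /// with [`WrapperExecutor`].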
    #[expect(clippy::too_many_arguments)]
    async fn generate_executor_from_inputs(
        &self,
        fragment_id: FragmentId,
        node: &stream_plan::StreamNode,
        env: StreamEnvironment,
        store: impl StateStore,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        has_stateful: bool,
        subtasks: &mut Vec<SubtaskHandle>,
        local_barrier_manager: &LocalBarrierManager,
        input: Vec<Executor>,
    ) -> StreamResult<Executor> {
        let op_info = node.get_identity().clone();

        let executor_id = Self::get_executor_id(actor_context, node);
        let operator_id = unique_operator_id(fragment_id, node.operator_id);

        let info = Self::get_executor_info(node, executor_id);

        let eval_error_report = ActorEvalErrorReport {
            actor_context: actor_context.clone(),
            identity: info.identity.clone().into(),
        };

        let executor_params = ExecutorParams {
            env: env.clone(),

            info: info.clone(),
            executor_id,
            operator_id,
            op_info,
            input,
            fragment_id,
            executor_stats: self.streaming_metrics.clone(),
            actor_context: actor_context.clone(),
            vnode_bitmap,
            eval_error_report,
            watermark_epoch: self.watermark_epoch.clone(),
            local_barrier_manager: local_barrier_manager.clone(),
            config: actor_context.config.clone(),
        };

        let executor = create_executor(executor_params, node, store).await?;

        // Wrap the executor with common checks and instrumentation.
        let wrapped = WrapperExecutor::new(executor, actor_context.clone());
        let executor = (info, wrapped).into();

        // Subtask wrapping for stateful executors is currently not enabled; the executor is
        // returned as-is either way.
        let executor = if has_stateful {
            let _ = subtasks;
            executor
        } else {
            executor
        };

        Ok(executor)
    }

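    /// Build the executor tree for an actor, dispatching the state store from the environment.
    /// Returns the root executor together with the handles of any spawned subtasks.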
    async fn create_nodes(
        &self,
        fragment_id: FragmentId,
        node: &stream_plan::StreamNode,
        env: StreamEnvironment,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        local_barrier_manager: &LocalBarrierManager,
    ) -> StreamResult<(Executor, Vec<SubtaskHandle>)> {
        let mut subtasks = vec![];

        let executor = dispatch_state_store!(env.state_store(), store, {
            self.create_nodes_inner(
                fragment_id,
                node,
                env,
                store,
                actor_context,
                vnode_bitmap,
                false,
                &mut subtasks,
                local_barrier_manager,
            )
            .await
        })?;

        Ok((executor, subtasks))
    }

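    /// Resolve the effective [`StreamingConfig`] for an actor by merging its config override into
    /// the global config, going through [`Self::config_override_cache`]. Falls back to the global
    /// config if there is no effective override or it cannot be applied.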
    fn get_overridden_config(
        &self,
        config_override: &str,
        actor_id: ActorId,
    ) -> Arc<StreamingConfig> {
        self.config_override_cache
            .get_with_by_ref(config_override, || {
                let global = self.env.global_config();
                match merge_streaming_config_section(global.as_ref(), config_override) {
                    Ok(Some(config)) => {
                        tracing::info!(%actor_id, "applied configuration override");
                        Arc::new(config)
                    }
                    Ok(None) => global.clone(),
                    Err(e) => {
                        tracing::error!(
                            error = %e.as_report(),
                            %actor_id,
                            "failed to apply configuration override, use global config instead",
                        );
                        global.clone()
                    }
                }
            })
    }

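    /// Build a single actor: resolve its config, create the [`ActorContext`], build the executor
    /// tree and wrap its root with a [`DispatchExecutor`].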
    async fn create_actor(
        self: Arc<Self>,
        actor: BuildActorInfo,
        fragment_id: FragmentId,
        node: Arc<StreamNode>,
        local_barrier_manager: LocalBarrierManager,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    ) -> StreamResult<Actor<DispatchExecutor>> {
        let actor_id = actor.actor_id;
        let actor_config = self.get_overridden_config(&actor.config_override, actor_id);

        let actor_context = ActorContext::create(
            &actor,
            fragment_id,
            self.env.total_mem_usage(),
            self.streaming_metrics.clone(),
            self.env.meta_client(),
            actor_config,
            self.env.clone(),
        );
        let vnode_bitmap = actor.vnode_bitmap.as_ref().map(|b| b.into());
        let expr_context = actor.expr_context.clone().unwrap();

        let (executor, subtasks) = self
            .create_nodes(
                fragment_id,
                &node,
                self.env.clone(),
                &actor_context,
                vnode_bitmap,
                &local_barrier_manager,
            )
            .await?;

        let dispatcher = DispatchExecutor::new(
            executor,
            new_output_request_rx,
            actor.dispatchers,
            actor_id,
            fragment_id,
            local_barrier_manager.clone(),
            self.streaming_metrics.clone(),
        )
        .await?;
        let actor = Actor::new(
            dispatcher,
            subtasks,
            self.streaming_metrics.clone(),
            actor_context.clone(),
            expr_context,
            local_barrier_manager,
        );
        Ok(actor)
    }

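    /// Spawn an actor onto the shared actor runtime. If actor tokio metrics are enabled, also
    /// spawn a companion task that periodically collects the actor's task metrics. Returns the
    /// actor handle and the optional monitor handle.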
    pub(super) fn spawn_actor(
        self: &Arc<Self>,
        actor: BuildActorInfo,
        fragment_id: FragmentId,
        node: Arc<StreamNode>,
        local_barrier_manager: LocalBarrierManager,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    ) -> (JoinHandle<()>, Option<JoinHandle<()>>) {
        let monitor = tokio_metrics::TaskMonitor::new();
        let stream_actor_ref = &actor;
        let actor_id = stream_actor_ref.actor_id;
        let handle = {
            let trace_span = format!("Actor {actor_id}: `{}`", stream_actor_ref.mview_definition);
            let barrier_manager = local_barrier_manager;
            let actor = self
                .clone()
                .create_actor(
                    actor,
                    fragment_id,
                    node,
                    barrier_manager.clone(),
                    new_output_request_rx,
                )
                .boxed()
                .and_then(|actor| actor.run())
                .map(move |result| {
                    if let Err(err) = result {
                        tracing::error!(%actor_id, error = ?err.as_report(), "actor exit with error");
                        barrier_manager.notify_failure(actor_id, err);
                    }
                });
            let traced = match &self.await_tree_reg {
                Some(m) => m
                    .register(await_tree_key::Actor(actor_id), trace_span)
                    .instrument(actor)
                    .left_future(),
                None => actor.right_future(),
            };
            let instrumented = monitor.instrument(traced);
            let may_track_hummock = instrumented.may_trace_hummock();

            self.runtime.spawn(may_track_hummock)
        };

        let enable_count_metrics = self.streaming_metrics.level >= MetricLevel::Debug;
        let monitor_handle = if self
            .env
            .global_config()
            .developer
            .enable_actor_tokio_metrics
        {
            let streaming_metrics = self.streaming_metrics.clone();
            let actor_monitor_task = self.runtime.spawn(async move {
                let metrics = streaming_metrics.new_actor_metrics(actor_id, fragment_id);
                let mut interval = tokio::time::interval(Duration::from_secs(15));
                for task_metrics in monitor.intervals() {
                    interval.tick().await;
                    metrics
                        .actor_poll_duration
                        .inc_by(task_metrics.total_poll_duration.as_nanos() as u64);
                    metrics
                        .actor_idle_duration
                        .inc_by(task_metrics.total_idle_duration.as_nanos() as u64);
                    metrics
                        .actor_scheduled_duration
                        .inc_by(task_metrics.total_scheduled_duration.as_nanos() as u64);

                    if enable_count_metrics {
                        metrics.actor_poll_cnt.inc_by(task_metrics.total_poll_count);
                        metrics
                            .actor_idle_cnt
                            .inc_by(task_metrics.total_idled_count);
                        metrics
                            .actor_scheduled_cnt
                            .inc_by(task_metrics.total_scheduled_count);
                    }
                }
            });
            Some(actor_monitor_task)
        } else {
            None
        };
        (handle, monitor_handle)
    }
}

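/// Parameters passed to [`create_executor`] when building the executor for a single plan node.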
pub struct ExecutorParams {
    pub env: StreamEnvironment,

    /// Basic information about the executor, including its identity and schema.
    pub info: ExecutorInfo,

    /// Executor id, derived from the actor id and the operator id of the plan node.
    pub executor_id: u64,

    /// Operator id, derived from the fragment id and the operator id of the plan node.
    pub operator_id: u64,

    /// Identity of the plan node this executor is built from, for debugging.
    pub op_info: String,

    /// The input executors of this executor.
    pub input: Vec<Executor>,

    /// Id of the fragment this executor belongs to.
    pub fragment_id: FragmentId,

    /// Metrics for streaming executors.
    pub executor_stats: Arc<StreamingMetrics>,

    /// Context of the actor this executor belongs to.
    pub actor_context: ActorContextRef,

    /// Vnode bitmap of the actor, if any.
    pub vnode_bitmap: Option<Bitmap>,

    /// Used to report expression evaluation errors of this executor.
    pub eval_error_report: ActorEvalErrorReport,

    pub watermark_epoch: AtomicU64Ref,

    pub local_barrier_manager: LocalBarrierManager,

    /// Streaming config for the actor, with any per-actor override applied.
    pub config: Arc<StreamingConfig>,
}

impl Debug for ExecutorParams {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ExecutorParams")
            .field("info", &self.info)
            .field("executor_id", &self.executor_id)
            .field("operator_id", &self.operator_id)
            .field("op_info", &self.op_info)
            .field("input", &self.input.len())
            .field("actor_id", &self.actor_context.id)
            .finish_non_exhaustive()
    }
}