use core::time::Duration;
use std::fmt::Debug;
use std::sync::Arc;

use async_recursion::async_recursion;
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnId, Field, Schema};
use risingwave_common::config::{MetricLevel, StreamingConfig, merge_streaming_config_section};
use risingwave_common::operator::{unique_executor_id, unique_operator_id};
use risingwave_common::util::runtime::BackgroundShutdownRuntime;
use risingwave_pb::id::{ExecutorId, GlobalOperatorId};
use risingwave_pb::plan_common::StorageTableDesc;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{self, StreamNode, StreamScanNode, StreamScanType};
use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo;
use risingwave_storage::monitor::HummockTraceFutureExt;
use risingwave_storage::table::batch_table::BatchTable;
use risingwave_storage::{StateStore, dispatch_state_store};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::task::JoinHandle;

use crate::common::table::state_table::StateTableBuilder;
use crate::error::StreamResult;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::subtask::SubtaskHandle;
use crate::executor::{
    Actor, ActorContext, ActorContextRef, DispatchExecutor, Execute, Executor, ExecutorInfo,
    SnapshotBackfillExecutor, TroublemakerExecutor, WrapperExecutor,
};
use crate::from_proto::{MergeExecutorBuilder, create_executor};
use crate::task::{
    ActorEvalErrorReport, ActorId, AtomicU64Ref, FragmentId, LocalBarrierManager, NewOutputRequest,
    StreamEnvironment, await_tree_key,
};

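/// Default maximum number of entries held by a [`ConfigOverrideCache`].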
pub const CONFIG_OVERRIDE_CACHE_DEFAULT_CAPACITY: u64 = 256;

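/// Cache mapping a raw per-job config-override string to the [`StreamingConfig`] obtained by
/// merging it into the global config, so that each distinct override is merged at most once.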
pub type ConfigOverrideCache = moka::sync::Cache<String, Arc<StreamingConfig>>;

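/// Shared context used to build and spawn streaming actors on this compute node.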
pub(crate) struct StreamActorManager {
    pub(super) env: StreamEnvironment,
    pub(super) streaming_metrics: Arc<StreamingMetrics>,

    /// Shared watermark epoch, passed to every executor through [`ExecutorParams`].
    pub(super) watermark_epoch: AtomicU64Ref,

    /// Registry for `await-tree` instrumentation, if enabled.
    pub(super) await_tree_reg: Option<await_tree::Registry>,

    /// Runtime that actor futures and monitor tasks are spawned onto.
    pub(super) runtime: BackgroundShutdownRuntime,

    /// Memoizes merged [`StreamingConfig`]s keyed by the raw config-override string.
    pub(super) config_override_cache: ConfigOverrideCache,
}

impl StreamActorManager {
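    /// Derive the id of an executor from the actor id and the plan node's operator id.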
    fn get_executor_id(actor_context: &ActorContext, node: &StreamNode) -> ExecutorId {
        unique_executor_id(actor_context.id, node.operator_id)
    }

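    /// Assemble the [`ExecutorInfo`] (schema, stream key, stream kind and identity string) for a
    /// plan node.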
    fn get_executor_info(node: &StreamNode, executor_id: ExecutorId) -> ExecutorInfo {
        let schema: Schema = node.fields.iter().map(Field::from).collect();

        let stream_key = node
            .get_stream_key()
            .iter()
            .map(|idx| *idx as usize)
            .collect::<Vec<_>>();

        let stream_kind = node.stream_kind();

        let identity = format!("{} {:X}", node.get_node_body().unwrap(), executor_id);

        ExecutorInfo {
            schema,
            stream_key,
            stream_kind,
            identity,
            id: executor_id,
        }
    }

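    /// Build the executor for a `StreamScan` node of type [`StreamScanType::SnapshotBackfill`],
    /// wiring up its `Merge` upstream, the upstream storage table and the backfill state table.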
    async fn create_snapshot_backfill_node(
        &self,
        stream_node: &StreamNode,
        node: &StreamScanNode,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        local_barrier_manager: &LocalBarrierManager,
        state_store: impl StateStore,
    ) -> StreamResult<Executor> {
        let [upstream_node, _]: &[_; 2] = stream_node.input.as_slice().try_into().unwrap();
        let chunk_size = actor_context.config.developer.chunk_size;

        let info = Self::get_executor_info(
            upstream_node,
            Self::get_executor_id(actor_context, upstream_node),
        );

        let NodeBody::Merge(upstream_merge) = upstream_node.get_node_body()? else {
            bail!("expect Merge as input of SnapshotBackfill");
        };

        let upstream = MergeExecutorBuilder::new_input(
            local_barrier_manager.clone(),
            self.streaming_metrics.clone(),
            actor_context.clone(),
            info,
            upstream_merge,
            chunk_size,
        )
        .await?;

        let table_desc: &StorageTableDesc = node.get_table_desc()?;

        let output_indices = node
            .output_indices
            .iter()
            .map(|&i| i as usize)
            .collect_vec();

        let column_ids = node
            .upstream_column_ids
            .iter()
            .map(ColumnId::from)
            .collect_vec();

        let progress = local_barrier_manager.register_create_mview_progress(actor_context);

        let vnodes = vnode_bitmap.map(Arc::new);
        let barrier_rx = local_barrier_manager.subscribe_barrier(actor_context.id);

        let upstream_table =
            BatchTable::new_partial(state_store.clone(), column_ids, vnodes.clone(), table_desc);

        let state_table = node.get_state_table()?;
        let state_table = StateTableBuilder::new(state_table, state_store.clone(), vnodes)
            .enable_preload_all_rows_by_config(&actor_context.config)
            .build()
            .await;

        let executor = SnapshotBackfillExecutor::new(
            upstream_table,
            state_table,
            upstream,
            output_indices,
            actor_context.clone(),
            progress,
            chunk_size,
            node.rate_limit.into(),
            barrier_rx,
            self.streaming_metrics.clone(),
            node.snapshot_backfill_epoch,
        )
        .boxed();

        let info = Self::get_executor_info(
            stream_node,
            Self::get_executor_id(actor_context, stream_node),
        );

        if crate::consistency::insane() {
            let mut troubled_info = info.clone();
            troubled_info.identity = format!("{} (troubled)", info.identity);
            Ok((
                info,
                TroublemakerExecutor::new((troubled_info, executor).into(), chunk_size),
            )
                .into())
        } else {
            Ok((info, executor).into())
        }
    }

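    /// Recursively create the executor tree rooted at `node`, building inputs bottom-up and
    /// wrapping each executor via [`Self::wrap_executor`]. `has_stateful` records whether a
    /// stateful executor has already been seen on the path from the root.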
    #[expect(clippy::too_many_arguments)]
    #[async_recursion]
    async fn create_nodes_inner(
        &self,
        fragment_id: FragmentId,
        node: &stream_plan::StreamNode,
        env: StreamEnvironment,
        store: impl StateStore,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        has_stateful: bool,
        subtasks: &mut Vec<SubtaskHandle>,
        local_barrier_manager: &LocalBarrierManager,
    ) -> StreamResult<Executor> {
        fn is_stateful_executor(stream_node: &StreamNode) -> bool {
            matches!(
                stream_node.get_node_body().unwrap(),
                NodeBody::HashAgg(_)
                    | NodeBody::HashJoin(_)
                    | NodeBody::DeltaIndexJoin(_)
                    | NodeBody::Lookup(_)
                    | NodeBody::StreamScan(_)
                    | NodeBody::StreamCdcScan(_)
                    | NodeBody::DynamicFilter(_)
                    | NodeBody::GroupTopN(_)
                    | NodeBody::Now(_)
            )
        }
        let is_stateful = is_stateful_executor(node);

        let executor = if let NodeBody::StreamScan(stream_scan) = node.get_node_body().unwrap()
            && let Ok(StreamScanType::SnapshotBackfill) = stream_scan.get_stream_scan_type()
        {
            dispatch_state_store!(env.state_store(), store, {
                self.create_snapshot_backfill_node(
                    node,
                    stream_scan,
                    actor_context,
                    vnode_bitmap,
                    local_barrier_manager,
                    store,
                )
                .await
            })?
        } else {
            let mut input = Vec::with_capacity(node.input.iter().len());
            for input_stream_node in &node.input {
                input.push(
                    self.create_nodes_inner(
                        fragment_id,
                        input_stream_node,
                        env.clone(),
                        store.clone(),
                        actor_context,
                        vnode_bitmap.clone(),
                        has_stateful || is_stateful,
                        subtasks,
                        local_barrier_manager,
                    )
                    .await?,
                );
            }

            self.generate_executor_from_inputs(
                fragment_id,
                node,
                env,
                store,
                actor_context,
                vnode_bitmap,
                local_barrier_manager,
                input,
            )
            .await?
        };
        Ok(Self::wrap_executor(
            executor,
            actor_context,
            has_stateful || is_stateful,
            subtasks,
        ))
    }

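    /// Build a single executor from the given plan node and its already-created input executors
    /// by dispatching to the corresponding `from_proto` builder.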
    #[expect(clippy::too_many_arguments)]
    async fn generate_executor_from_inputs(
        &self,
        fragment_id: FragmentId,
        node: &stream_plan::StreamNode,
        env: StreamEnvironment,
        store: impl StateStore,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        local_barrier_manager: &LocalBarrierManager,
        input: Vec<Executor>,
    ) -> StreamResult<Executor> {
        let op_info = node.get_identity().clone();

        let executor_id = Self::get_executor_id(actor_context, node);
        let operator_id = unique_operator_id(fragment_id, node.operator_id);

        let info = Self::get_executor_info(node, executor_id);

        let eval_error_report = ActorEvalErrorReport {
            actor_context: actor_context.clone(),
            identity: info.identity.clone().into(),
        };

        let executor_params = ExecutorParams {
            env: env.clone(),

            info: info.clone(),
            executor_id,
            operator_id,
            op_info,
            input,
            fragment_id,
            executor_stats: self.streaming_metrics.clone(),
            actor_context: actor_context.clone(),
            vnode_bitmap,
            eval_error_report,
            watermark_epoch: self.watermark_epoch.clone(),
            local_barrier_manager: local_barrier_manager.clone(),
            config: actor_context.config.clone(),
        };

        create_executor(executor_params, node, store).await
    }

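    /// Wrap the executor with [`WrapperExecutor`], which applies the checks and instrumentation
    /// shared by all executors.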
    fn wrap_executor(
        executor: Executor,
        actor_context: &ActorContextRef,
        has_stateful: bool,
        subtasks: &mut Vec<SubtaskHandle>,
    ) -> Executor {
        let info = executor.info().clone();
        let wrapped = WrapperExecutor::new(executor, actor_context.clone());
        let executor = (info, wrapped).into();

        if has_stateful {
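            // No subtask is created here for now; `subtasks` is referenced only so the
            // parameter stays in use.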
            let _ = subtasks;
        }
        executor
    }

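    /// Create the executor tree for an actor, dispatching on the state store backend and
    /// collecting the subtask handles produced along the way.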
    async fn create_nodes(
        &self,
        fragment_id: FragmentId,
        node: &stream_plan::StreamNode,
        env: StreamEnvironment,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        local_barrier_manager: &LocalBarrierManager,
    ) -> StreamResult<(Executor, Vec<SubtaskHandle>)> {
        let mut subtasks = vec![];

        let executor = dispatch_state_store!(env.state_store(), store, {
            self.create_nodes_inner(
                fragment_id,
                node,
                env,
                store,
                actor_context,
                vnode_bitmap,
                false,
                &mut subtasks,
                local_barrier_manager,
            )
            .await
        })?;

        Ok((executor, subtasks))
    }

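    /// Resolve the effective [`StreamingConfig`] for an actor: merge its config-override string
    /// into the global config, falling back to the global config when no override applies or
    /// merging fails. Results are memoized in the [`ConfigOverrideCache`].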
    fn get_overridden_config(
        &self,
        config_override: &str,
        actor_id: ActorId,
    ) -> Arc<StreamingConfig> {
        self.config_override_cache
            .get_with_by_ref(config_override, || {
                let global = self.env.global_config();
                match merge_streaming_config_section(global.as_ref(), config_override) {
                    Ok(Some(config)) => {
                        tracing::info!(%actor_id, "applied configuration override");
                        Arc::new(config)
                    }
                    Ok(None) => global.clone(),
                    Err(e) => {
                        tracing::error!(
                            error = %e.as_report(),
                            %actor_id,
                            "failed to apply configuration override, use global config instead",
                        );
                        global.clone()
                    }
                }
            })
    }

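    /// Create the [`Actor`] for the given build info: set up its [`ActorContext`], build the
    /// executor tree, and connect it to a [`DispatchExecutor`].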
    async fn create_actor(
        self: Arc<Self>,
        actor: BuildActorInfo,
        fragment_id: FragmentId,
        node: Arc<StreamNode>,
        local_barrier_manager: LocalBarrierManager,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
        actor_config: Arc<StreamingConfig>,
    ) -> StreamResult<Actor<DispatchExecutor>> {
        let actor_context = ActorContext::create(
            &actor,
            fragment_id,
            self.env.total_mem_usage(),
            self.streaming_metrics.clone(),
            self.env.meta_client(),
            actor_config,
            self.env.clone(),
        );
        let vnode_bitmap = actor.vnode_bitmap.as_ref().map(|b| b.into());
        let expr_context = actor.expr_context.clone().unwrap();

        let (executor, subtasks) = self
            .create_nodes(
                fragment_id,
                &node,
                self.env.clone(),
                &actor_context,
                vnode_bitmap,
                &local_barrier_manager,
            )
            .await?;

        let dispatcher = DispatchExecutor::new(
            executor,
            new_output_request_rx,
            actor.dispatchers,
            &actor_context,
        )
        .await?;

        let actor = Actor::new(
            dispatcher,
            subtasks,
            self.streaming_metrics.clone(),
            actor_context.clone(),
            expr_context,
            local_barrier_manager,
        );
        Ok(actor)
    }

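    /// Spawn an actor onto the manager's runtime. Returns the actor's join handle and, when
    /// per-actor tokio metrics are enabled, a second handle for the metrics-reporting task.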
    pub(super) fn spawn_actor(
        self: &Arc<Self>,
        actor: BuildActorInfo,
        fragment_id: FragmentId,
        node: Arc<StreamNode>,
        local_barrier_manager: LocalBarrierManager,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    ) -> (JoinHandle<()>, Option<JoinHandle<()>>) {
        let stream_actor_ref = &actor;
        let actor_id = stream_actor_ref.actor_id;
        let actor_config = self.get_overridden_config(&actor.config_override, actor_id);

        let monitor = actor_config
            .developer
            .enable_actor_tokio_metrics
            .then(tokio_metrics::TaskMonitor::new);

        let handle = {
            let trace_span = format!("Actor {actor_id}: `{}`", stream_actor_ref.mview_definition);
            let barrier_manager = local_barrier_manager;
            let actor = self
                .clone()
                .create_actor(
                    actor,
                    fragment_id,
                    node,
                    barrier_manager.clone(),
                    new_output_request_rx,
                    actor_config,
                )
                .boxed()
                .and_then(|actor| actor.run())
                .map(move |result| {
                    if let Err(err) = result {
                        tracing::error!(%actor_id, error = ?err.as_report(), "actor exit with error");
                        barrier_manager.notify_failure(actor_id, err);
                    }
                });
            let traced = match &self.await_tree_reg {
                Some(m) => m
                    .register(await_tree_key::Actor(actor_id), trace_span)
                    .instrument(actor)
                    .left_future(),
                None => actor.right_future(),
            };
            let instrumented = match &monitor {
                Some(m) => m.instrument(traced).left_future(),
                None => traced.right_future(),
            };
            let may_track_hummock = instrumented.may_trace_hummock();

            self.runtime.spawn(may_track_hummock)
        };

        let enable_count_metrics = self.streaming_metrics.level >= MetricLevel::Debug;
        let monitor_handle = if let Some(monitor) = monitor {
            let streaming_metrics = self.streaming_metrics.clone();
            let actor_monitor_task = self.runtime.spawn(async move {
                let metrics = streaming_metrics.new_actor_metrics(actor_id, fragment_id);
                let mut interval = tokio::time::interval(Duration::from_secs(15));
                for task_metrics in monitor.intervals() {
                    interval.tick().await;

                    metrics
                        .actor_poll_duration
                        .inc_by(task_metrics.total_poll_duration.as_nanos() as u64);
                    metrics
                        .actor_idle_duration
                        .inc_by(task_metrics.total_idle_duration.as_nanos() as u64);
                    metrics
                        .actor_scheduled_duration
                        .inc_by(task_metrics.total_scheduled_duration.as_nanos() as u64);

                    if enable_count_metrics {
                        metrics.actor_poll_cnt.inc_by(task_metrics.total_poll_count);
                        metrics
                            .actor_idle_cnt
                            .inc_by(task_metrics.total_idled_count);
                        metrics
                            .actor_scheduled_cnt
                            .inc_by(task_metrics.total_scheduled_count);
                    }
                }
            });
            Some(actor_monitor_task)
        } else {
            None
        };
        (handle, monitor_handle)
    }
}

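/// Parameters passed to every executor builder in `from_proto` when constructing an executor.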
pub struct ExecutorParams {
    pub env: StreamEnvironment,

    /// Basic information about the executor.
    pub info: ExecutorInfo,

    /// Id of this executor, derived from the actor id and the plan node's operator id.
    pub executor_id: ExecutorId,

    /// Globally unique operator id, derived from the fragment id and the plan node's operator id.
    pub operator_id: GlobalOperatorId,

    /// Identity string of the plan node.
    pub op_info: String,

    /// The input executors.
    pub input: Vec<Executor>,

    /// Id of the fragment the executor belongs to.
    pub fragment_id: FragmentId,

    /// Streaming metrics for the executor.
    pub executor_stats: Arc<StreamingMetrics>,

    /// Context of the actor this executor belongs to.
    pub actor_context: ActorContextRef,

    /// Vnodes owned by this executor, as a bitmap.
    pub vnode_bitmap: Option<Bitmap>,

    /// Used for reporting expression evaluation errors.
    pub eval_error_report: ActorEvalErrorReport,

    /// Shared watermark epoch from the actor manager.
    pub watermark_epoch: AtomicU64Ref,

    pub local_barrier_manager: LocalBarrierManager,

    /// The effective streaming config of this actor, possibly overridden per job.
    pub config: Arc<StreamingConfig>,
}

impl Debug for ExecutorParams {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ExecutorParams")
            .field("info", &self.info)
            .field("executor_id", &self.executor_id)
            .field("operator_id", &self.operator_id)
            .field("op_info", &self.op_info)
            .field("input", &self.input.len())
            .field("actor_id", &self.actor_context.id)
            .finish_non_exhaustive()
    }
}