1use core::time::Duration;
16use std::fmt::Debug;
17use std::sync::Arc;
18
19use async_recursion::async_recursion;
20use futures::{FutureExt, TryFutureExt};
21use itertools::Itertools;
22use risingwave_common::bitmap::Bitmap;
23use risingwave_common::catalog::{ColumnId, Field, Schema};
24use risingwave_common::config::MetricLevel;
25use risingwave_common::must_match;
26use risingwave_common::operator::{unique_executor_id, unique_operator_id};
27use risingwave_common::util::runtime::BackgroundShutdownRuntime;
28use risingwave_pb::plan_common::StorageTableDesc;
29use risingwave_pb::stream_plan::stream_node::NodeBody;
30use risingwave_pb::stream_plan::{self, StreamNode, StreamScanNode, StreamScanType};
31use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo;
32use risingwave_storage::monitor::HummockTraceFutureExt;
33use risingwave_storage::table::batch_table::BatchTable;
34use risingwave_storage::{StateStore, dispatch_state_store};
35use thiserror_ext::AsReport;
36use tokio::sync::mpsc::UnboundedReceiver;
37use tokio::task::JoinHandle;
38
39use crate::common::table::state_table::StateTableBuilder;
40use crate::error::StreamResult;
41use crate::executor::monitor::StreamingMetrics;
42use crate::executor::subtask::SubtaskHandle;
43use crate::executor::{
44 Actor, ActorContext, ActorContextRef, DispatchExecutor, Execute, Executor, ExecutorInfo,
45 MergeExecutorInput, SnapshotBackfillExecutor, TroublemakerExecutor, WrapperExecutor,
46};
47use crate::from_proto::{MergeExecutorBuilder, create_executor};
48use crate::task::{
49 ActorEvalErrorReport, ActorId, AtomicU64Ref, FragmentId, LocalBarrierManager, NewOutputRequest,
50 StreamEnvironment, await_tree_key,
51};
52
/// Shared manager responsible for building and spawning stream actors on this
/// compute node.
pub(crate) struct StreamActorManager {
    /// The environment of the streaming task (config, state store, meta client, ...).
    pub(super) env: StreamEnvironment,
    /// Metrics shared by all streaming executors and actors.
    pub(super) streaming_metrics: Arc<StreamingMetrics>,

    /// Shared atomic epoch, passed to executors as their watermark epoch.
    pub(super) watermark_epoch: AtomicU64Ref,

    /// Registry for await-tree (async stack trace) diagnostics of spawned
    /// actors; `None` when the feature is disabled.
    pub(super) await_tree_reg: Option<await_tree::Registry>,

    /// Runtime on which actor futures (and metric tasks) are spawned; shuts
    /// down in the background when dropped.
    pub(super) runtime: BackgroundShutdownRuntime,
}
69
70impl StreamActorManager {
71 fn get_executor_id(actor_context: &ActorContext, node: &StreamNode) -> u64 {
72 unique_executor_id(actor_context.id, node.operator_id)
75 }
76
77 fn get_executor_info(node: &StreamNode, executor_id: u64) -> ExecutorInfo {
78 let schema: Schema = node.fields.iter().map(Field::from).collect();
79
80 let pk_indices = node
81 .get_stream_key()
82 .iter()
83 .map(|idx| *idx as usize)
84 .collect::<Vec<_>>();
85
86 let stream_kind = node.stream_kind();
87
88 let identity = format!("{} {:X}", node.get_node_body().unwrap(), executor_id);
89
90 ExecutorInfo {
91 schema,
92 pk_indices,
93 stream_kind,
94 identity,
95 id: executor_id,
96 }
97 }
98
    /// Build the upstream [`MergeExecutorInput`] for a snapshot-backfill scan.
    ///
    /// `upstream_node` must be a `Merge` node; `must_match!` panics otherwise.
    async fn create_snapshot_backfill_input(
        &self,
        upstream_node: &StreamNode,
        actor_context: &ActorContextRef,
        local_barrier_manager: &LocalBarrierManager,
        chunk_size: usize,
    ) -> StreamResult<MergeExecutorInput> {
        let info = Self::get_executor_info(
            upstream_node,
            Self::get_executor_id(actor_context, upstream_node),
        );

        // The upstream of a snapshot backfill is required to be a `Merge` node.
        let upstream_merge = must_match!(upstream_node.get_node_body().unwrap(), NodeBody::Merge(upstream_merge) => {
            upstream_merge
        });

        MergeExecutorBuilder::new_input(
            local_barrier_manager.clone(),
            self.streaming_metrics.clone(),
            actor_context.clone(),
            info,
            upstream_merge,
            chunk_size,
        )
        .await
    }
125
    /// Build the executor subtree for a `StreamScan` node whose scan type is
    /// `SnapshotBackfill`.
    ///
    /// Expects `stream_node` to have exactly two inputs (the upstream merge and
    /// a second input that is ignored here); panics otherwise. When "insane"
    /// consistency-testing mode is on, the result is wrapped in a
    /// [`TroublemakerExecutor`].
    #[expect(clippy::too_many_arguments)]
    async fn create_snapshot_backfill_node(
        &self,
        stream_node: &StreamNode,
        node: &StreamScanNode,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        env: StreamEnvironment,
        local_barrier_manager: &LocalBarrierManager,
        state_store: impl StateStore,
    ) -> StreamResult<Executor> {
        // Only the first input (the upstream merge) is used below.
        let [upstream_node, _]: &[_; 2] = stream_node.input.as_slice().try_into().unwrap();
        let chunk_size = env.config().developer.chunk_size;
        let upstream = self
            .create_snapshot_backfill_input(
                upstream_node,
                actor_context,
                local_barrier_manager,
                chunk_size,
            )
            .await?;

        let table_desc: &StorageTableDesc = node.get_table_desc()?;

        // Projection from upstream columns to this scan's output.
        let output_indices = node
            .output_indices
            .iter()
            .map(|&i| i as usize)
            .collect_vec();

        let column_ids = node
            .upstream_column_ids
            .iter()
            .map(ColumnId::from)
            .collect_vec();

        // Register this actor's backfill progress so creating-MV progress can
        // be tracked via the barrier manager.
        let progress = local_barrier_manager.register_create_mview_progress(actor_context.id);

        let vnodes = vnode_bitmap.map(Arc::new);
        let barrier_rx = local_barrier_manager.subscribe_barrier(actor_context.id);

        // Batch-read view over the upstream storage table, restricted to the
        // columns and vnodes this actor is responsible for.
        let upstream_table =
            BatchTable::new_partial(state_store.clone(), column_ids, vnodes.clone(), table_desc);

        let state_table = node.get_state_table()?;
        let state_table = StateTableBuilder::new(state_table, state_store.clone(), vnodes)
            .enable_preload_all_rows_by_config(&actor_context.streaming_config)
            .build()
            .await;

        let executor = SnapshotBackfillExecutor::new(
            upstream_table,
            state_table,
            upstream,
            output_indices,
            actor_context.clone(),
            progress,
            chunk_size,
            node.rate_limit.into(),
            barrier_rx,
            self.streaming_metrics.clone(),
            node.snapshot_backfill_epoch,
        )
        .boxed();

        // Identity/schema info is derived from the scan node itself, not the
        // upstream merge.
        let info = Self::get_executor_info(
            stream_node,
            Self::get_executor_id(actor_context, stream_node),
        );

        if crate::consistency::insane() {
            // Inject a troublemaker to deliberately corrupt the stream for
            // consistency testing.
            let mut troubled_info = info.clone();
            troubled_info.identity = format!("{} (troubled)", info.identity);
            Ok((
                info,
                TroublemakerExecutor::new((troubled_info, executor).into(), chunk_size),
            )
                .into())
        } else {
            Ok((info, executor).into())
        }
    }
208
209 #[expect(clippy::too_many_arguments)]
211 #[async_recursion]
212 async fn create_nodes_inner(
213 &self,
214 fragment_id: FragmentId,
215 node: &stream_plan::StreamNode,
216 env: StreamEnvironment,
217 store: impl StateStore,
218 actor_context: &ActorContextRef,
219 vnode_bitmap: Option<Bitmap>,
220 has_stateful: bool,
221 subtasks: &mut Vec<SubtaskHandle>,
222 local_barrier_manager: &LocalBarrierManager,
223 ) -> StreamResult<Executor> {
224 if let NodeBody::StreamScan(stream_scan) = node.get_node_body().unwrap()
225 && let Ok(StreamScanType::SnapshotBackfill) = stream_scan.get_stream_scan_type()
226 {
227 return dispatch_state_store!(env.state_store(), store, {
228 self.create_snapshot_backfill_node(
229 node,
230 stream_scan,
231 actor_context,
232 vnode_bitmap,
233 env,
234 local_barrier_manager,
235 store,
236 )
237 .await
238 });
239 }
240
241 fn is_stateful_executor(stream_node: &StreamNode) -> bool {
244 matches!(
245 stream_node.get_node_body().unwrap(),
246 NodeBody::HashAgg(_)
247 | NodeBody::HashJoin(_)
248 | NodeBody::DeltaIndexJoin(_)
249 | NodeBody::Lookup(_)
250 | NodeBody::StreamScan(_)
251 | NodeBody::StreamCdcScan(_)
252 | NodeBody::DynamicFilter(_)
253 | NodeBody::GroupTopN(_)
254 | NodeBody::Now(_)
255 )
256 }
257 let is_stateful = is_stateful_executor(node);
258
259 let mut input = Vec::with_capacity(node.input.iter().len());
261 for input_stream_node in &node.input {
262 input.push(
263 self.create_nodes_inner(
264 fragment_id,
265 input_stream_node,
266 env.clone(),
267 store.clone(),
268 actor_context,
269 vnode_bitmap.clone(),
270 has_stateful || is_stateful,
271 subtasks,
272 local_barrier_manager,
273 )
274 .await?,
275 );
276 }
277
278 self.generate_executor_from_inputs(
279 fragment_id,
280 node,
281 env,
282 store,
283 actor_context,
284 vnode_bitmap,
285 has_stateful || is_stateful,
286 subtasks,
287 local_barrier_manager,
288 input,
289 )
290 .await
291 }
292
    /// Create the executor for `node` itself from its already-created input
    /// executors, then wrap it with the debugging [`WrapperExecutor`].
    #[expect(clippy::too_many_arguments)]
    async fn generate_executor_from_inputs(
        &self,
        fragment_id: FragmentId,
        node: &stream_plan::StreamNode,
        env: StreamEnvironment,
        store: impl StateStore,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        has_stateful: bool,
        subtasks: &mut Vec<SubtaskHandle>,
        local_barrier_manager: &LocalBarrierManager,
        input: Vec<Executor>,
    ) -> StreamResult<Executor> {
        let op_info = node.get_identity().clone();

        // `executor_id` is unique within the process (actor id + operator id);
        // `operator_id` is unique within the fragment.
        let executor_id = Self::get_executor_id(actor_context, node);
        let operator_id = unique_operator_id(fragment_id, node.operator_id);

        let info = Self::get_executor_info(node, executor_id);

        // Expression-evaluation errors are reported with this executor's
        // identity attached.
        let eval_error_report = ActorEvalErrorReport {
            actor_context: actor_context.clone(),
            identity: info.identity.clone().into(),
        };

        // Bundle everything the per-node builder (`create_executor`) needs.
        let executor_params = ExecutorParams {
            env: env.clone(),

            info: info.clone(),
            executor_id,
            operator_id,
            op_info,
            input,
            fragment_id,
            executor_stats: self.streaming_metrics.clone(),
            actor_context: actor_context.clone(),
            vnode_bitmap,
            eval_error_report,
            watermark_epoch: self.watermark_epoch.clone(),
            local_barrier_manager: local_barrier_manager.clone(),
        };

        let executor = create_executor(executor_params, node, store).await?;

        // Wrap the executor for debugging / consistency checks.
        let wrapped = WrapperExecutor::new(
            executor,
            actor_context.clone(),
            env.config().developer.enable_executor_row_count,
            env.config().developer.enable_explain_analyze_stats,
        );
        let executor = (info, wrapped).into();

        // Subtask wrapping is currently a no-op in both branches; `let _ =
        // subtasks;` only consumes the parameter to silence the unused
        // warning. NOTE(review): presumably stateful executors were once
        // spawned as separate subtasks here — confirm intent before
        // re-enabling or removing.
        let executor = if has_stateful {
            let _ = subtasks;
            executor
        } else {
            executor
        };

        Ok(executor)
    }
365
    /// Create the executor tree for one actor, dispatching on the concrete
    /// state-store backend. Returns the root executor together with any
    /// subtask handles collected during creation.
    async fn create_nodes(
        &self,
        fragment_id: FragmentId,
        node: &stream_plan::StreamNode,
        env: StreamEnvironment,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        local_barrier_manager: &LocalBarrierManager,
    ) -> StreamResult<(Executor, Vec<SubtaskHandle>)> {
        let mut subtasks = vec![];

        // Monomorphize `create_nodes_inner` over the actual state-store type.
        let executor = dispatch_state_store!(env.state_store(), store, {
            self.create_nodes_inner(
                fragment_id,
                node,
                env,
                store,
                actor_context,
                vnode_bitmap,
                false,
                &mut subtasks,
                local_barrier_manager,
            )
            .await
        })?;

        Ok((executor, subtasks))
    }
395
    /// Build the complete actor: actor context, executor tree, and the
    /// dispatcher that routes its output downstream.
    ///
    /// Takes `self: Arc<Self>` so the manager is kept alive for the duration
    /// of the (potentially long) build.
    async fn create_actor(
        self: Arc<Self>,
        actor: BuildActorInfo,
        fragment_id: FragmentId,
        node: Arc<StreamNode>,
        local_barrier_manager: LocalBarrierManager,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    ) -> StreamResult<Actor<DispatchExecutor>> {
        {
            let actor_id = actor.actor_id;
            let streaming_config = self.env.config().clone();
            let actor_context = ActorContext::create(
                &actor,
                fragment_id,
                self.env.total_mem_usage(),
                self.streaming_metrics.clone(),
                self.env.meta_client(),
                streaming_config,
                self.env.clone(),
            );
            let vnode_bitmap = actor.vnode_bitmap.as_ref().map(|b| b.into());
            // `expr_context` is expected to always be set by the meta service;
            // a missing value is a bug, hence the unwrap.
            let expr_context = actor.expr_context.clone().unwrap();

            let (executor, subtasks) = self
                .create_nodes(
                    fragment_id,
                    &node,
                    self.env.clone(),
                    &actor_context,
                    vnode_bitmap,
                    &local_barrier_manager,
                )
                .await?;

            // The dispatcher sits on top of the root executor and sends output
            // to downstream actors as they request connections.
            let dispatcher = DispatchExecutor::new(
                executor,
                new_output_request_rx,
                actor.dispatchers,
                actor_id,
                fragment_id,
                local_barrier_manager.clone(),
                self.streaming_metrics.clone(),
            )
            .await?;
            let actor = Actor::new(
                dispatcher,
                subtasks,
                self.streaming_metrics.clone(),
                actor_context.clone(),
                expr_context,
                local_barrier_manager,
            );
            Ok(actor)
        }
    }
451
    /// Build and spawn the actor onto the manager's runtime.
    ///
    /// Returns the actor's join handle, plus an optional second handle for a
    /// periodic tokio-metrics reporting task (spawned only when debug-level
    /// metrics or the `enable_actor_tokio_metrics` config are enabled).
    pub(super) fn spawn_actor(
        self: &Arc<Self>,
        actor: BuildActorInfo,
        fragment_id: FragmentId,
        node: Arc<StreamNode>,
        local_barrier_manager: LocalBarrierManager,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    ) -> (JoinHandle<()>, Option<JoinHandle<()>>) {
        {
            let monitor = tokio_metrics::TaskMonitor::new();
            let stream_actor_ref = &actor;
            let actor_id = stream_actor_ref.actor_id;
            let handle = {
                let trace_span =
                    format!("Actor {actor_id}: `{}`", stream_actor_ref.mview_definition);
                let barrier_manager = local_barrier_manager;
                // Build the actor, run it to completion, and report any
                // failure to the barrier manager so it propagates upward.
                let actor = self
                    .clone()
                    .create_actor(
                        actor,
                        fragment_id,
                        node,
                        barrier_manager.clone(),
                        new_output_request_rx
                    ).boxed().and_then(|actor| actor.run()).map(move |result| {
                        if let Err(err) = result {
                            tracing::error!(actor_id, error = ?err.as_report(), "actor exit with error");
                            barrier_manager.notify_failure(actor_id, err);
                        }
                    });
                // Register in the await-tree registry (when enabled) for async
                // stack-trace diagnostics; otherwise run the future as-is.
                let traced = match &self.await_tree_reg {
                    Some(m) => m
                        .register(await_tree_key::Actor(actor_id), trace_span)
                        .instrument(actor)
                        .left_future(),
                    None => actor.right_future(),
                };
                let instrumented = monitor.instrument(traced);
                // Propagate the streaming config into the task-local scope.
                let with_config = crate::CONFIG.scope(self.env.config().clone(), instrumented);
                // Optionally trace hummock storage operations of this future.
                let may_track_hummock = with_config.may_trace_hummock();

                self.runtime.spawn(may_track_hummock)
            };

            let monitor_handle = if self.streaming_metrics.level >= MetricLevel::Debug
                || self.env.config().developer.enable_actor_tokio_metrics
            {
                tracing::info!("Tokio metrics are enabled.");
                let streaming_metrics = self.streaming_metrics.clone();
                // Report cumulative per-actor task metrics once per second.
                let actor_monitor_task = self.runtime.spawn(async move {
                    let metrics = streaming_metrics.new_actor_metrics(actor_id);
                    loop {
                        let task_metrics = monitor.cumulative();
                        // NOTE(review): `actor_execution_time` and
                        // `actor_poll_duration` below are both set from
                        // `total_poll_duration` — confirm the duplication is
                        // intentional.
                        metrics
                            .actor_execution_time
                            .set(task_metrics.total_poll_duration.as_secs_f64());
                        metrics
                            .actor_fast_poll_duration
                            .set(task_metrics.total_fast_poll_duration.as_secs_f64());
                        metrics
                            .actor_fast_poll_cnt
                            .set(task_metrics.total_fast_poll_count as i64);
                        metrics
                            .actor_slow_poll_duration
                            .set(task_metrics.total_slow_poll_duration.as_secs_f64());
                        metrics
                            .actor_slow_poll_cnt
                            .set(task_metrics.total_slow_poll_count as i64);
                        metrics
                            .actor_poll_duration
                            .set(task_metrics.total_poll_duration.as_secs_f64());
                        metrics
                            .actor_poll_cnt
                            .set(task_metrics.total_poll_count as i64);
                        metrics
                            .actor_idle_duration
                            .set(task_metrics.total_idle_duration.as_secs_f64());
                        metrics
                            .actor_idle_cnt
                            .set(task_metrics.total_idled_count as i64);
                        metrics
                            .actor_scheduled_duration
                            .set(task_metrics.total_scheduled_duration.as_secs_f64());
                        metrics
                            .actor_scheduled_cnt
                            .set(task_metrics.total_scheduled_count as i64);
                        tokio::time::sleep(Duration::from_secs(1)).await;
                    }
                });
                Some(actor_monitor_task)
            } else {
                None
            };
            (handle, monitor_handle)
        }
    }
552}
553
/// Everything a per-node executor builder needs to construct one executor.
pub struct ExecutorParams {
    /// The environment of the streaming task.
    pub env: StreamEnvironment,

    /// Basic information (schema, stream key, identity) of the executor.
    pub info: ExecutorInfo,

    /// Executor id unique within the process, derived from actor id and
    /// operator id.
    pub executor_id: u64,

    /// Operator id unique within the fragment, derived from fragment id and
    /// operator id.
    pub operator_id: u64,

    /// Identity description of the plan node, for debugging.
    pub op_info: String,

    /// The already-built input executors of this executor.
    pub input: Vec<Executor>,

    /// Id of the fragment this executor belongs to.
    pub fragment_id: FragmentId,

    /// Metrics shared by all streaming executors.
    pub executor_stats: Arc<StreamingMetrics>,

    /// Context of the containing actor.
    pub actor_context: ActorContextRef,

    /// Vnode bitmap the actor is responsible for, if any.
    pub vnode_bitmap: Option<Bitmap>,

    /// Reporter for expression-evaluation errors, tagged with this executor's
    /// identity.
    pub eval_error_report: ActorEvalErrorReport,

    /// Shared atomic epoch passed through from the actor manager as the
    /// watermark epoch.
    pub watermark_epoch: AtomicU64Ref,

    /// Handle to the local barrier manager.
    pub local_barrier_manager: LocalBarrierManager,
}
596
597impl Debug for ExecutorParams {
598 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
599 f.debug_struct("ExecutorParams")
600 .field("info", &self.info)
601 .field("executor_id", &self.executor_id)
602 .field("operator_id", &self.operator_id)
603 .field("op_info", &self.op_info)
604 .field("input", &self.input.len())
605 .field("actor_id", &self.actor_context.id)
606 .finish_non_exhaustive()
607 }
608}