1use core::time::Duration;
16use std::collections::{HashMap, HashSet};
17use std::fmt::Debug;
18use std::sync::Arc;
19
20use async_recursion::async_recursion;
21use futures::{FutureExt, TryFutureExt};
22use itertools::Itertools;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{ColumnId, Field, Schema, TableId};
25use risingwave_common::config::MetricLevel;
26use risingwave_common::must_match;
27use risingwave_common::operator::{unique_executor_id, unique_operator_id};
28use risingwave_common::util::runtime::BackgroundShutdownRuntime;
29use risingwave_pb::plan_common::StorageTableDesc;
30use risingwave_pb::stream_plan::stream_node::NodeBody;
31use risingwave_pb::stream_plan::{self, StreamNode, StreamScanNode, StreamScanType};
32use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo;
33use risingwave_storage::monitor::HummockTraceFutureExt;
34use risingwave_storage::table::batch_table::BatchTable;
35use risingwave_storage::{StateStore, dispatch_state_store};
36use thiserror_ext::AsReport;
37use tokio::sync::mpsc::UnboundedReceiver;
38use tokio::task::JoinHandle;
39
40use crate::common::table::state_table::StateTableBuilder;
41use crate::error::StreamResult;
42use crate::executor::monitor::StreamingMetrics;
43use crate::executor::subtask::SubtaskHandle;
44use crate::executor::{
45 Actor, ActorContext, ActorContextRef, DispatchExecutor, Execute, Executor, ExecutorInfo,
46 MergeExecutorInput, SnapshotBackfillExecutor, TroublemakerExecutor, WrapperExecutor,
47};
48use crate::from_proto::{MergeExecutorBuilder, create_executor};
49use crate::task::{
50 ActorEvalErrorReport, ActorId, AtomicU64Ref, FragmentId, LocalBarrierManager, NewOutputRequest,
51 StreamEnvironment, await_tree_key,
52};
53
/// Creates and spawns stream actors on this node, building their executor trees
/// from `StreamNode` plans and running them on a dedicated runtime.
pub(crate) struct StreamActorManager {
    // Shared environment (config, state store, meta client, …) for all actors.
    pub(super) env: StreamEnvironment,
    // Metrics sink shared by all executors and actors created by this manager.
    pub(super) streaming_metrics: Arc<StreamingMetrics>,

    // Shared atomic epoch passed to executors via `ExecutorParams`;
    // presumably drives watermark-based cache/state eviction — TODO confirm.
    pub(super) watermark_epoch: AtomicU64Ref,

    // Await-tree registry for tracing actor futures; `None` when disabled
    // (see the registration branch in `spawn_actor`).
    pub(super) await_tree_reg: Option<await_tree::Registry>,

    // Runtime on which actor futures and metric-sampling tasks are spawned;
    // shuts down in the background when dropped.
    pub(super) runtime: BackgroundShutdownRuntime,
}
70
71impl StreamActorManager {
72 fn get_executor_id(actor_context: &ActorContext, node: &StreamNode) -> u64 {
73 unique_executor_id(actor_context.id, node.operator_id)
76 }
77
78 fn get_executor_info(node: &StreamNode, executor_id: u64) -> ExecutorInfo {
79 let schema: Schema = node.fields.iter().map(Field::from).collect();
80
81 let pk_indices = node
82 .get_stream_key()
83 .iter()
84 .map(|idx| *idx as usize)
85 .collect::<Vec<_>>();
86
87 let stream_kind = node.stream_kind();
88
89 let identity = format!("{} {:X}", node.get_node_body().unwrap(), executor_id);
90
91 ExecutorInfo {
92 schema,
93 pk_indices,
94 stream_kind,
95 identity,
96 id: executor_id,
97 }
98 }
99
100 async fn create_snapshot_backfill_input(
101 &self,
102 upstream_node: &StreamNode,
103 actor_context: &ActorContextRef,
104 local_barrier_manager: &LocalBarrierManager,
105 chunk_size: usize,
106 ) -> StreamResult<MergeExecutorInput> {
107 let info = Self::get_executor_info(
108 upstream_node,
109 Self::get_executor_id(actor_context, upstream_node),
110 );
111
112 let upstream_merge = must_match!(upstream_node.get_node_body().unwrap(), NodeBody::Merge(upstream_merge) => {
113 upstream_merge
114 });
115
116 MergeExecutorBuilder::new_input(
117 local_barrier_manager.clone(),
118 self.streaming_metrics.clone(),
119 actor_context.clone(),
120 info,
121 upstream_merge,
122 chunk_size,
123 )
124 .await
125 }
126
127 #[expect(clippy::too_many_arguments)]
128 async fn create_snapshot_backfill_node(
129 &self,
130 stream_node: &StreamNode,
131 node: &StreamScanNode,
132 actor_context: &ActorContextRef,
133 vnode_bitmap: Option<Bitmap>,
134 env: StreamEnvironment,
135 local_barrier_manager: &LocalBarrierManager,
136 state_store: impl StateStore,
137 ) -> StreamResult<Executor> {
138 let [upstream_node, _]: &[_; 2] = stream_node.input.as_slice().try_into().unwrap();
139 let chunk_size = env.config().developer.chunk_size;
140 let upstream = self
141 .create_snapshot_backfill_input(
142 upstream_node,
143 actor_context,
144 local_barrier_manager,
145 chunk_size,
146 )
147 .await?;
148
149 let table_desc: &StorageTableDesc = node.get_table_desc()?;
150
151 let output_indices = node
152 .output_indices
153 .iter()
154 .map(|&i| i as usize)
155 .collect_vec();
156
157 let column_ids = node
158 .upstream_column_ids
159 .iter()
160 .map(ColumnId::from)
161 .collect_vec();
162
163 let progress = local_barrier_manager.register_create_mview_progress(actor_context.id);
164
165 let vnodes = vnode_bitmap.map(Arc::new);
166 let barrier_rx = local_barrier_manager.subscribe_barrier(actor_context.id);
167
168 let upstream_table =
169 BatchTable::new_partial(state_store.clone(), column_ids, vnodes.clone(), table_desc);
170
171 let state_table = node.get_state_table()?;
172 let state_table = StateTableBuilder::new(state_table, state_store.clone(), vnodes)
173 .enable_preload_all_rows_by_config(&actor_context.streaming_config)
174 .build()
175 .await;
176
177 let executor = SnapshotBackfillExecutor::new(
178 upstream_table,
179 state_table,
180 upstream,
181 output_indices,
182 actor_context.clone(),
183 progress,
184 chunk_size,
185 node.rate_limit.into(),
186 barrier_rx,
187 self.streaming_metrics.clone(),
188 node.snapshot_backfill_epoch,
189 )
190 .boxed();
191
192 let info = Self::get_executor_info(
193 stream_node,
194 Self::get_executor_id(actor_context, stream_node),
195 );
196
197 if crate::consistency::insane() {
198 let mut troubled_info = info.clone();
199 troubled_info.identity = format!("{} (troubled)", info.identity);
200 Ok((
201 info,
202 TroublemakerExecutor::new((troubled_info, executor).into(), chunk_size),
203 )
204 .into())
205 } else {
206 Ok((info, executor).into())
207 }
208 }
209
210 #[expect(clippy::too_many_arguments)]
212 #[async_recursion]
213 async fn create_nodes_inner(
214 &self,
215 fragment_id: FragmentId,
216 node: &stream_plan::StreamNode,
217 env: StreamEnvironment,
218 store: impl StateStore,
219 actor_context: &ActorContextRef,
220 vnode_bitmap: Option<Bitmap>,
221 has_stateful: bool,
222 subtasks: &mut Vec<SubtaskHandle>,
223 local_barrier_manager: &LocalBarrierManager,
224 ) -> StreamResult<Executor> {
225 if let NodeBody::StreamScan(stream_scan) = node.get_node_body().unwrap()
226 && let Ok(StreamScanType::SnapshotBackfill) = stream_scan.get_stream_scan_type()
227 {
228 return dispatch_state_store!(env.state_store(), store, {
229 self.create_snapshot_backfill_node(
230 node,
231 stream_scan,
232 actor_context,
233 vnode_bitmap,
234 env,
235 local_barrier_manager,
236 store,
237 )
238 .await
239 });
240 }
241
242 fn is_stateful_executor(stream_node: &StreamNode) -> bool {
245 matches!(
246 stream_node.get_node_body().unwrap(),
247 NodeBody::HashAgg(_)
248 | NodeBody::HashJoin(_)
249 | NodeBody::DeltaIndexJoin(_)
250 | NodeBody::Lookup(_)
251 | NodeBody::StreamScan(_)
252 | NodeBody::StreamCdcScan(_)
253 | NodeBody::DynamicFilter(_)
254 | NodeBody::GroupTopN(_)
255 | NodeBody::Now(_)
256 )
257 }
258 let is_stateful = is_stateful_executor(node);
259
260 let mut input = Vec::with_capacity(node.input.iter().len());
262 for input_stream_node in &node.input {
263 input.push(
264 self.create_nodes_inner(
265 fragment_id,
266 input_stream_node,
267 env.clone(),
268 store.clone(),
269 actor_context,
270 vnode_bitmap.clone(),
271 has_stateful || is_stateful,
272 subtasks,
273 local_barrier_manager,
274 )
275 .await?,
276 );
277 }
278
279 self.generate_executor_from_inputs(
280 fragment_id,
281 node,
282 env,
283 store,
284 actor_context,
285 vnode_bitmap,
286 has_stateful || is_stateful,
287 subtasks,
288 local_barrier_manager,
289 input,
290 )
291 .await
292 }
293
294 #[expect(clippy::too_many_arguments)]
295 async fn generate_executor_from_inputs(
296 &self,
297 fragment_id: FragmentId,
298 node: &stream_plan::StreamNode,
299 env: StreamEnvironment,
300 store: impl StateStore,
301 actor_context: &ActorContextRef,
302 vnode_bitmap: Option<Bitmap>,
303 has_stateful: bool,
304 subtasks: &mut Vec<SubtaskHandle>,
305 local_barrier_manager: &LocalBarrierManager,
306 input: Vec<Executor>,
307 ) -> StreamResult<Executor> {
308 let op_info = node.get_identity().clone();
309
310 let executor_id = Self::get_executor_id(actor_context, node);
313 let operator_id = unique_operator_id(fragment_id, node.operator_id);
314
315 let info = Self::get_executor_info(node, executor_id);
316
317 let eval_error_report = ActorEvalErrorReport {
318 actor_context: actor_context.clone(),
319 identity: info.identity.clone().into(),
320 };
321
322 let executor_params = ExecutorParams {
324 env: env.clone(),
325
326 info: info.clone(),
327 executor_id,
328 operator_id,
329 op_info,
330 input,
331 fragment_id,
332 executor_stats: self.streaming_metrics.clone(),
333 actor_context: actor_context.clone(),
334 vnode_bitmap,
335 eval_error_report,
336 watermark_epoch: self.watermark_epoch.clone(),
337 local_barrier_manager: local_barrier_manager.clone(),
338 };
339
340 let executor = create_executor(executor_params, node, store).await?;
341
342 let wrapped = WrapperExecutor::new(
344 executor,
345 actor_context.clone(),
346 env.config().developer.enable_executor_row_count,
347 env.config().developer.enable_explain_analyze_stats,
348 );
349 let executor = (info, wrapped).into();
350
351 let executor = if has_stateful {
353 let _ = subtasks;
359 executor
360 } else {
361 executor
362 };
363
364 Ok(executor)
365 }
366
367 async fn create_nodes(
369 &self,
370 fragment_id: FragmentId,
371 node: &stream_plan::StreamNode,
372 env: StreamEnvironment,
373 actor_context: &ActorContextRef,
374 vnode_bitmap: Option<Bitmap>,
375 local_barrier_manager: &LocalBarrierManager,
376 ) -> StreamResult<(Executor, Vec<SubtaskHandle>)> {
377 let mut subtasks = vec![];
378
379 let executor = dispatch_state_store!(env.state_store(), store, {
380 self.create_nodes_inner(
381 fragment_id,
382 node,
383 env,
384 store,
385 actor_context,
386 vnode_bitmap,
387 false,
388 &mut subtasks,
389 local_barrier_manager,
390 )
391 .await
392 })?;
393
394 Ok((executor, subtasks))
395 }
396
397 async fn create_actor(
398 self: Arc<Self>,
399 actor: BuildActorInfo,
400 fragment_id: FragmentId,
401 node: Arc<StreamNode>,
402 related_subscriptions: Arc<HashMap<TableId, HashSet<u32>>>,
403 local_barrier_manager: LocalBarrierManager,
404 new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
405 ) -> StreamResult<Actor<DispatchExecutor>> {
406 {
407 let actor_id = actor.actor_id;
408 let streaming_config = self.env.config().clone();
409 let actor_context = ActorContext::create(
410 &actor,
411 fragment_id,
412 self.env.total_mem_usage(),
413 self.streaming_metrics.clone(),
414 related_subscriptions,
415 self.env.meta_client().clone(),
416 streaming_config,
417 self.env.clone(),
418 );
419 let vnode_bitmap = actor.vnode_bitmap.as_ref().map(|b| b.into());
420 let expr_context = actor.expr_context.clone().unwrap();
421
422 let (executor, subtasks) = self
423 .create_nodes(
424 fragment_id,
425 &node,
426 self.env.clone(),
427 &actor_context,
428 vnode_bitmap,
429 &local_barrier_manager,
430 )
431 .await?;
432
433 let dispatcher = DispatchExecutor::new(
434 executor,
435 new_output_request_rx,
436 actor.dispatchers,
437 actor_id,
438 fragment_id,
439 local_barrier_manager.clone(),
440 self.streaming_metrics.clone(),
441 )
442 .await?;
443 let actor = Actor::new(
444 dispatcher,
445 subtasks,
446 self.streaming_metrics.clone(),
447 actor_context.clone(),
448 expr_context,
449 local_barrier_manager,
450 );
451 Ok(actor)
452 }
453 }
454
    /// Spawn the actor's future onto the shared runtime and return its join
    /// handle, plus an optional handle of a companion task that samples tokio
    /// task metrics for this actor once per second.
    pub(super) fn spawn_actor(
        self: &Arc<Self>,
        actor: BuildActorInfo,
        fragment_id: FragmentId,
        node: Arc<StreamNode>,
        related_subscriptions: Arc<HashMap<TableId, HashSet<u32>>>,
        local_barrier_manager: LocalBarrierManager,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    ) -> (JoinHandle<()>, Option<JoinHandle<()>>) {
        {
            let monitor = tokio_metrics::TaskMonitor::new();
            let stream_actor_ref = &actor;
            let actor_id = stream_actor_ref.actor_id;
            let handle = {
                let trace_span =
                    format!("Actor {actor_id}: `{}`", stream_actor_ref.mview_definition);
                let barrier_manager = local_barrier_manager.clone();
                // Build the actor, run it to completion, and on failure log and
                // report the error to the barrier manager.
                let actor = self
                    .clone()
                    .create_actor(
                        actor,
                        fragment_id,
                        node,
                        related_subscriptions,
                        barrier_manager.clone(),
                        new_output_request_rx
                    ).boxed().and_then(|actor| actor.run()).map(move |result| {
                        if let Err(err) = result {
                            tracing::error!(actor_id, error = ?err.as_report(), "actor exit with error");
                            barrier_manager.notify_failure(actor_id, err);
                        }
                    });
                // Attach an await-tree span when the registry is enabled.
                let traced = match &self.await_tree_reg {
                    Some(m) => m
                        .register(await_tree_key::Actor(actor_id), trace_span)
                        .instrument(actor)
                        .left_future(),
                    None => actor.right_future(),
                };
                // Layer on task-metric monitoring, the task-local streaming
                // config scope, and (when configured) Hummock tracing.
                let instrumented = monitor.instrument(traced);
                let with_config = crate::CONFIG.scope(self.env.config().clone(), instrumented);
                let may_track_hummock = with_config.may_trace_hummock();

                self.runtime.spawn(may_track_hummock)
            };

            // Periodically export cumulative tokio task metrics for this actor,
            // enabled at Debug metric level or by developer config.
            let monitor_handle = if self.streaming_metrics.level >= MetricLevel::Debug
                || self.env.config().developer.enable_actor_tokio_metrics
            {
                tracing::info!("Tokio metrics are enabled.");
                let streaming_metrics = self.streaming_metrics.clone();
                let actor_monitor_task = self.runtime.spawn(async move {
                    let metrics = streaming_metrics.new_actor_metrics(actor_id);
                    loop {
                        let task_metrics = monitor.cumulative();
                        metrics
                            .actor_execution_time
                            .set(task_metrics.total_poll_duration.as_secs_f64());
                        metrics
                            .actor_fast_poll_duration
                            .set(task_metrics.total_fast_poll_duration.as_secs_f64());
                        metrics
                            .actor_fast_poll_cnt
                            .set(task_metrics.total_fast_poll_count as i64);
                        metrics
                            .actor_slow_poll_duration
                            .set(task_metrics.total_slow_poll_duration.as_secs_f64());
                        metrics
                            .actor_slow_poll_cnt
                            .set(task_metrics.total_slow_poll_count as i64);
                        metrics
                            .actor_poll_duration
                            .set(task_metrics.total_poll_duration.as_secs_f64());
                        metrics
                            .actor_poll_cnt
                            .set(task_metrics.total_poll_count as i64);
                        metrics
                            .actor_idle_duration
                            .set(task_metrics.total_idle_duration.as_secs_f64());
                        metrics
                            .actor_idle_cnt
                            .set(task_metrics.total_idled_count as i64);
                        metrics
                            .actor_scheduled_duration
                            .set(task_metrics.total_scheduled_duration.as_secs_f64());
                        metrics
                            .actor_scheduled_cnt
                            .set(task_metrics.total_scheduled_count as i64);
                        tokio::time::sleep(Duration::from_secs(1)).await;
                    }
                });
                Some(actor_monitor_task)
            } else {
                None
            };
            (handle, monitor_handle)
        }
    }
557}
558
/// Parameters handed to executor builders when constructing an executor from a
/// plan node (see `generate_executor_from_inputs`).
pub struct ExecutorParams {
    /// The streaming environment (config, state store, …).
    pub env: StreamEnvironment,

    /// Schema, stream key, kind, and identity of the executor being built.
    pub info: ExecutorInfo,

    /// Executor id, derived from the actor id and the node's operator id.
    pub executor_id: u64,

    /// Operator id, derived from the fragment id and the node's operator id.
    pub operator_id: u64,

    /// Identity string of the plan node, taken from `node.get_identity()`.
    pub op_info: String,

    /// Already-built input executors of this executor.
    pub input: Vec<Executor>,

    /// Id of the fragment this executor belongs to.
    pub fragment_id: FragmentId,

    /// Shared streaming metrics sink.
    pub executor_stats: Arc<StreamingMetrics>,

    /// Context of the actor this executor runs in.
    pub actor_context: ActorContextRef,

    /// Vnode bitmap of the actor, if distributed.
    pub vnode_bitmap: Option<Bitmap>,

    /// Reports expression-evaluation errors, tagged with the executor identity.
    pub eval_error_report: ActorEvalErrorReport,

    /// Shared atomic epoch from `StreamActorManager::watermark_epoch`;
    /// presumably used as a state-cleaning watermark — TODO confirm.
    pub watermark_epoch: AtomicU64Ref,

    /// Handle to the local barrier manager (barrier subscription, progress, …).
    pub local_barrier_manager: LocalBarrierManager,
}
601
602impl Debug for ExecutorParams {
603 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
604 f.debug_struct("ExecutorParams")
605 .field("info", &self.info)
606 .field("executor_id", &self.executor_id)
607 .field("operator_id", &self.operator_id)
608 .field("op_info", &self.op_info)
609 .field("input", &self.input.len())
610 .field("actor_id", &self.actor_context.id)
611 .finish_non_exhaustive()
612 }
613}