1use core::time::Duration;
16use std::collections::{HashMap, HashSet};
17use std::fmt::Debug;
18use std::sync::Arc;
19
20use async_recursion::async_recursion;
21use futures::{FutureExt, TryFutureExt};
22use itertools::Itertools;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{ColumnId, Field, Schema, TableId};
25use risingwave_common::config::MetricLevel;
26use risingwave_common::must_match;
27use risingwave_common::operator::{unique_executor_id, unique_operator_id};
28use risingwave_common::util::runtime::BackgroundShutdownRuntime;
29use risingwave_pb::plan_common::StorageTableDesc;
30use risingwave_pb::stream_plan::stream_node::NodeBody;
31use risingwave_pb::stream_plan::{self, StreamNode, StreamScanNode, StreamScanType};
32use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo;
33use risingwave_storage::monitor::HummockTraceFutureExt;
34use risingwave_storage::table::batch_table::BatchTable;
35use risingwave_storage::{StateStore, dispatch_state_store};
36use thiserror_ext::AsReport;
37use tokio::sync::mpsc::UnboundedReceiver;
38use tokio::task::JoinHandle;
39
40use crate::common::table::state_table::StateTableBuilder;
41use crate::error::StreamResult;
42use crate::executor::monitor::StreamingMetrics;
43use crate::executor::subtask::SubtaskHandle;
44use crate::executor::{
45 Actor, ActorContext, ActorContextRef, DispatchExecutor, Execute, Executor, ExecutorInfo,
46 MergeExecutorInput, SnapshotBackfillExecutor, TroublemakerExecutor, WrapperExecutor,
47};
48use crate::from_proto::{MergeExecutorBuilder, create_executor};
49use crate::task::{
50 ActorEvalErrorReport, ActorId, AtomicU64Ref, FragmentId, LocalBarrierManager, NewOutputRequest,
51 StreamEnvironment, await_tree_key,
52};
53
pub(crate) struct StreamActorManager {
    /// Shared streaming environment: config, state store, meta client, etc.
    pub(super) env: StreamEnvironment,
    /// Metrics registry shared by all actors spawned by this manager.
    pub(super) streaming_metrics: Arc<StreamingMetrics>,

    /// Shared atomic epoch handed to each executor as its `watermark_epoch`
    /// (see `ExecutorParams`).
    pub(super) watermark_epoch: AtomicU64Ref,

    /// Registry for actor await-trees; `None` when await-tree tracing is disabled.
    pub(super) await_tree_reg: Option<await_tree::Registry>,

    /// Tokio runtime on which actors and their monitor tasks are spawned;
    /// shut down in the background when dropped.
    pub(super) runtime: BackgroundShutdownRuntime,
}
70
71impl StreamActorManager {
72 fn get_executor_id(actor_context: &ActorContext, node: &StreamNode) -> u64 {
73 unique_executor_id(actor_context.id, node.operator_id)
76 }
77
78 fn get_executor_info(node: &StreamNode, executor_id: u64) -> ExecutorInfo {
79 let schema: Schema = node.fields.iter().map(Field::from).collect();
80
81 let pk_indices = node
82 .get_stream_key()
83 .iter()
84 .map(|idx| *idx as usize)
85 .collect::<Vec<_>>();
86
87 let stream_kind = node.stream_kind();
88
89 let identity = format!("{} {:X}", node.get_node_body().unwrap(), executor_id);
90
91 ExecutorInfo {
92 schema,
93 pk_indices,
94 stream_kind,
95 identity,
96 id: executor_id,
97 }
98 }
99
    /// Creates the upstream [`MergeExecutorInput`] for a snapshot-backfill scan.
    ///
    /// `upstream_node` must be a `Merge` node (enforced via `must_match!`); its
    /// executor info is derived the same way as for a regularly-built executor.
    async fn create_snapshot_backfill_input(
        &self,
        upstream_node: &StreamNode,
        actor_context: &ActorContextRef,
        local_barrier_manager: &LocalBarrierManager,
        chunk_size: usize,
    ) -> StreamResult<MergeExecutorInput> {
        let info = Self::get_executor_info(
            upstream_node,
            Self::get_executor_id(actor_context, upstream_node),
        );

        // The upstream of a snapshot-backfill scan must be a merge node.
        let upstream_merge = must_match!(upstream_node.get_node_body().unwrap(), NodeBody::Merge(upstream_merge) => {
            upstream_merge
        });

        MergeExecutorBuilder::new_input(
            local_barrier_manager.clone(),
            self.streaming_metrics.clone(),
            actor_context.clone(),
            info,
            upstream_merge,
            chunk_size,
        )
        .await
    }
126
    /// Builds a [`SnapshotBackfillExecutor`] for a `StreamScan` node whose scan
    /// type is `SnapshotBackfill`.
    ///
    /// The node is expected to have exactly two inputs; only the first (the merge
    /// upstream) is consumed here, while the snapshot side is rebuilt from the
    /// storage table descriptor. In insane consistency mode the executor is
    /// additionally wrapped in a [`TroublemakerExecutor`].
    #[expect(clippy::too_many_arguments)]
    async fn create_snapshot_backfill_node(
        &self,
        stream_node: &StreamNode,
        node: &StreamScanNode,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        env: StreamEnvironment,
        local_barrier_manager: &LocalBarrierManager,
        state_store: impl StateStore,
    ) -> StreamResult<Executor> {
        // A snapshot-backfill scan node always has exactly two inputs.
        let [upstream_node, _]: &[_; 2] = stream_node.input.as_slice().try_into().unwrap();
        let chunk_size = env.config().developer.chunk_size;
        let upstream = self
            .create_snapshot_backfill_input(
                upstream_node,
                actor_context,
                local_barrier_manager,
                chunk_size,
            )
            .await?;

        let table_desc: &StorageTableDesc = node.get_table_desc()?;

        let output_indices = node
            .output_indices
            .iter()
            .map(|&i| i as usize)
            .collect_vec();

        let column_ids = node
            .upstream_column_ids
            .iter()
            .map(ColumnId::from)
            .collect_vec();

        // Register this actor so its backfill progress is reported through barriers.
        let progress = local_barrier_manager.register_create_mview_progress(actor_context.id);

        let vnodes = vnode_bitmap.map(Arc::new);
        let barrier_rx = local_barrier_manager.subscribe_barrier(actor_context.id);

        // Batch-read side of the upstream table, restricted to the scanned columns
        // and this actor's vnodes.
        let upstream_table =
            BatchTable::new_partial(state_store.clone(), column_ids, vnodes.clone(), table_desc);

        let state_table = node.get_state_table()?;
        let state_table = StateTableBuilder::new(state_table, state_store.clone(), vnodes)
            .enable_preload_all_rows_by_config(&actor_context.streaming_config)
            .build()
            .await;

        let executor = SnapshotBackfillExecutor::new(
            upstream_table,
            state_table,
            upstream,
            output_indices,
            actor_context.clone(),
            progress,
            chunk_size,
            node.rate_limit.into(),
            barrier_rx,
            self.streaming_metrics.clone(),
            node.snapshot_backfill_epoch,
        )
        .boxed();

        let info = Self::get_executor_info(
            stream_node,
            Self::get_executor_id(actor_context, stream_node),
        );

        // In insane mode, wrap the executor with a troublemaker to deliberately
        // inject inconsistency for testing; the wrapper gets a "(troubled)" identity.
        if crate::consistency::insane() {
            let mut troubled_info = info.clone();
            troubled_info.identity = format!("{} (troubled)", info.identity);
            Ok((
                info,
                TroublemakerExecutor::new((troubled_info, executor).into(), chunk_size),
            )
                .into())
        } else {
            Ok((info, executor).into())
        }
    }
209
    /// Recursively builds the executor tree rooted at `node`, depth-first.
    ///
    /// Snapshot-backfill stream scans take a dedicated build path dispatched on
    /// the concrete state store. `has_stateful` records whether any ancestor of
    /// the current node is a stateful executor and is propagated downwards.
    #[expect(clippy::too_many_arguments)]
    #[async_recursion]
    async fn create_nodes_inner(
        &self,
        fragment_id: FragmentId,
        node: &stream_plan::StreamNode,
        env: StreamEnvironment,
        store: impl StateStore,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        has_stateful: bool,
        subtasks: &mut Vec<SubtaskHandle>,
        local_barrier_manager: &LocalBarrierManager,
    ) -> StreamResult<Executor> {
        // Snapshot-backfill scans bypass the generic `create_executor` dispatch
        // and are built by their dedicated code path.
        if let NodeBody::StreamScan(stream_scan) = node.get_node_body().unwrap()
            && let Ok(StreamScanType::SnapshotBackfill) = stream_scan.get_stream_scan_type()
        {
            return dispatch_state_store!(env.state_store(), store, {
                self.create_snapshot_backfill_node(
                    node,
                    stream_scan,
                    actor_context,
                    vnode_bitmap,
                    env,
                    local_barrier_manager,
                    store,
                )
                .await
            });
        }

        // Whether the node body corresponds to a stateful executor.
        fn is_stateful_executor(stream_node: &StreamNode) -> bool {
            matches!(
                stream_node.get_node_body().unwrap(),
                NodeBody::HashAgg(_)
                    | NodeBody::HashJoin(_)
                    | NodeBody::DeltaIndexJoin(_)
                    | NodeBody::Lookup(_)
                    | NodeBody::StreamScan(_)
                    | NodeBody::StreamCdcScan(_)
                    | NodeBody::DynamicFilter(_)
                    | NodeBody::GroupTopN(_)
                    | NodeBody::Now(_)
            )
        }
        let is_stateful = is_stateful_executor(node);

        // Build all child subtrees first (depth-first).
        let mut input = Vec::with_capacity(node.input.iter().len());
        for input_stream_node in &node.input {
            input.push(
                self.create_nodes_inner(
                    fragment_id,
                    input_stream_node,
                    env.clone(),
                    store.clone(),
                    actor_context,
                    vnode_bitmap.clone(),
                    has_stateful || is_stateful,
                    subtasks,
                    local_barrier_manager,
                )
                .await?,
            );
        }

        // Then build this node's own executor on top of its inputs.
        self.generate_executor_from_inputs(
            fragment_id,
            node,
            env,
            store,
            actor_context,
            vnode_bitmap,
            has_stateful || is_stateful,
            subtasks,
            local_barrier_manager,
            input,
        )
        .await
    }
293
294 #[expect(clippy::too_many_arguments)]
295 async fn generate_executor_from_inputs(
296 &self,
297 fragment_id: FragmentId,
298 node: &stream_plan::StreamNode,
299 env: StreamEnvironment,
300 store: impl StateStore,
301 actor_context: &ActorContextRef,
302 vnode_bitmap: Option<Bitmap>,
303 has_stateful: bool,
304 subtasks: &mut Vec<SubtaskHandle>,
305 local_barrier_manager: &LocalBarrierManager,
306 input: Vec<Executor>,
307 ) -> StreamResult<Executor> {
308 let op_info = node.get_identity().clone();
309
310 let executor_id = Self::get_executor_id(actor_context, node);
313 let operator_id = unique_operator_id(fragment_id, node.operator_id);
314
315 let info = Self::get_executor_info(node, executor_id);
316
317 let eval_error_report = ActorEvalErrorReport {
318 actor_context: actor_context.clone(),
319 identity: info.identity.clone().into(),
320 };
321
322 let executor_params = ExecutorParams {
324 env: env.clone(),
325
326 info: info.clone(),
327 executor_id,
328 operator_id,
329 op_info,
330 input,
331 fragment_id,
332 executor_stats: self.streaming_metrics.clone(),
333 actor_context: actor_context.clone(),
334 vnode_bitmap,
335 eval_error_report,
336 watermark_epoch: self.watermark_epoch.clone(),
337 local_barrier_manager: local_barrier_manager.clone(),
338 };
339
340 let executor = create_executor(executor_params, node, store).await?;
341
342 let wrapped = WrapperExecutor::new(
344 executor,
345 actor_context.clone(),
346 env.config().developer.enable_executor_row_count,
347 env.config().developer.enable_explain_analyze_stats,
348 );
349 let executor = (info, wrapped).into();
350
351 let executor = if has_stateful {
353 let _ = subtasks;
359 executor
360 } else {
361 executor
362 };
363
364 Ok(executor)
365 }
366
    /// Builds the executor tree for an actor, dispatching on the concrete state
    /// store implementation. Returns the root executor together with the subtask
    /// handles collected during the build.
    async fn create_nodes(
        &self,
        fragment_id: FragmentId,
        node: &stream_plan::StreamNode,
        env: StreamEnvironment,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        local_barrier_manager: &LocalBarrierManager,
    ) -> StreamResult<(Executor, Vec<SubtaskHandle>)> {
        let mut subtasks = vec![];

        let executor = dispatch_state_store!(env.state_store(), store, {
            self.create_nodes_inner(
                fragment_id,
                node,
                env,
                store,
                actor_context,
                vnode_bitmap,
                false,
                &mut subtasks,
                local_barrier_manager,
            )
            .await
        })?;

        Ok((executor, subtasks))
    }
396
    /// Builds a complete [`Actor`]: creates the actor context, builds the
    /// executor tree, and attaches the [`DispatchExecutor`] at the root.
    async fn create_actor(
        self: Arc<Self>,
        actor: BuildActorInfo,
        fragment_id: FragmentId,
        node: Arc<StreamNode>,
        related_subscriptions: Arc<HashMap<TableId, HashSet<u32>>>,
        local_barrier_manager: LocalBarrierManager,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    ) -> StreamResult<Actor<DispatchExecutor>> {
        {
            let actor_id = actor.actor_id;
            let streaming_config = self.env.config().clone();
            let actor_context = ActorContext::create(
                &actor,
                fragment_id,
                self.env.total_mem_usage(),
                self.streaming_metrics.clone(),
                related_subscriptions,
                self.env.meta_client().clone(),
                streaming_config,
            );
            let vnode_bitmap = actor.vnode_bitmap.as_ref().map(|b| b.into());
            // NOTE(review): assumes `expr_context` is always populated when the
            // actor is built — confirm with the meta-side caller.
            let expr_context = actor.expr_context.clone().unwrap();

            let (executor, subtasks) = self
                .create_nodes(
                    fragment_id,
                    &node,
                    self.env.clone(),
                    &actor_context,
                    vnode_bitmap,
                    &local_barrier_manager,
                )
                .await?;

            // The dispatcher sits at the root and routes output to downstream actors.
            let dispatcher = DispatchExecutor::new(
                executor,
                new_output_request_rx,
                actor.dispatchers,
                actor_id,
                fragment_id,
                local_barrier_manager.clone(),
                self.streaming_metrics.clone(),
            )
            .await?;
            let actor = Actor::new(
                dispatcher,
                subtasks,
                self.streaming_metrics.clone(),
                actor_context.clone(),
                expr_context,
                local_barrier_manager,
            );
            Ok(actor)
        }
    }
453
    /// Creates the actor and spawns it on the manager's runtime.
    ///
    /// Returns the join handle of the actor task and, when tokio metrics are
    /// enabled (debug metric level or explicit config), the handle of a
    /// companion task that samples the actor's task metrics once per second.
    pub(super) fn spawn_actor(
        self: &Arc<Self>,
        actor: BuildActorInfo,
        fragment_id: FragmentId,
        node: Arc<StreamNode>,
        related_subscriptions: Arc<HashMap<TableId, HashSet<u32>>>,
        local_barrier_manager: LocalBarrierManager,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    ) -> (JoinHandle<()>, Option<JoinHandle<()>>) {
        {
            let monitor = tokio_metrics::TaskMonitor::new();
            let stream_actor_ref = &actor;
            let actor_id = stream_actor_ref.actor_id;
            let handle = {
                let trace_span =
                    format!("Actor {actor_id}: `{}`", stream_actor_ref.mview_definition);
                let barrier_manager = local_barrier_manager.clone();
                // Build the actor, run it, and report any failure to the barrier
                // manager instead of propagating it from the task.
                let actor = self
                    .clone()
                    .create_actor(
                        actor,
                        fragment_id,
                        node,
                        related_subscriptions,
                        barrier_manager.clone(),
                        new_output_request_rx
                    ).boxed().and_then(|actor| actor.run()).map(move |result| {
                        if let Err(err) = result {
                            // TODO(review): failures here are only logged and
                            // notified; nothing is returned from the join handle.
                            tracing::error!(actor_id, error = ?err.as_report(), "actor exit with error");
                            barrier_manager.notify_failure(actor_id, err);
                        }
                    });
                // Layer the instrumentation: await-tree (optional) -> task
                // monitor -> config scope -> hummock tracing.
                let traced = match &self.await_tree_reg {
                    Some(m) => m
                        .register(await_tree_key::Actor(actor_id), trace_span)
                        .instrument(actor)
                        .left_future(),
                    None => actor.right_future(),
                };
                let instrumented = monitor.instrument(traced);
                let with_config = crate::CONFIG.scope(self.env.config().clone(), instrumented);
                let may_track_hummock = with_config.may_trace_hummock();

                self.runtime.spawn(may_track_hummock)
            };

            let monitor_handle = if self.streaming_metrics.level >= MetricLevel::Debug
                || self.env.config().developer.enable_actor_tokio_metrics
            {
                tracing::info!("Tokio metrics are enabled.");
                let streaming_metrics = self.streaming_metrics.clone();
                // Companion task: copy cumulative tokio task metrics into the
                // per-actor gauges once per second, forever (aborted with the runtime).
                let actor_monitor_task = self.runtime.spawn(async move {
                    let metrics = streaming_metrics.new_actor_metrics(actor_id);
                    loop {
                        let task_metrics = monitor.cumulative();
                        metrics
                            .actor_execution_time
                            .set(task_metrics.total_poll_duration.as_secs_f64());
                        metrics
                            .actor_fast_poll_duration
                            .set(task_metrics.total_fast_poll_duration.as_secs_f64());
                        metrics
                            .actor_fast_poll_cnt
                            .set(task_metrics.total_fast_poll_count as i64);
                        metrics
                            .actor_slow_poll_duration
                            .set(task_metrics.total_slow_poll_duration.as_secs_f64());
                        metrics
                            .actor_slow_poll_cnt
                            .set(task_metrics.total_slow_poll_count as i64);
                        metrics
                            .actor_poll_duration
                            .set(task_metrics.total_poll_duration.as_secs_f64());
                        metrics
                            .actor_poll_cnt
                            .set(task_metrics.total_poll_count as i64);
                        metrics
                            .actor_idle_duration
                            .set(task_metrics.total_idle_duration.as_secs_f64());
                        metrics
                            .actor_idle_cnt
                            .set(task_metrics.total_idled_count as i64);
                        metrics
                            .actor_scheduled_duration
                            .set(task_metrics.total_scheduled_duration.as_secs_f64());
                        metrics
                            .actor_scheduled_cnt
                            .set(task_metrics.total_scheduled_count as i64);
                        tokio::time::sleep(Duration::from_secs(1)).await;
                    }
                });
                Some(actor_monitor_task)
            } else {
                None
            };
            (handle, monitor_handle)
        }
    }
556}
557
pub struct ExecutorParams {
    /// Shared streaming environment.
    pub env: StreamEnvironment,

    /// Basic information (schema, pk indices, kind, identity) of the executor.
    pub info: ExecutorInfo,

    /// Executor id derived from the actor id and operator id
    /// (see `unique_executor_id`).
    pub executor_id: u64,

    /// Operator id derived from the fragment id and the node's operator id
    /// (see `unique_operator_id`).
    pub operator_id: u64,

    /// Readable identity of the plan node, for debugging.
    pub op_info: String,

    /// Already-built input executors of this executor.
    pub input: Vec<Executor>,

    /// Id of the fragment this executor belongs to.
    pub fragment_id: FragmentId,

    /// Metrics registry.
    pub executor_stats: Arc<StreamingMetrics>,

    /// Context of the actor this executor runs in.
    pub actor_context: ActorContextRef,

    /// Vnode bitmap of the executor, if any.
    pub vnode_bitmap: Option<Bitmap>,

    /// Reporter for expression-evaluation errors, tagged with this executor's identity.
    pub eval_error_report: ActorEvalErrorReport,

    /// Shared atomic epoch used as the watermark epoch
    /// (see `StreamActorManager::watermark_epoch`).
    pub watermark_epoch: AtomicU64Ref,

    pub local_barrier_manager: LocalBarrierManager,
}
600
601impl Debug for ExecutorParams {
602 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
603 f.debug_struct("ExecutorParams")
604 .field("info", &self.info)
605 .field("executor_id", &self.executor_id)
606 .field("operator_id", &self.operator_id)
607 .field("op_info", &self.op_info)
608 .field("input", &self.input.len())
609 .field("actor_id", &self.actor_context.id)
610 .finish_non_exhaustive()
611 }
612}