risingwave_stream/task/
actor_manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::time::Duration;
16use std::collections::{HashMap, HashSet};
17use std::fmt::Debug;
18use std::sync::Arc;
19
20use async_recursion::async_recursion;
21use futures::{FutureExt, TryFutureExt};
22use itertools::Itertools;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{ColumnId, Field, Schema, TableId};
25use risingwave_common::config::MetricLevel;
26use risingwave_common::must_match;
27use risingwave_common::operator::{unique_executor_id, unique_operator_id};
28use risingwave_common::util::runtime::BackgroundShutdownRuntime;
29use risingwave_pb::plan_common::StorageTableDesc;
30use risingwave_pb::stream_plan::stream_node::NodeBody;
31use risingwave_pb::stream_plan::{self, StreamNode, StreamScanNode, StreamScanType};
32use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo;
33use risingwave_storage::monitor::HummockTraceFutureExt;
34use risingwave_storage::table::batch_table::BatchTable;
35use risingwave_storage::{StateStore, dispatch_state_store};
36use thiserror_ext::AsReport;
37use tokio::sync::mpsc::UnboundedReceiver;
38use tokio::task::JoinHandle;
39
40use crate::common::table::state_table::StateTableBuilder;
41use crate::error::StreamResult;
42use crate::executor::monitor::StreamingMetrics;
43use crate::executor::subtask::SubtaskHandle;
44use crate::executor::{
45    Actor, ActorContext, ActorContextRef, DispatchExecutor, Execute, Executor, ExecutorInfo,
46    MergeExecutorInput, SnapshotBackfillExecutor, TroublemakerExecutor, WrapperExecutor,
47};
48use crate::from_proto::{MergeExecutorBuilder, create_executor};
49use crate::task::{
50    ActorEvalErrorReport, ActorId, AtomicU64Ref, FragmentId, LocalBarrierManager, NewOutputRequest,
51    StreamEnvironment, await_tree_key,
52};
53
/// [Spawning actors](`Self::spawn_actor`), called by [`crate::task::barrier_worker::managed_state::DatabaseManagedBarrierState`].
///
/// See [`crate::task`] for architecture overview.
pub(crate) struct StreamActorManager {
    /// Shared environment of the stream worker (config, state store, meta client, ...).
    pub(super) env: StreamEnvironment,
    /// Metrics registry shared by all actors spawned by this manager.
    pub(super) streaming_metrics: Arc<StreamingMetrics>,

    /// Watermark epoch number.
    pub(super) watermark_epoch: AtomicU64Ref,

    /// Manages the await-trees of all actors.
    pub(super) await_tree_reg: Option<await_tree::Registry>,

    /// Runtime for the streaming actors.
    pub(super) runtime: BackgroundShutdownRuntime,
}
70
71impl StreamActorManager {
72    fn get_executor_id(actor_context: &ActorContext, node: &StreamNode) -> u64 {
73        // We assume that the operator_id of different instances from the same RelNode will be the
74        // same.
75        unique_executor_id(actor_context.id, node.operator_id)
76    }
77
78    fn get_executor_info(node: &StreamNode, executor_id: u64) -> ExecutorInfo {
79        let schema: Schema = node.fields.iter().map(Field::from).collect();
80
81        let pk_indices = node
82            .get_stream_key()
83            .iter()
84            .map(|idx| *idx as usize)
85            .collect::<Vec<_>>();
86
87        let stream_kind = node.stream_kind();
88
89        let identity = format!("{} {:X}", node.get_node_body().unwrap(), executor_id);
90
91        ExecutorInfo {
92            schema,
93            pk_indices,
94            stream_kind,
95            identity,
96            id: executor_id,
97        }
98    }
99
    /// Build the [`MergeExecutorInput`] that feeds a snapshot-backfill executor
    /// from its upstream `Merge` plan node.
    async fn create_snapshot_backfill_input(
        &self,
        upstream_node: &StreamNode,
        actor_context: &ActorContextRef,
        local_barrier_manager: &LocalBarrierManager,
        chunk_size: usize,
    ) -> StreamResult<MergeExecutorInput> {
        let info = Self::get_executor_info(
            upstream_node,
            Self::get_executor_id(actor_context, upstream_node),
        );

        // The upstream of a snapshot backfill must be a `Merge` node; this
        // panics (via `must_match!`) if the plan is malformed.
        let upstream_merge = must_match!(upstream_node.get_node_body().unwrap(), NodeBody::Merge(upstream_merge) => {
            upstream_merge
        });

        MergeExecutorBuilder::new_input(
            local_barrier_manager.clone(),
            self.streaming_metrics.clone(),
            actor_context.clone(),
            info,
            upstream_merge,
            chunk_size,
        )
        .await
    }
126
    /// Build a [`SnapshotBackfillExecutor`] for a `StreamScan` node of type
    /// `SnapshotBackfill`, wiring up its upstream merge input, the upstream
    /// storage table, its own state table, and progress reporting.
    ///
    /// `stream_node` is the `StreamScan` plan node itself; `node` is its
    /// decoded [`StreamScanNode`] body.
    #[expect(clippy::too_many_arguments)]
    async fn create_snapshot_backfill_node(
        &self,
        stream_node: &StreamNode,
        node: &StreamScanNode,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        env: StreamEnvironment,
        local_barrier_manager: &LocalBarrierManager,
        state_store: impl StateStore,
    ) -> StreamResult<Executor> {
        // A snapshot-backfill scan node is expected to have exactly two inputs
        // (panics otherwise); the first is the upstream merge node, the second
        // is unused here.
        let [upstream_node, _]: &[_; 2] = stream_node.input.as_slice().try_into().unwrap();
        let chunk_size = env.config().developer.chunk_size;
        let upstream = self
            .create_snapshot_backfill_input(
                upstream_node,
                actor_context,
                local_barrier_manager,
                chunk_size,
            )
            .await?;

        let table_desc: &StorageTableDesc = node.get_table_desc()?;

        // Indices (into the upstream columns) of the columns to output.
        let output_indices = node
            .output_indices
            .iter()
            .map(|&i| i as usize)
            .collect_vec();

        let column_ids = node
            .upstream_column_ids
            .iter()
            .map(ColumnId::from)
            .collect_vec();

        // Register this actor's backfill progress with the barrier manager so
        // the creation of the materialized view can be tracked.
        let progress = local_barrier_manager.register_create_mview_progress(actor_context.id);

        // The vnode bitmap is shared between the batch table and state table.
        let vnodes = vnode_bitmap.map(Arc::new);
        let barrier_rx = local_barrier_manager.subscribe_barrier(actor_context.id);

        // Batch-read handle over the upstream table, restricted to the
        // requested columns and the vnodes owned by this actor.
        let upstream_table =
            BatchTable::new_partial(state_store.clone(), column_ids, vnodes.clone(), table_desc);

        let state_table = node.get_state_table()?;
        let state_table = StateTableBuilder::new(state_table, state_store.clone(), vnodes)
            .enable_preload_all_rows_by_config(&actor_context.streaming_config)
            .build()
            .await;

        let executor = SnapshotBackfillExecutor::new(
            upstream_table,
            state_table,
            upstream,
            output_indices,
            actor_context.clone(),
            progress,
            chunk_size,
            node.rate_limit.into(),
            barrier_rx,
            self.streaming_metrics.clone(),
            node.snapshot_backfill_epoch,
        )
        .boxed();

        let info = Self::get_executor_info(
            stream_node,
            Self::get_executor_id(actor_context, stream_node),
        );

        // Under "insane" consistency mode, additionally wrap the executor with
        // a `TroublemakerExecutor` (presumably injecting inconsistencies for
        // testing — see that executor's docs) and mark it in the identity.
        if crate::consistency::insane() {
            let mut troubled_info = info.clone();
            troubled_info.identity = format!("{} (troubled)", info.identity);
            Ok((
                info,
                TroublemakerExecutor::new((troubled_info, executor).into(), chunk_size),
            )
                .into())
        } else {
            Ok((info, executor).into())
        }
    }
209
210    /// Create a chain(tree) of nodes, with given `store`.
211    #[expect(clippy::too_many_arguments)]
212    #[async_recursion]
213    async fn create_nodes_inner(
214        &self,
215        fragment_id: FragmentId,
216        node: &stream_plan::StreamNode,
217        env: StreamEnvironment,
218        store: impl StateStore,
219        actor_context: &ActorContextRef,
220        vnode_bitmap: Option<Bitmap>,
221        has_stateful: bool,
222        subtasks: &mut Vec<SubtaskHandle>,
223        local_barrier_manager: &LocalBarrierManager,
224    ) -> StreamResult<Executor> {
225        if let NodeBody::StreamScan(stream_scan) = node.get_node_body().unwrap()
226            && let Ok(StreamScanType::SnapshotBackfill) = stream_scan.get_stream_scan_type()
227        {
228            return dispatch_state_store!(env.state_store(), store, {
229                self.create_snapshot_backfill_node(
230                    node,
231                    stream_scan,
232                    actor_context,
233                    vnode_bitmap,
234                    env,
235                    local_barrier_manager,
236                    store,
237                )
238                .await
239            });
240        }
241
242        // The "stateful" here means that the executor may issue read operations to the state store
243        // massively and continuously. Used to decide whether to apply the optimization of subtasks.
244        fn is_stateful_executor(stream_node: &StreamNode) -> bool {
245            matches!(
246                stream_node.get_node_body().unwrap(),
247                NodeBody::HashAgg(_)
248                    | NodeBody::HashJoin(_)
249                    | NodeBody::DeltaIndexJoin(_)
250                    | NodeBody::Lookup(_)
251                    | NodeBody::StreamScan(_)
252                    | NodeBody::StreamCdcScan(_)
253                    | NodeBody::DynamicFilter(_)
254                    | NodeBody::GroupTopN(_)
255                    | NodeBody::Now(_)
256            )
257        }
258        let is_stateful = is_stateful_executor(node);
259
260        // Create the input executor before creating itself
261        let mut input = Vec::with_capacity(node.input.iter().len());
262        for input_stream_node in &node.input {
263            input.push(
264                self.create_nodes_inner(
265                    fragment_id,
266                    input_stream_node,
267                    env.clone(),
268                    store.clone(),
269                    actor_context,
270                    vnode_bitmap.clone(),
271                    has_stateful || is_stateful,
272                    subtasks,
273                    local_barrier_manager,
274                )
275                .await?,
276            );
277        }
278
279        self.generate_executor_from_inputs(
280            fragment_id,
281            node,
282            env,
283            store,
284            actor_context,
285            vnode_bitmap,
286            has_stateful || is_stateful,
287            subtasks,
288            local_barrier_manager,
289            input,
290        )
291        .await
292    }
293
    /// Build the executor for `node` itself from its already-built `input`
    /// executors, then wrap it in a [`WrapperExecutor`] for debug purposes.
    #[expect(clippy::too_many_arguments)]
    async fn generate_executor_from_inputs(
        &self,
        fragment_id: FragmentId,
        node: &stream_plan::StreamNode,
        env: StreamEnvironment,
        store: impl StateStore,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        has_stateful: bool,
        subtasks: &mut Vec<SubtaskHandle>,
        local_barrier_manager: &LocalBarrierManager,
        input: Vec<Executor>,
    ) -> StreamResult<Executor> {
        let op_info = node.get_identity().clone();

        // We assume that the operator_id of different instances from the same RelNode will be the
        // same.
        let executor_id = Self::get_executor_id(actor_context, node);
        let operator_id = unique_operator_id(fragment_id, node.operator_id);

        let info = Self::get_executor_info(node, executor_id);

        // Used by the executor to report expression evaluation errors against
        // this actor's identity.
        let eval_error_report = ActorEvalErrorReport {
            actor_context: actor_context.clone(),
            identity: info.identity.clone().into(),
        };

        // Build the executor with params.
        let executor_params = ExecutorParams {
            env: env.clone(),

            info: info.clone(),
            executor_id,
            operator_id,
            op_info,
            input,
            fragment_id,
            executor_stats: self.streaming_metrics.clone(),
            actor_context: actor_context.clone(),
            vnode_bitmap,
            eval_error_report,
            watermark_epoch: self.watermark_epoch.clone(),
            local_barrier_manager: local_barrier_manager.clone(),
        };

        let executor = create_executor(executor_params, node, store).await?;

        // Wrap the executor for debug purpose.
        let wrapped = WrapperExecutor::new(
            executor,
            actor_context.clone(),
            env.config().developer.enable_executor_row_count,
            env.config().developer.enable_explain_analyze_stats,
        );
        let executor = (info, wrapped).into();

        // If there're multiple stateful executors in this actor, we will wrap it into a subtask.
        // NOTE: subtask wrapping is currently disabled (see TODO below), so both
        // branches return the executor unchanged and `subtasks` is never filled.
        let executor = if has_stateful {
            // TODO(bugen): subtask does not work with tracing spans.
            // let (subtask, executor) = subtask::wrap(executor, actor_context.id);
            // subtasks.push(subtask);
            // executor.boxed()

            let _ = subtasks;
            executor
        } else {
            executor
        };

        Ok(executor)
    }
366
    /// Create a chain(tree) of nodes and return the head executor.
    ///
    /// Dispatches once on the concrete state store type, then recursively
    /// builds the tree via [`Self::create_nodes_inner`]. Also returns the
    /// subtask handles collected during the build (currently always empty —
    /// see the TODO in `generate_executor_from_inputs`).
    async fn create_nodes(
        &self,
        fragment_id: FragmentId,
        node: &stream_plan::StreamNode,
        env: StreamEnvironment,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        local_barrier_manager: &LocalBarrierManager,
    ) -> StreamResult<(Executor, Vec<SubtaskHandle>)> {
        let mut subtasks = vec![];

        let executor = dispatch_state_store!(env.state_store(), store, {
            self.create_nodes_inner(
                fragment_id,
                node,
                env,
                store,
                actor_context,
                vnode_bitmap,
                false,
                &mut subtasks,
                local_barrier_manager,
            )
            .await
        })?;

        Ok((executor, subtasks))
    }
396
    /// Build a complete [`Actor`]: the executor tree ([`Self::create_nodes`]),
    /// the [`DispatchExecutor`] on top of it, and the actor wrapper that
    /// drives them.
    async fn create_actor(
        self: Arc<Self>,
        actor: BuildActorInfo,
        fragment_id: FragmentId,
        node: Arc<StreamNode>,
        related_subscriptions: Arc<HashMap<TableId, HashSet<u32>>>,
        local_barrier_manager: LocalBarrierManager,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    ) -> StreamResult<Actor<DispatchExecutor>> {
        {
            let actor_id = actor.actor_id;
            let streaming_config = self.env.config().clone();
            let actor_context = ActorContext::create(
                &actor,
                fragment_id,
                self.env.total_mem_usage(),
                self.streaming_metrics.clone(),
                related_subscriptions,
                self.env.meta_client().clone(),
                streaming_config,
            );
            let vnode_bitmap = actor.vnode_bitmap.as_ref().map(|b| b.into());
            // NOTE(review): assumes `expr_context` is always populated in the
            // build info — panics if absent; confirm against the meta service.
            let expr_context = actor.expr_context.clone().unwrap();

            let (executor, subtasks) = self
                .create_nodes(
                    fragment_id,
                    &node,
                    self.env.clone(),
                    &actor_context,
                    vnode_bitmap,
                    &local_barrier_manager,
                )
                .await?;

            // The dispatcher routes the head executor's output to downstream
            // actors; this moves `actor.dispatchers` out of the build info.
            let dispatcher = DispatchExecutor::new(
                executor,
                new_output_request_rx,
                actor.dispatchers,
                actor_id,
                fragment_id,
                local_barrier_manager.clone(),
                self.streaming_metrics.clone(),
            )
            .await?;
            let actor = Actor::new(
                dispatcher,
                subtasks,
                self.streaming_metrics.clone(),
                actor_context.clone(),
                expr_context,
                local_barrier_manager,
            );
            Ok(actor)
        }
    }
453
    /// Spawn the actor on the streaming runtime.
    ///
    /// Returns the actor's join handle, plus an optional handle of a task that
    /// periodically exports tokio task metrics for the actor (only spawned
    /// when debug-level metrics or actor tokio metrics are enabled).
    pub(super) fn spawn_actor(
        self: &Arc<Self>,
        actor: BuildActorInfo,
        fragment_id: FragmentId,
        node: Arc<StreamNode>,
        related_subscriptions: Arc<HashMap<TableId, HashSet<u32>>>,
        local_barrier_manager: LocalBarrierManager,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    ) -> (JoinHandle<()>, Option<JoinHandle<()>>) {
        {
            let monitor = tokio_metrics::TaskMonitor::new();
            let stream_actor_ref = &actor;
            let actor_id = stream_actor_ref.actor_id;
            let handle = {
                let trace_span =
                    format!("Actor {actor_id}: `{}`", stream_actor_ref.mview_definition);
                let barrier_manager = local_barrier_manager.clone();
                // wrap the future of `create_actor` with `boxed` to avoid stack overflow
                let actor = self
                    .clone()
                    .create_actor(
                        actor,
                        fragment_id,
                        node,
                        related_subscriptions,
                        barrier_manager.clone(),
                        new_output_request_rx
                    ).boxed().and_then(|actor| actor.run()).map(move |result| {
                    if let Err(err) = result {
                        // TODO: check error type and panic if it's unexpected.
                        // Intentionally use `?` on the report to also include the backtrace.
                        tracing::error!(actor_id, error = ?err.as_report(), "actor exit with error");
                        // Report the failure so the barrier worker can react.
                        barrier_manager.notify_failure(actor_id, err);
                    }
                });
                // Layer instrumentation around the actor future, innermost
                // first: await-tree (if enabled) -> tokio task metrics ->
                // task-local config scope -> optional hummock tracing.
                let traced = match &self.await_tree_reg {
                    Some(m) => m
                        .register(await_tree_key::Actor(actor_id), trace_span)
                        .instrument(actor)
                        .left_future(),
                    None => actor.right_future(),
                };
                let instrumented = monitor.instrument(traced);
                let with_config = crate::CONFIG.scope(self.env.config().clone(), instrumented);
                // If hummock tracing is not enabled, it directly returns wrapped future.
                let may_track_hummock = with_config.may_trace_hummock();

                self.runtime.spawn(may_track_hummock)
            };

            let monitor_handle = if self.streaming_metrics.level >= MetricLevel::Debug
                || self.env.config().developer.enable_actor_tokio_metrics
            {
                tracing::info!("Tokio metrics are enabled.");
                let streaming_metrics = self.streaming_metrics.clone();
                // Export the actor's cumulative tokio task metrics once a
                // second until the runtime shuts this task down.
                let actor_monitor_task = self.runtime.spawn(async move {
                    let metrics = streaming_metrics.new_actor_metrics(actor_id);
                    loop {
                        let task_metrics = monitor.cumulative();
                        // NOTE(review): `actor_execution_time` and
                        // `actor_poll_duration` are both set from
                        // `total_poll_duration` — confirm this duplication is
                        // intended.
                        metrics
                            .actor_execution_time
                            .set(task_metrics.total_poll_duration.as_secs_f64());
                        metrics
                            .actor_fast_poll_duration
                            .set(task_metrics.total_fast_poll_duration.as_secs_f64());
                        metrics
                            .actor_fast_poll_cnt
                            .set(task_metrics.total_fast_poll_count as i64);
                        metrics
                            .actor_slow_poll_duration
                            .set(task_metrics.total_slow_poll_duration.as_secs_f64());
                        metrics
                            .actor_slow_poll_cnt
                            .set(task_metrics.total_slow_poll_count as i64);
                        metrics
                            .actor_poll_duration
                            .set(task_metrics.total_poll_duration.as_secs_f64());
                        metrics
                            .actor_poll_cnt
                            .set(task_metrics.total_poll_count as i64);
                        metrics
                            .actor_idle_duration
                            .set(task_metrics.total_idle_duration.as_secs_f64());
                        metrics
                            .actor_idle_cnt
                            .set(task_metrics.total_idled_count as i64);
                        metrics
                            .actor_scheduled_duration
                            .set(task_metrics.total_scheduled_duration.as_secs_f64());
                        metrics
                            .actor_scheduled_cnt
                            .set(task_metrics.total_scheduled_count as i64);
                        tokio::time::sleep(Duration::from_secs(1)).await;
                    }
                });
                Some(actor_monitor_task)
            } else {
                None
            };
            (handle, monitor_handle)
        }
    }
556}
557
/// Parameters to construct executors.
/// - [`crate::from_proto::create_executor`]
/// - [`StreamActorManager::create_nodes`]
pub struct ExecutorParams {
    /// Shared environment of the stream worker.
    pub env: StreamEnvironment,

    /// Basic information about the executor.
    pub info: ExecutorInfo,

    /// Executor id, unique across all actors.
    pub executor_id: u64,

    /// Operator id, unique for each operator in fragment.
    pub operator_id: u64,

    /// Information of the operator from plan node, like `StreamHashJoin { .. }`.
    // TODO: use it for `identity`
    pub op_info: String,

    /// The input executor.
    pub input: Vec<Executor>,

    /// `FragmentId` of the actor
    pub fragment_id: FragmentId,

    /// Metrics
    pub executor_stats: Arc<StreamingMetrics>,

    /// Actor context
    pub actor_context: ActorContextRef,

    /// Vnodes owned by this executor. Represented in bitmap.
    pub vnode_bitmap: Option<Bitmap>,

    /// Used for reporting expression evaluation errors.
    pub eval_error_report: ActorEvalErrorReport,

    /// `watermark_epoch` field in `MemoryManager`
    pub watermark_epoch: AtomicU64Ref,

    /// Barrier manager of the local worker, shared with the actor.
    pub local_barrier_manager: LocalBarrierManager,
}
600
601impl Debug for ExecutorParams {
602    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
603        f.debug_struct("ExecutorParams")
604            .field("info", &self.info)
605            .field("executor_id", &self.executor_id)
606            .field("operator_id", &self.operator_id)
607            .field("op_info", &self.op_info)
608            .field("input", &self.input.len())
609            .field("actor_id", &self.actor_context.id)
610            .finish_non_exhaustive()
611    }
612}