risingwave_stream/task/
actor_manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::time::Duration;
16use std::fmt::Debug;
17use std::sync::Arc;
18
19use async_recursion::async_recursion;
20use futures::{FutureExt, TryFutureExt};
21use itertools::Itertools;
22use risingwave_common::bitmap::Bitmap;
23use risingwave_common::catalog::{ColumnId, Field, Schema};
24use risingwave_common::config::{MetricLevel, StreamingConfig};
25use risingwave_common::must_match;
26use risingwave_common::operator::{unique_executor_id, unique_operator_id};
27use risingwave_common::util::runtime::BackgroundShutdownRuntime;
28use risingwave_pb::plan_common::StorageTableDesc;
29use risingwave_pb::stream_plan::stream_node::NodeBody;
30use risingwave_pb::stream_plan::{self, StreamNode, StreamScanNode, StreamScanType};
31use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo;
32use risingwave_storage::monitor::HummockTraceFutureExt;
33use risingwave_storage::table::batch_table::BatchTable;
34use risingwave_storage::{StateStore, dispatch_state_store};
35use thiserror_ext::AsReport;
36use tokio::sync::mpsc::UnboundedReceiver;
37use tokio::task::JoinHandle;
38
39use crate::common::table::state_table::StateTableBuilder;
40use crate::error::StreamResult;
41use crate::executor::monitor::StreamingMetrics;
42use crate::executor::subtask::SubtaskHandle;
43use crate::executor::{
44    Actor, ActorContext, ActorContextRef, DispatchExecutor, Execute, Executor, ExecutorInfo,
45    MergeExecutorInput, SnapshotBackfillExecutor, TroublemakerExecutor, WrapperExecutor,
46};
47use crate::from_proto::{MergeExecutorBuilder, create_executor};
48use crate::task::{
49    ActorEvalErrorReport, ActorId, AtomicU64Ref, FragmentId, LocalBarrierManager, NewOutputRequest,
50    StreamEnvironment, await_tree_key,
51};
52
53/// [Spawning actors](`Self::spawn_actor`), called by [`crate::task::barrier_worker::managed_state::DatabaseManagedBarrierState`].
54///
55/// See [`crate::task`] for architecture overview.
56pub(crate) struct StreamActorManager {
57    pub(super) env: StreamEnvironment,
58    pub(super) streaming_metrics: Arc<StreamingMetrics>,
59
60    /// Watermark epoch number.
61    pub(super) watermark_epoch: AtomicU64Ref,
62
63    /// Manages the await-trees of all actors.
64    pub(super) await_tree_reg: Option<await_tree::Registry>,
65
66    /// Runtime for the streaming actors.
67    pub(super) runtime: BackgroundShutdownRuntime,
68}
69
70impl StreamActorManager {
71    fn get_executor_id(actor_context: &ActorContext, node: &StreamNode) -> u64 {
72        // We assume that the operator_id of different instances from the same RelNode will be the
73        // same.
74        unique_executor_id(actor_context.id, node.operator_id)
75    }
76
77    fn get_executor_info(node: &StreamNode, executor_id: u64) -> ExecutorInfo {
78        let schema: Schema = node.fields.iter().map(Field::from).collect();
79
80        let stream_key = node
81            .get_stream_key()
82            .iter()
83            .map(|idx| *idx as usize)
84            .collect::<Vec<_>>();
85
86        let stream_kind = node.stream_kind();
87
88        let identity = format!("{} {:X}", node.get_node_body().unwrap(), executor_id);
89
90        ExecutorInfo {
91            schema,
92            stream_key,
93            stream_kind,
94            identity,
95            id: executor_id,
96        }
97    }
98
99    async fn create_snapshot_backfill_input(
100        &self,
101        upstream_node: &StreamNode,
102        actor_context: &ActorContextRef,
103        local_barrier_manager: &LocalBarrierManager,
104        chunk_size: usize,
105    ) -> StreamResult<MergeExecutorInput> {
106        let info = Self::get_executor_info(
107            upstream_node,
108            Self::get_executor_id(actor_context, upstream_node),
109        );
110
111        let upstream_merge = must_match!(upstream_node.get_node_body().unwrap(), NodeBody::Merge(upstream_merge) => {
112            upstream_merge
113        });
114
115        MergeExecutorBuilder::new_input(
116            local_barrier_manager.clone(),
117            self.streaming_metrics.clone(),
118            actor_context.clone(),
119            info,
120            upstream_merge,
121            chunk_size,
122        )
123        .await
124    }
125
126    async fn create_snapshot_backfill_node(
127        &self,
128        stream_node: &StreamNode,
129        node: &StreamScanNode,
130        actor_context: &ActorContextRef,
131        vnode_bitmap: Option<Bitmap>,
132        local_barrier_manager: &LocalBarrierManager,
133        state_store: impl StateStore,
134    ) -> StreamResult<Executor> {
135        let [upstream_node, _]: &[_; 2] = stream_node.input.as_slice().try_into().unwrap();
136        let chunk_size = actor_context.config.developer.chunk_size;
137        let upstream = self
138            .create_snapshot_backfill_input(
139                upstream_node,
140                actor_context,
141                local_barrier_manager,
142                chunk_size,
143            )
144            .await?;
145
146        let table_desc: &StorageTableDesc = node.get_table_desc()?;
147
148        let output_indices = node
149            .output_indices
150            .iter()
151            .map(|&i| i as usize)
152            .collect_vec();
153
154        let column_ids = node
155            .upstream_column_ids
156            .iter()
157            .map(ColumnId::from)
158            .collect_vec();
159
160        let progress = local_barrier_manager.register_create_mview_progress(actor_context);
161
162        let vnodes = vnode_bitmap.map(Arc::new);
163        let barrier_rx = local_barrier_manager.subscribe_barrier(actor_context.id);
164
165        let upstream_table =
166            BatchTable::new_partial(state_store.clone(), column_ids, vnodes.clone(), table_desc);
167
168        let state_table = node.get_state_table()?;
169        let state_table = StateTableBuilder::new(state_table, state_store.clone(), vnodes)
170            .enable_preload_all_rows_by_config(&actor_context.config)
171            .build()
172            .await;
173
174        let executor = SnapshotBackfillExecutor::new(
175            upstream_table,
176            state_table,
177            upstream,
178            output_indices,
179            actor_context.clone(),
180            progress,
181            chunk_size,
182            node.rate_limit.into(),
183            barrier_rx,
184            self.streaming_metrics.clone(),
185            node.snapshot_backfill_epoch,
186        )
187        .boxed();
188
189        let info = Self::get_executor_info(
190            stream_node,
191            Self::get_executor_id(actor_context, stream_node),
192        );
193
194        if crate::consistency::insane() {
195            let mut troubled_info = info.clone();
196            troubled_info.identity = format!("{} (troubled)", info.identity);
197            Ok((
198                info,
199                TroublemakerExecutor::new((troubled_info, executor).into(), chunk_size),
200            )
201                .into())
202        } else {
203            Ok((info, executor).into())
204        }
205    }
206
207    /// Create a chain(tree) of nodes, with given `store`.
208    #[expect(clippy::too_many_arguments)]
209    #[async_recursion]
210    async fn create_nodes_inner(
211        &self,
212        fragment_id: FragmentId,
213        node: &stream_plan::StreamNode,
214        env: StreamEnvironment,
215        store: impl StateStore,
216        actor_context: &ActorContextRef,
217        vnode_bitmap: Option<Bitmap>,
218        has_stateful: bool,
219        subtasks: &mut Vec<SubtaskHandle>,
220        local_barrier_manager: &LocalBarrierManager,
221    ) -> StreamResult<Executor> {
222        if let NodeBody::StreamScan(stream_scan) = node.get_node_body().unwrap()
223            && let Ok(StreamScanType::SnapshotBackfill) = stream_scan.get_stream_scan_type()
224        {
225            return dispatch_state_store!(env.state_store(), store, {
226                self.create_snapshot_backfill_node(
227                    node,
228                    stream_scan,
229                    actor_context,
230                    vnode_bitmap,
231                    local_barrier_manager,
232                    store,
233                )
234                .await
235            });
236        }
237
238        // The "stateful" here means that the executor may issue read operations to the state store
239        // massively and continuously. Used to decide whether to apply the optimization of subtasks.
240        fn is_stateful_executor(stream_node: &StreamNode) -> bool {
241            matches!(
242                stream_node.get_node_body().unwrap(),
243                NodeBody::HashAgg(_)
244                    | NodeBody::HashJoin(_)
245                    | NodeBody::DeltaIndexJoin(_)
246                    | NodeBody::Lookup(_)
247                    | NodeBody::StreamScan(_)
248                    | NodeBody::StreamCdcScan(_)
249                    | NodeBody::DynamicFilter(_)
250                    | NodeBody::GroupTopN(_)
251                    | NodeBody::Now(_)
252            )
253        }
254        let is_stateful = is_stateful_executor(node);
255
256        // Create the input executor before creating itself
257        let mut input = Vec::with_capacity(node.input.iter().len());
258        for input_stream_node in &node.input {
259            input.push(
260                self.create_nodes_inner(
261                    fragment_id,
262                    input_stream_node,
263                    env.clone(),
264                    store.clone(),
265                    actor_context,
266                    vnode_bitmap.clone(),
267                    has_stateful || is_stateful,
268                    subtasks,
269                    local_barrier_manager,
270                )
271                .await?,
272            );
273        }
274
275        self.generate_executor_from_inputs(
276            fragment_id,
277            node,
278            env,
279            store,
280            actor_context,
281            vnode_bitmap,
282            has_stateful || is_stateful,
283            subtasks,
284            local_barrier_manager,
285            input,
286        )
287        .await
288    }
289
290    #[expect(clippy::too_many_arguments)]
291    async fn generate_executor_from_inputs(
292        &self,
293        fragment_id: FragmentId,
294        node: &stream_plan::StreamNode,
295        env: StreamEnvironment,
296        store: impl StateStore,
297        actor_context: &ActorContextRef,
298        vnode_bitmap: Option<Bitmap>,
299        has_stateful: bool,
300        subtasks: &mut Vec<SubtaskHandle>,
301        local_barrier_manager: &LocalBarrierManager,
302        input: Vec<Executor>,
303    ) -> StreamResult<Executor> {
304        let op_info = node.get_identity().clone();
305
306        // We assume that the operator_id of different instances from the same RelNode will be the
307        // same.
308        let executor_id = Self::get_executor_id(actor_context, node);
309        let operator_id = unique_operator_id(fragment_id, node.operator_id);
310
311        let info = Self::get_executor_info(node, executor_id);
312
313        let eval_error_report = ActorEvalErrorReport {
314            actor_context: actor_context.clone(),
315            identity: info.identity.clone().into(),
316        };
317
318        // Build the executor with params.
319        let executor_params = ExecutorParams {
320            env: env.clone(),
321
322            info: info.clone(),
323            executor_id,
324            operator_id,
325            op_info,
326            input,
327            fragment_id,
328            executor_stats: self.streaming_metrics.clone(),
329            actor_context: actor_context.clone(),
330            vnode_bitmap,
331            eval_error_report,
332            watermark_epoch: self.watermark_epoch.clone(),
333            local_barrier_manager: local_barrier_manager.clone(),
334            config: actor_context.config.clone(),
335        };
336
337        let executor = create_executor(executor_params, node, store).await?;
338
339        // Wrap the executor for debug purpose.
340        let wrapped = WrapperExecutor::new(executor, actor_context.clone());
341        let executor = (info, wrapped).into();
342
343        // If there're multiple stateful executors in this actor, we will wrap it into a subtask.
344        let executor = if has_stateful {
345            // TODO(bugen): subtask does not work with tracing spans.
346            // let (subtask, executor) = subtask::wrap(executor, actor_context.id);
347            // subtasks.push(subtask);
348            // executor.boxed()
349
350            let _ = subtasks;
351            executor
352        } else {
353            executor
354        };
355
356        Ok(executor)
357    }
358
359    /// Create a chain(tree) of nodes and return the head executor.
360    async fn create_nodes(
361        &self,
362        fragment_id: FragmentId,
363        node: &stream_plan::StreamNode,
364        env: StreamEnvironment,
365        actor_context: &ActorContextRef,
366        vnode_bitmap: Option<Bitmap>,
367        local_barrier_manager: &LocalBarrierManager,
368    ) -> StreamResult<(Executor, Vec<SubtaskHandle>)> {
369        let mut subtasks = vec![];
370
371        let executor = dispatch_state_store!(env.state_store(), store, {
372            self.create_nodes_inner(
373                fragment_id,
374                node,
375                env,
376                store,
377                actor_context,
378                vnode_bitmap,
379                false,
380                &mut subtasks,
381                local_barrier_manager,
382            )
383            .await
384        })?;
385
386        Ok((executor, subtasks))
387    }
388
389    async fn create_actor(
390        self: Arc<Self>,
391        actor: BuildActorInfo,
392        fragment_id: FragmentId,
393        node: Arc<StreamNode>,
394        local_barrier_manager: LocalBarrierManager,
395        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
396    ) -> StreamResult<Actor<DispatchExecutor>> {
397        let actor_id = actor.actor_id;
398        let actor_context = ActorContext::create(
399            &actor,
400            fragment_id,
401            self.env.total_mem_usage(),
402            self.streaming_metrics.clone(),
403            self.env.meta_client(),
404            self.env.global_config().clone(), // TODO(config): local config for actor
405            self.env.clone(),
406        );
407        let vnode_bitmap = actor.vnode_bitmap.as_ref().map(|b| b.into());
408        let expr_context = actor.expr_context.clone().unwrap();
409
410        let (executor, subtasks) = self
411            .create_nodes(
412                fragment_id,
413                &node,
414                self.env.clone(),
415                &actor_context,
416                vnode_bitmap,
417                &local_barrier_manager,
418            )
419            .await?;
420
421        let dispatcher = DispatchExecutor::new(
422            executor,
423            new_output_request_rx,
424            actor.dispatchers,
425            actor_id,
426            fragment_id,
427            local_barrier_manager.clone(),
428            self.streaming_metrics.clone(),
429        )
430        .await?;
431        let actor = Actor::new(
432            dispatcher,
433            subtasks,
434            self.streaming_metrics.clone(),
435            actor_context.clone(),
436            expr_context,
437            local_barrier_manager,
438        );
439        Ok(actor)
440    }
441
442    pub(super) fn spawn_actor(
443        self: &Arc<Self>,
444        actor: BuildActorInfo,
445        fragment_id: FragmentId,
446        node: Arc<StreamNode>,
447        local_barrier_manager: LocalBarrierManager,
448        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
449    ) -> (JoinHandle<()>, Option<JoinHandle<()>>) {
450        let monitor = tokio_metrics::TaskMonitor::new();
451        let stream_actor_ref = &actor;
452        let actor_id = stream_actor_ref.actor_id;
453        let handle = {
454            let trace_span = format!("Actor {actor_id}: `{}`", stream_actor_ref.mview_definition);
455            let barrier_manager = local_barrier_manager;
456            // wrap the future of `create_actor` with `boxed` to avoid stack overflow
457            let actor = self
458                .clone()
459                .create_actor(
460                    actor,
461                    fragment_id,
462                    node,
463                    barrier_manager.clone(),
464                    new_output_request_rx,
465                )
466                .boxed()
467                .and_then(|actor| actor.run())
468                .map(move |result| {
469                    if let Err(err) = result {
470                        // TODO: check error type and panic if it's unexpected.
471                        // Intentionally use `?` on the report to also include the backtrace.
472                        tracing::error!(%actor_id, error = ?err.as_report(), "actor exit with error");
473                        barrier_manager.notify_failure(actor_id, err);
474                    }
475                });
476            let traced = match &self.await_tree_reg {
477                Some(m) => m
478                    .register(await_tree_key::Actor(actor_id), trace_span)
479                    .instrument(actor)
480                    .left_future(),
481                None => actor.right_future(),
482            };
483            let instrumented = monitor.instrument(traced);
484            // If hummock tracing is not enabled, it directly returns wrapped future.
485            let may_track_hummock = instrumented.may_trace_hummock();
486
487            self.runtime.spawn(may_track_hummock)
488        };
489
490        let monitor_handle = if self.streaming_metrics.level >= MetricLevel::Debug
491            || self
492                .env
493                .global_config()
494                .developer
495                .enable_actor_tokio_metrics
496        {
497            tracing::info!("Tokio metrics are enabled.");
498            let streaming_metrics = self.streaming_metrics.clone();
499            let actor_monitor_task = self.runtime.spawn(async move {
500                let metrics = streaming_metrics.new_actor_metrics(actor_id);
501                loop {
502                    let task_metrics = monitor.cumulative();
503                    metrics
504                        .actor_execution_time
505                        .set(task_metrics.total_poll_duration.as_secs_f64());
506                    metrics
507                        .actor_fast_poll_duration
508                        .set(task_metrics.total_fast_poll_duration.as_secs_f64());
509                    metrics
510                        .actor_fast_poll_cnt
511                        .set(task_metrics.total_fast_poll_count as i64);
512                    metrics
513                        .actor_slow_poll_duration
514                        .set(task_metrics.total_slow_poll_duration.as_secs_f64());
515                    metrics
516                        .actor_slow_poll_cnt
517                        .set(task_metrics.total_slow_poll_count as i64);
518                    metrics
519                        .actor_poll_duration
520                        .set(task_metrics.total_poll_duration.as_secs_f64());
521                    metrics
522                        .actor_poll_cnt
523                        .set(task_metrics.total_poll_count as i64);
524                    metrics
525                        .actor_idle_duration
526                        .set(task_metrics.total_idle_duration.as_secs_f64());
527                    metrics
528                        .actor_idle_cnt
529                        .set(task_metrics.total_idled_count as i64);
530                    metrics
531                        .actor_scheduled_duration
532                        .set(task_metrics.total_scheduled_duration.as_secs_f64());
533                    metrics
534                        .actor_scheduled_cnt
535                        .set(task_metrics.total_scheduled_count as i64);
536                    tokio::time::sleep(Duration::from_secs(1)).await;
537                }
538            });
539            Some(actor_monitor_task)
540        } else {
541            None
542        };
543        (handle, monitor_handle)
544    }
545}
546
/// Parameters to construct executors.
/// - [`crate::from_proto::create_executor`]
/// - [`StreamActorManager::create_nodes`]
pub struct ExecutorParams {
    /// Shared streaming environment (state store, global config, ...).
    pub env: StreamEnvironment,

    /// Basic information about the executor.
    pub info: ExecutorInfo,

    /// Executor id, unique across all actors.
    pub executor_id: u64,

    /// Operator id, unique for each operator in fragment.
    pub operator_id: u64,

    /// Information of the operator from plan node, like `StreamHashJoin { .. }`.
    // TODO: use it for `identity`
    pub op_info: String,

    /// The input executor.
    pub input: Vec<Executor>,

    /// `FragmentId` of the actor
    pub fragment_id: FragmentId,

    /// Metrics
    pub executor_stats: Arc<StreamingMetrics>,

    /// Actor context
    pub actor_context: ActorContextRef,

    /// Vnodes owned by this executor. Represented in bitmap.
    pub vnode_bitmap: Option<Bitmap>,

    /// Used for reporting expression evaluation errors.
    pub eval_error_report: ActorEvalErrorReport,

    /// `watermark_epoch` field in `MemoryManager`
    pub watermark_epoch: AtomicU64Ref,

    /// Barrier manager local to this worker; used by executors to subscribe to barriers.
    pub local_barrier_manager: LocalBarrierManager,

    /// The local streaming configuration for this specific actor. Same as `actor_context.config`.
    ///
    /// Compared to `stream_env.global_config`, this config can have some entries overridden by the user.
    pub config: Arc<StreamingConfig>,
}
594
595impl Debug for ExecutorParams {
596    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
597        f.debug_struct("ExecutorParams")
598            .field("info", &self.info)
599            .field("executor_id", &self.executor_id)
600            .field("operator_id", &self.operator_id)
601            .field("op_info", &self.op_info)
602            .field("input", &self.input.len())
603            .field("actor_id", &self.actor_context.id)
604            .finish_non_exhaustive()
605    }
606}