risingwave_stream/task/
actor_manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::time::Duration;
16use std::fmt::Debug;
17use std::sync::Arc;
18
19use async_recursion::async_recursion;
20use futures::{FutureExt, TryFutureExt};
21use itertools::Itertools;
22use risingwave_common::bitmap::Bitmap;
23use risingwave_common::catalog::{ColumnId, Field, Schema};
24use risingwave_common::config::MetricLevel;
25use risingwave_common::must_match;
26use risingwave_common::operator::{unique_executor_id, unique_operator_id};
27use risingwave_common::util::runtime::BackgroundShutdownRuntime;
28use risingwave_pb::plan_common::StorageTableDesc;
29use risingwave_pb::stream_plan::stream_node::NodeBody;
30use risingwave_pb::stream_plan::{self, StreamNode, StreamScanNode, StreamScanType};
31use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo;
32use risingwave_storage::monitor::HummockTraceFutureExt;
33use risingwave_storage::table::batch_table::BatchTable;
34use risingwave_storage::{StateStore, dispatch_state_store};
35use thiserror_ext::AsReport;
36use tokio::sync::mpsc::UnboundedReceiver;
37use tokio::task::JoinHandle;
38
39use crate::common::table::state_table::StateTableBuilder;
40use crate::error::StreamResult;
41use crate::executor::monitor::StreamingMetrics;
42use crate::executor::subtask::SubtaskHandle;
43use crate::executor::{
44    Actor, ActorContext, ActorContextRef, DispatchExecutor, Execute, Executor, ExecutorInfo,
45    MergeExecutorInput, SnapshotBackfillExecutor, TroublemakerExecutor, WrapperExecutor,
46};
47use crate::from_proto::{MergeExecutorBuilder, create_executor};
48use crate::task::{
49    ActorEvalErrorReport, ActorId, AtomicU64Ref, FragmentId, LocalBarrierManager, NewOutputRequest,
50    StreamEnvironment, await_tree_key,
51};
52
/// [Spawning actors](`Self::spawn_actor`), called by [`crate::task::barrier_worker::managed_state::DatabaseManagedBarrierState`].
///
/// See [`crate::task`] for architecture overview.
pub(crate) struct StreamActorManager {
    /// Shared stream environment (config, state store handle, meta client, …)
    /// cloned into every actor built by this manager.
    pub(super) env: StreamEnvironment,
    /// Metrics registry shared by all executors/actors spawned here.
    pub(super) streaming_metrics: Arc<StreamingMetrics>,

    /// Watermark epoch number.
    pub(super) watermark_epoch: AtomicU64Ref,

    /// Manages the await-trees of all actors.
    pub(super) await_tree_reg: Option<await_tree::Registry>,

    /// Runtime for the streaming actors.
    pub(super) runtime: BackgroundShutdownRuntime,
}
69
70impl StreamActorManager {
71    fn get_executor_id(actor_context: &ActorContext, node: &StreamNode) -> u64 {
72        // We assume that the operator_id of different instances from the same RelNode will be the
73        // same.
74        unique_executor_id(actor_context.id, node.operator_id)
75    }
76
77    fn get_executor_info(node: &StreamNode, executor_id: u64) -> ExecutorInfo {
78        let schema: Schema = node.fields.iter().map(Field::from).collect();
79
80        let pk_indices = node
81            .get_stream_key()
82            .iter()
83            .map(|idx| *idx as usize)
84            .collect::<Vec<_>>();
85
86        let stream_kind = node.stream_kind();
87
88        let identity = format!("{} {:X}", node.get_node_body().unwrap(), executor_id);
89
90        ExecutorInfo {
91            schema,
92            pk_indices,
93            stream_kind,
94            identity,
95            id: executor_id,
96        }
97    }
98
    /// Build the upstream [`MergeExecutorInput`] for a snapshot-backfill stream
    /// scan.
    ///
    /// `upstream_node` must be a `Merge` node — this is asserted via
    /// `must_match!` and panics otherwise. Its executor info is derived the same
    /// way as for regularly built executors.
    async fn create_snapshot_backfill_input(
        &self,
        upstream_node: &StreamNode,
        actor_context: &ActorContextRef,
        local_barrier_manager: &LocalBarrierManager,
        chunk_size: usize,
    ) -> StreamResult<MergeExecutorInput> {
        let info = Self::get_executor_info(
            upstream_node,
            Self::get_executor_id(actor_context, upstream_node),
        );

        // Panics if the upstream node body is not `Merge`.
        let upstream_merge = must_match!(upstream_node.get_node_body().unwrap(), NodeBody::Merge(upstream_merge) => {
            upstream_merge
        });

        MergeExecutorBuilder::new_input(
            local_barrier_manager.clone(),
            self.streaming_metrics.clone(),
            actor_context.clone(),
            info,
            upstream_merge,
            chunk_size,
        )
        .await
    }
125
    /// Build a [`SnapshotBackfillExecutor`] for a `StreamScan` node of type
    /// `SnapshotBackfill`, bypassing the generic `create_executor` dispatch.
    ///
    /// The node is expected to have exactly two inputs (upstream merge + scan
    /// side); only the first is consumed here, the second is validated by the
    /// destructuring below.
    #[expect(clippy::too_many_arguments)]
    async fn create_snapshot_backfill_node(
        &self,
        stream_node: &StreamNode,
        node: &StreamScanNode,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        env: StreamEnvironment,
        local_barrier_manager: &LocalBarrierManager,
        state_store: impl StateStore,
    ) -> StreamResult<Executor> {
        // Panics if the node does not have exactly 2 inputs.
        let [upstream_node, _]: &[_; 2] = stream_node.input.as_slice().try_into().unwrap();
        let chunk_size = env.config().developer.chunk_size;
        let upstream = self
            .create_snapshot_backfill_input(
                upstream_node,
                actor_context,
                local_barrier_manager,
                chunk_size,
            )
            .await?;

        let table_desc: &StorageTableDesc = node.get_table_desc()?;

        let output_indices = node
            .output_indices
            .iter()
            .map(|&i| i as usize)
            .collect_vec();

        let column_ids = node
            .upstream_column_ids
            .iter()
            .map(ColumnId::from)
            .collect_vec();

        // Report creation progress of the materialized view to the barrier manager.
        let progress = local_barrier_manager.register_create_mview_progress(actor_context.id);

        let vnodes = vnode_bitmap.map(Arc::new);
        let barrier_rx = local_barrier_manager.subscribe_barrier(actor_context.id);

        // Batch-read view over the upstream table, restricted to the columns
        // and vnodes this actor owns.
        let upstream_table =
            BatchTable::new_partial(state_store.clone(), column_ids, vnodes.clone(), table_desc);

        let state_table = node.get_state_table()?;
        let state_table = StateTableBuilder::new(state_table, state_store.clone(), vnodes)
            .enable_preload_all_rows_by_config(&actor_context.streaming_config)
            .build()
            .await;

        let executor = SnapshotBackfillExecutor::new(
            upstream_table,
            state_table,
            upstream,
            output_indices,
            actor_context.clone(),
            progress,
            chunk_size,
            node.rate_limit.into(),
            barrier_rx,
            self.streaming_metrics.clone(),
            node.snapshot_backfill_epoch,
        )
        .boxed();

        let info = Self::get_executor_info(
            stream_node,
            Self::get_executor_id(actor_context, stream_node),
        );

        // Under insane-mode consistency testing, wrap the executor in a
        // troublemaker that injects inconsistencies on purpose.
        if crate::consistency::insane() {
            let mut troubled_info = info.clone();
            troubled_info.identity = format!("{} (troubled)", info.identity);
            Ok((
                info,
                TroublemakerExecutor::new((troubled_info, executor).into(), chunk_size),
            )
                .into())
        } else {
            Ok((info, executor).into())
        }
    }
208
209    /// Create a chain(tree) of nodes, with given `store`.
210    #[expect(clippy::too_many_arguments)]
211    #[async_recursion]
212    async fn create_nodes_inner(
213        &self,
214        fragment_id: FragmentId,
215        node: &stream_plan::StreamNode,
216        env: StreamEnvironment,
217        store: impl StateStore,
218        actor_context: &ActorContextRef,
219        vnode_bitmap: Option<Bitmap>,
220        has_stateful: bool,
221        subtasks: &mut Vec<SubtaskHandle>,
222        local_barrier_manager: &LocalBarrierManager,
223    ) -> StreamResult<Executor> {
224        if let NodeBody::StreamScan(stream_scan) = node.get_node_body().unwrap()
225            && let Ok(StreamScanType::SnapshotBackfill) = stream_scan.get_stream_scan_type()
226        {
227            return dispatch_state_store!(env.state_store(), store, {
228                self.create_snapshot_backfill_node(
229                    node,
230                    stream_scan,
231                    actor_context,
232                    vnode_bitmap,
233                    env,
234                    local_barrier_manager,
235                    store,
236                )
237                .await
238            });
239        }
240
241        // The "stateful" here means that the executor may issue read operations to the state store
242        // massively and continuously. Used to decide whether to apply the optimization of subtasks.
243        fn is_stateful_executor(stream_node: &StreamNode) -> bool {
244            matches!(
245                stream_node.get_node_body().unwrap(),
246                NodeBody::HashAgg(_)
247                    | NodeBody::HashJoin(_)
248                    | NodeBody::DeltaIndexJoin(_)
249                    | NodeBody::Lookup(_)
250                    | NodeBody::StreamScan(_)
251                    | NodeBody::StreamCdcScan(_)
252                    | NodeBody::DynamicFilter(_)
253                    | NodeBody::GroupTopN(_)
254                    | NodeBody::Now(_)
255            )
256        }
257        let is_stateful = is_stateful_executor(node);
258
259        // Create the input executor before creating itself
260        let mut input = Vec::with_capacity(node.input.iter().len());
261        for input_stream_node in &node.input {
262            input.push(
263                self.create_nodes_inner(
264                    fragment_id,
265                    input_stream_node,
266                    env.clone(),
267                    store.clone(),
268                    actor_context,
269                    vnode_bitmap.clone(),
270                    has_stateful || is_stateful,
271                    subtasks,
272                    local_barrier_manager,
273                )
274                .await?,
275            );
276        }
277
278        self.generate_executor_from_inputs(
279            fragment_id,
280            node,
281            env,
282            store,
283            actor_context,
284            vnode_bitmap,
285            has_stateful || is_stateful,
286            subtasks,
287            local_barrier_manager,
288            input,
289        )
290        .await
291    }
292
    /// Build the executor for `node` itself from its already-built `input`
    /// executors, wrapping it in a [`WrapperExecutor`] for debugging and
    /// consistency checks.
    #[expect(clippy::too_many_arguments)]
    async fn generate_executor_from_inputs(
        &self,
        fragment_id: FragmentId,
        node: &stream_plan::StreamNode,
        env: StreamEnvironment,
        store: impl StateStore,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        has_stateful: bool,
        subtasks: &mut Vec<SubtaskHandle>,
        local_barrier_manager: &LocalBarrierManager,
        input: Vec<Executor>,
    ) -> StreamResult<Executor> {
        let op_info = node.get_identity().clone();

        // We assume that the operator_id of different instances from the same RelNode will be the
        // same.
        let executor_id = Self::get_executor_id(actor_context, node);
        let operator_id = unique_operator_id(fragment_id, node.operator_id);

        let info = Self::get_executor_info(node, executor_id);

        // Reporter used when expression evaluation fails at runtime, tagged
        // with this executor's identity.
        let eval_error_report = ActorEvalErrorReport {
            actor_context: actor_context.clone(),
            identity: info.identity.clone().into(),
        };

        // Build the executor with params.
        let executor_params = ExecutorParams {
            env: env.clone(),

            info: info.clone(),
            executor_id,
            operator_id,
            op_info,
            input,
            fragment_id,
            executor_stats: self.streaming_metrics.clone(),
            actor_context: actor_context.clone(),
            vnode_bitmap,
            eval_error_report,
            watermark_epoch: self.watermark_epoch.clone(),
            local_barrier_manager: local_barrier_manager.clone(),
        };

        let executor = create_executor(executor_params, node, store).await?;

        // Wrap the executor for debug purpose.
        let wrapped = WrapperExecutor::new(
            executor,
            actor_context.clone(),
            env.config().developer.enable_executor_row_count,
            env.config().developer.enable_explain_analyze_stats,
        );
        let executor = (info, wrapped).into();

        // If there're multiple stateful executors in this actor, we will wrap it into a subtask.
        let executor = if has_stateful {
            // TODO(bugen): subtask does not work with tracing spans.
            // let (subtask, executor) = subtask::wrap(executor, actor_context.id);
            // subtasks.push(subtask);
            // executor.boxed()

            // Subtask wrapping is currently disabled (see TODO above); keep
            // `subtasks` referenced so the signature stays stable.
            let _ = subtasks;
            executor
        } else {
            executor
        };

        Ok(executor)
    }
365
366    /// Create a chain(tree) of nodes and return the head executor.
367    async fn create_nodes(
368        &self,
369        fragment_id: FragmentId,
370        node: &stream_plan::StreamNode,
371        env: StreamEnvironment,
372        actor_context: &ActorContextRef,
373        vnode_bitmap: Option<Bitmap>,
374        local_barrier_manager: &LocalBarrierManager,
375    ) -> StreamResult<(Executor, Vec<SubtaskHandle>)> {
376        let mut subtasks = vec![];
377
378        let executor = dispatch_state_store!(env.state_store(), store, {
379            self.create_nodes_inner(
380                fragment_id,
381                node,
382                env,
383                store,
384                actor_context,
385                vnode_bitmap,
386                false,
387                &mut subtasks,
388                local_barrier_manager,
389            )
390            .await
391        })?;
392
393        Ok((executor, subtasks))
394    }
395
396    async fn create_actor(
397        self: Arc<Self>,
398        actor: BuildActorInfo,
399        fragment_id: FragmentId,
400        node: Arc<StreamNode>,
401        local_barrier_manager: LocalBarrierManager,
402        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
403    ) -> StreamResult<Actor<DispatchExecutor>> {
404        {
405            let actor_id = actor.actor_id;
406            let streaming_config = self.env.config().clone();
407            let actor_context = ActorContext::create(
408                &actor,
409                fragment_id,
410                self.env.total_mem_usage(),
411                self.streaming_metrics.clone(),
412                self.env.meta_client(),
413                streaming_config,
414                self.env.clone(),
415            );
416            let vnode_bitmap = actor.vnode_bitmap.as_ref().map(|b| b.into());
417            let expr_context = actor.expr_context.clone().unwrap();
418
419            let (executor, subtasks) = self
420                .create_nodes(
421                    fragment_id,
422                    &node,
423                    self.env.clone(),
424                    &actor_context,
425                    vnode_bitmap,
426                    &local_barrier_manager,
427                )
428                .await?;
429
430            let dispatcher = DispatchExecutor::new(
431                executor,
432                new_output_request_rx,
433                actor.dispatchers,
434                actor_id,
435                fragment_id,
436                local_barrier_manager.clone(),
437                self.streaming_metrics.clone(),
438            )
439            .await?;
440            let actor = Actor::new(
441                dispatcher,
442                subtasks,
443                self.streaming_metrics.clone(),
444                actor_context.clone(),
445                expr_context,
446                local_barrier_manager,
447            );
448            Ok(actor)
449        }
450    }
451
    /// Spawn the actor onto the streaming runtime, returning its join handle
    /// and, when tokio metrics are enabled, the handle of a companion task that
    /// periodically exports per-actor runtime metrics.
    pub(super) fn spawn_actor(
        self: &Arc<Self>,
        actor: BuildActorInfo,
        fragment_id: FragmentId,
        node: Arc<StreamNode>,
        local_barrier_manager: LocalBarrierManager,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    ) -> (JoinHandle<()>, Option<JoinHandle<()>>) {
        {
            let monitor = tokio_metrics::TaskMonitor::new();
            let stream_actor_ref = &actor;
            let actor_id = stream_actor_ref.actor_id;
            let handle = {
                let trace_span =
                    format!("Actor {actor_id}: `{}`", stream_actor_ref.mview_definition);
                let barrier_manager = local_barrier_manager;
                // wrap the future of `create_actor` with `boxed` to avoid stack overflow
                let actor = self
                    .clone()
                    .create_actor(
                        actor,
                        fragment_id,
                        node,
                        barrier_manager.clone(),
                        new_output_request_rx
                    ).boxed().and_then(|actor| actor.run()).map(move |result| {
                    if let Err(err) = result {
                        // TODO: check error type and panic if it's unexpected.
                        // Intentionally use `?` on the report to also include the backtrace.
                        tracing::error!(actor_id, error = ?err.as_report(), "actor exit with error");
                        barrier_manager.notify_failure(actor_id, err);
                    }
                });
                // Instrumentation layers, innermost first: optional await-tree
                // registration -> tokio task metrics -> scoped streaming config
                // -> optional hummock tracing.
                let traced = match &self.await_tree_reg {
                    Some(m) => m
                        .register(await_tree_key::Actor(actor_id), trace_span)
                        .instrument(actor)
                        .left_future(),
                    None => actor.right_future(),
                };
                let instrumented = monitor.instrument(traced);
                let with_config = crate::CONFIG.scope(self.env.config().clone(), instrumented);
                // If hummock tracing is not enabled, it directly returns wrapped future.
                let may_track_hummock = with_config.may_trace_hummock();

                self.runtime.spawn(may_track_hummock)
            };

            // Export tokio task metrics once per second when debug-level
            // metrics or the dedicated developer flag is enabled.
            let monitor_handle = if self.streaming_metrics.level >= MetricLevel::Debug
                || self.env.config().developer.enable_actor_tokio_metrics
            {
                tracing::info!("Tokio metrics are enabled.");
                let streaming_metrics = self.streaming_metrics.clone();
                let actor_monitor_task = self.runtime.spawn(async move {
                    let metrics = streaming_metrics.new_actor_metrics(actor_id);
                    loop {
                        let task_metrics = monitor.cumulative();
                        // NOTE(review): `actor_execution_time` and
                        // `actor_poll_duration` below are both fed from
                        // `total_poll_duration` — confirm this is intentional.
                        metrics
                            .actor_execution_time
                            .set(task_metrics.total_poll_duration.as_secs_f64());
                        metrics
                            .actor_fast_poll_duration
                            .set(task_metrics.total_fast_poll_duration.as_secs_f64());
                        metrics
                            .actor_fast_poll_cnt
                            .set(task_metrics.total_fast_poll_count as i64);
                        metrics
                            .actor_slow_poll_duration
                            .set(task_metrics.total_slow_poll_duration.as_secs_f64());
                        metrics
                            .actor_slow_poll_cnt
                            .set(task_metrics.total_slow_poll_count as i64);
                        metrics
                            .actor_poll_duration
                            .set(task_metrics.total_poll_duration.as_secs_f64());
                        metrics
                            .actor_poll_cnt
                            .set(task_metrics.total_poll_count as i64);
                        metrics
                            .actor_idle_duration
                            .set(task_metrics.total_idle_duration.as_secs_f64());
                        metrics
                            .actor_idle_cnt
                            .set(task_metrics.total_idled_count as i64);
                        metrics
                            .actor_scheduled_duration
                            .set(task_metrics.total_scheduled_duration.as_secs_f64());
                        metrics
                            .actor_scheduled_cnt
                            .set(task_metrics.total_scheduled_count as i64);
                        tokio::time::sleep(Duration::from_secs(1)).await;
                    }
                });
                Some(actor_monitor_task)
            } else {
                None
            };
            (handle, monitor_handle)
        }
    }
552}
553
/// Parameters to construct executors.
/// - [`crate::from_proto::create_executor`]
/// - [`StreamActorManager::create_nodes`]
pub struct ExecutorParams {
    /// Shared stream environment (config, state store handle, meta client, …).
    pub env: StreamEnvironment,

    /// Basic information about the executor.
    pub info: ExecutorInfo,

    /// Executor id, unique across all actors.
    pub executor_id: u64,

    /// Operator id, unique for each operator in fragment.
    pub operator_id: u64,

    /// Information of the operator from plan node, like `StreamHashJoin { .. }`.
    // TODO: use it for `identity`
    pub op_info: String,

    /// The input executor.
    pub input: Vec<Executor>,

    /// `FragmentId` of the actor
    pub fragment_id: FragmentId,

    /// Metrics
    pub executor_stats: Arc<StreamingMetrics>,

    /// Actor context
    pub actor_context: ActorContextRef,

    /// Vnodes owned by this executor. Represented in bitmap.
    pub vnode_bitmap: Option<Bitmap>,

    /// Used for reporting expression evaluation errors.
    pub eval_error_report: ActorEvalErrorReport,

    /// `watermark_epoch` field in `MemoryManager`
    pub watermark_epoch: AtomicU64Ref,

    /// Barrier manager local to this worker, used by the executor to subscribe
    /// to and report barrier progress.
    pub local_barrier_manager: LocalBarrierManager,
}
596
597impl Debug for ExecutorParams {
598    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
599        f.debug_struct("ExecutorParams")
600            .field("info", &self.info)
601            .field("executor_id", &self.executor_id)
602            .field("operator_id", &self.operator_id)
603            .field("op_info", &self.op_info)
604            .field("input", &self.input.len())
605            .field("actor_id", &self.actor_context.id)
606            .finish_non_exhaustive()
607    }
608}