risingwave_stream/task/
actor_manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::time::Duration;
16use std::fmt::Debug;
17use std::sync::Arc;
18
19use async_recursion::async_recursion;
20use futures::{FutureExt, TryFutureExt};
21use itertools::Itertools;
22use risingwave_common::bitmap::Bitmap;
23use risingwave_common::catalog::{ColumnId, Field, Schema};
24use risingwave_common::config::{MetricLevel, StreamingConfig, merge_streaming_config_section};
25use risingwave_common::must_match;
26use risingwave_common::operator::{unique_executor_id, unique_operator_id};
27use risingwave_common::util::runtime::BackgroundShutdownRuntime;
28use risingwave_pb::plan_common::StorageTableDesc;
29use risingwave_pb::stream_plan::stream_node::NodeBody;
30use risingwave_pb::stream_plan::{self, StreamNode, StreamScanNode, StreamScanType};
31use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo;
32use risingwave_storage::monitor::HummockTraceFutureExt;
33use risingwave_storage::table::batch_table::BatchTable;
34use risingwave_storage::{StateStore, dispatch_state_store};
35use thiserror_ext::AsReport;
36use tokio::sync::mpsc::UnboundedReceiver;
37use tokio::task::JoinHandle;
38
39use crate::common::table::state_table::StateTableBuilder;
40use crate::error::StreamResult;
41use crate::executor::monitor::StreamingMetrics;
42use crate::executor::subtask::SubtaskHandle;
43use crate::executor::{
44    Actor, ActorContext, ActorContextRef, DispatchExecutor, Execute, Executor, ExecutorInfo,
45    MergeExecutorInput, SnapshotBackfillExecutor, TroublemakerExecutor, WrapperExecutor,
46};
47use crate::from_proto::{MergeExecutorBuilder, create_executor};
48use crate::task::{
49    ActorEvalErrorReport, ActorId, AtomicU64Ref, FragmentId, LocalBarrierManager, NewOutputRequest,
50    StreamEnvironment, await_tree_key,
51};
52
/// Default capacity for `ConfigOverrideCache`.
/// Since we only support per-job config override right now, 256 jobs should be sufficient on a single node.
pub const CONFIG_OVERRIDE_CACHE_DEFAULT_CAPACITY: u64 = 256;
/// Cache mapping a raw `config_override` string to the merged [`StreamingConfig`].
/// Keyed by the override text alone, since the base global config never changes after creation.
pub type ConfigOverrideCache = moka::sync::Cache<String, Arc<StreamingConfig>>;
57
/// [Spawning actors](`Self::spawn_actor`), called by [`crate::task::barrier_worker::managed_state::DatabaseManagedBarrierState`].
///
/// See [`crate::task`] for architecture overview.
pub(crate) struct StreamActorManager {
    /// Shared stream environment: state store, global config, meta client, etc.
    pub(super) env: StreamEnvironment,
    /// Metrics registry shared by all actors spawned by this manager.
    pub(super) streaming_metrics: Arc<StreamingMetrics>,

    /// Watermark epoch number.
    pub(super) watermark_epoch: AtomicU64Ref,

    /// Manages the await-trees of all actors.
    pub(super) await_tree_reg: Option<await_tree::Registry>,

    /// Runtime for the streaming actors.
    pub(super) runtime: BackgroundShutdownRuntime,

    /// Cache for overridden configuration: `config_override` -> `StreamingConfig`.
    ///
    /// Since the override is based on `env.global_config`, which won't change after creation,
    /// we can use the `config_override` as the only key.
    pub(super) config_override_cache: ConfigOverrideCache,
}
80
81impl StreamActorManager {
82    fn get_executor_id(actor_context: &ActorContext, node: &StreamNode) -> u64 {
83        // We assume that the operator_id of different instances from the same RelNode will be the
84        // same.
85        unique_executor_id(actor_context.id, node.operator_id)
86    }
87
88    fn get_executor_info(node: &StreamNode, executor_id: u64) -> ExecutorInfo {
89        let schema: Schema = node.fields.iter().map(Field::from).collect();
90
91        let stream_key = node
92            .get_stream_key()
93            .iter()
94            .map(|idx| *idx as usize)
95            .collect::<Vec<_>>();
96
97        let stream_kind = node.stream_kind();
98
99        let identity = format!("{} {:X}", node.get_node_body().unwrap(), executor_id);
100
101        ExecutorInfo {
102            schema,
103            stream_key,
104            stream_kind,
105            identity,
106            id: executor_id,
107        }
108    }
109
110    async fn create_snapshot_backfill_input(
111        &self,
112        upstream_node: &StreamNode,
113        actor_context: &ActorContextRef,
114        local_barrier_manager: &LocalBarrierManager,
115        chunk_size: usize,
116    ) -> StreamResult<MergeExecutorInput> {
117        let info = Self::get_executor_info(
118            upstream_node,
119            Self::get_executor_id(actor_context, upstream_node),
120        );
121
122        let upstream_merge = must_match!(upstream_node.get_node_body().unwrap(), NodeBody::Merge(upstream_merge) => {
123            upstream_merge
124        });
125
126        MergeExecutorBuilder::new_input(
127            local_barrier_manager.clone(),
128            self.streaming_metrics.clone(),
129            actor_context.clone(),
130            info,
131            upstream_merge,
132            chunk_size,
133        )
134        .await
135    }
136
    /// Build the executor for a `StreamScan` node of type `SnapshotBackfill`.
    ///
    /// `stream_node` is expected to have exactly two inputs; only the first (the
    /// upstream `Merge` node) is consumed here.
    async fn create_snapshot_backfill_node(
        &self,
        stream_node: &StreamNode,
        node: &StreamScanNode,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        local_barrier_manager: &LocalBarrierManager,
        state_store: impl StateStore,
    ) -> StreamResult<Executor> {
        // Panics if the planner produced anything other than exactly two inputs.
        let [upstream_node, _]: &[_; 2] = stream_node.input.as_slice().try_into().unwrap();
        let chunk_size = actor_context.config.developer.chunk_size;
        let upstream = self
            .create_snapshot_backfill_input(
                upstream_node,
                actor_context,
                local_barrier_manager,
                chunk_size,
            )
            .await?;

        let table_desc: &StorageTableDesc = node.get_table_desc()?;

        // Indices selecting this executor's output columns from the upstream columns.
        let output_indices = node
            .output_indices
            .iter()
            .map(|&i| i as usize)
            .collect_vec();

        // Column ids to read from the upstream table.
        let column_ids = node
            .upstream_column_ids
            .iter()
            .map(ColumnId::from)
            .collect_vec();

        // Register this actor for create-mview progress reporting.
        let progress = local_barrier_manager.register_create_mview_progress(actor_context);

        let vnodes = vnode_bitmap.map(Arc::new);
        let barrier_rx = local_barrier_manager.subscribe_barrier(actor_context.id);

        // Batch-read handle over the upstream table, restricted to `column_ids` and `vnodes`.
        let upstream_table =
            BatchTable::new_partial(state_store.clone(), column_ids, vnodes.clone(), table_desc);

        // State table used by the backfill executor (presumably for persisting
        // backfill progress — confirm in `SnapshotBackfillExecutor`).
        let state_table = node.get_state_table()?;
        let state_table = StateTableBuilder::new(state_table, state_store.clone(), vnodes)
            .enable_preload_all_rows_by_config(&actor_context.config)
            .build()
            .await;

        let executor = SnapshotBackfillExecutor::new(
            upstream_table,
            state_table,
            upstream,
            output_indices,
            actor_context.clone(),
            progress,
            chunk_size,
            node.rate_limit.into(),
            barrier_rx,
            self.streaming_metrics.clone(),
            node.snapshot_backfill_epoch,
        )
        .boxed();

        let info = Self::get_executor_info(
            stream_node,
            Self::get_executor_id(actor_context, stream_node),
        );

        // In "insane" consistency-testing mode, wrap the executor with a
        // `TroublemakerExecutor` (see `crate::consistency`) and mark its identity.
        if crate::consistency::insane() {
            let mut troubled_info = info.clone();
            troubled_info.identity = format!("{} (troubled)", info.identity);
            Ok((
                info,
                TroublemakerExecutor::new((troubled_info, executor).into(), chunk_size),
            )
                .into())
        } else {
            Ok((info, executor).into())
        }
    }
217
218    /// Create a chain(tree) of nodes, with given `store`.
219    #[expect(clippy::too_many_arguments)]
220    #[async_recursion]
221    async fn create_nodes_inner(
222        &self,
223        fragment_id: FragmentId,
224        node: &stream_plan::StreamNode,
225        env: StreamEnvironment,
226        store: impl StateStore,
227        actor_context: &ActorContextRef,
228        vnode_bitmap: Option<Bitmap>,
229        has_stateful: bool,
230        subtasks: &mut Vec<SubtaskHandle>,
231        local_barrier_manager: &LocalBarrierManager,
232    ) -> StreamResult<Executor> {
233        if let NodeBody::StreamScan(stream_scan) = node.get_node_body().unwrap()
234            && let Ok(StreamScanType::SnapshotBackfill) = stream_scan.get_stream_scan_type()
235        {
236            return dispatch_state_store!(env.state_store(), store, {
237                self.create_snapshot_backfill_node(
238                    node,
239                    stream_scan,
240                    actor_context,
241                    vnode_bitmap,
242                    local_barrier_manager,
243                    store,
244                )
245                .await
246            });
247        }
248
249        // The "stateful" here means that the executor may issue read operations to the state store
250        // massively and continuously. Used to decide whether to apply the optimization of subtasks.
251        fn is_stateful_executor(stream_node: &StreamNode) -> bool {
252            matches!(
253                stream_node.get_node_body().unwrap(),
254                NodeBody::HashAgg(_)
255                    | NodeBody::HashJoin(_)
256                    | NodeBody::DeltaIndexJoin(_)
257                    | NodeBody::Lookup(_)
258                    | NodeBody::StreamScan(_)
259                    | NodeBody::StreamCdcScan(_)
260                    | NodeBody::DynamicFilter(_)
261                    | NodeBody::GroupTopN(_)
262                    | NodeBody::Now(_)
263            )
264        }
265        let is_stateful = is_stateful_executor(node);
266
267        // Create the input executor before creating itself
268        let mut input = Vec::with_capacity(node.input.iter().len());
269        for input_stream_node in &node.input {
270            input.push(
271                self.create_nodes_inner(
272                    fragment_id,
273                    input_stream_node,
274                    env.clone(),
275                    store.clone(),
276                    actor_context,
277                    vnode_bitmap.clone(),
278                    has_stateful || is_stateful,
279                    subtasks,
280                    local_barrier_manager,
281                )
282                .await?,
283            );
284        }
285
286        self.generate_executor_from_inputs(
287            fragment_id,
288            node,
289            env,
290            store,
291            actor_context,
292            vnode_bitmap,
293            has_stateful || is_stateful,
294            subtasks,
295            local_barrier_manager,
296            input,
297        )
298        .await
299    }
300
301    #[expect(clippy::too_many_arguments)]
302    async fn generate_executor_from_inputs(
303        &self,
304        fragment_id: FragmentId,
305        node: &stream_plan::StreamNode,
306        env: StreamEnvironment,
307        store: impl StateStore,
308        actor_context: &ActorContextRef,
309        vnode_bitmap: Option<Bitmap>,
310        has_stateful: bool,
311        subtasks: &mut Vec<SubtaskHandle>,
312        local_barrier_manager: &LocalBarrierManager,
313        input: Vec<Executor>,
314    ) -> StreamResult<Executor> {
315        let op_info = node.get_identity().clone();
316
317        // We assume that the operator_id of different instances from the same RelNode will be the
318        // same.
319        let executor_id = Self::get_executor_id(actor_context, node);
320        let operator_id = unique_operator_id(fragment_id, node.operator_id);
321
322        let info = Self::get_executor_info(node, executor_id);
323
324        let eval_error_report = ActorEvalErrorReport {
325            actor_context: actor_context.clone(),
326            identity: info.identity.clone().into(),
327        };
328
329        // Build the executor with params.
330        let executor_params = ExecutorParams {
331            env: env.clone(),
332
333            info: info.clone(),
334            executor_id,
335            operator_id,
336            op_info,
337            input,
338            fragment_id,
339            executor_stats: self.streaming_metrics.clone(),
340            actor_context: actor_context.clone(),
341            vnode_bitmap,
342            eval_error_report,
343            watermark_epoch: self.watermark_epoch.clone(),
344            local_barrier_manager: local_barrier_manager.clone(),
345            config: actor_context.config.clone(),
346        };
347
348        let executor = create_executor(executor_params, node, store).await?;
349
350        // Wrap the executor for debug purpose.
351        let wrapped = WrapperExecutor::new(executor, actor_context.clone());
352        let executor = (info, wrapped).into();
353
354        // If there're multiple stateful executors in this actor, we will wrap it into a subtask.
355        let executor = if has_stateful {
356            // TODO(bugen): subtask does not work with tracing spans.
357            // let (subtask, executor) = subtask::wrap(executor, actor_context.id);
358            // subtasks.push(subtask);
359            // executor.boxed()
360
361            let _ = subtasks;
362            executor
363        } else {
364            executor
365        };
366
367        Ok(executor)
368    }
369
370    /// Create a chain(tree) of nodes and return the head executor.
371    async fn create_nodes(
372        &self,
373        fragment_id: FragmentId,
374        node: &stream_plan::StreamNode,
375        env: StreamEnvironment,
376        actor_context: &ActorContextRef,
377        vnode_bitmap: Option<Bitmap>,
378        local_barrier_manager: &LocalBarrierManager,
379    ) -> StreamResult<(Executor, Vec<SubtaskHandle>)> {
380        let mut subtasks = vec![];
381
382        let executor = dispatch_state_store!(env.state_store(), store, {
383            self.create_nodes_inner(
384                fragment_id,
385                node,
386                env,
387                store,
388                actor_context,
389                vnode_bitmap,
390                false,
391                &mut subtasks,
392                local_barrier_manager,
393            )
394            .await
395        })?;
396
397        Ok((executor, subtasks))
398    }
399
400    /// Get the overridden configuration for the given `config_override`.
401    fn get_overridden_config(
402        &self,
403        config_override: &str,
404        actor_id: ActorId,
405    ) -> Arc<StreamingConfig> {
406        self.config_override_cache
407            .get_with_by_ref(config_override, || {
408                let global = self.env.global_config();
409                match merge_streaming_config_section(global.as_ref(), config_override) {
410                    Ok(Some(config)) => {
411                        tracing::info!(%actor_id, "applied configuration override");
412                        Arc::new(config)
413                    }
414                    Ok(None) => global.clone(), // nothing to override
415                    Err(e) => {
416                        // We should have validated the config override when user specified it for the job.
417                        // However, we still tolerate invalid config override here in case there's
418                        // any compatibility issue.
419                        tracing::error!(
420                            error = %e.as_report(),
421                            %actor_id,
422                            "failed to apply configuration override, use global config instead",
423                        );
424                        global.clone()
425                    }
426                }
427            })
428    }
429
430    async fn create_actor(
431        self: Arc<Self>,
432        actor: BuildActorInfo,
433        fragment_id: FragmentId,
434        node: Arc<StreamNode>,
435        local_barrier_manager: LocalBarrierManager,
436        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
437    ) -> StreamResult<Actor<DispatchExecutor>> {
438        let actor_id = actor.actor_id;
439        let actor_config = self.get_overridden_config(&actor.config_override, actor_id);
440
441        let actor_context = ActorContext::create(
442            &actor,
443            fragment_id,
444            self.env.total_mem_usage(),
445            self.streaming_metrics.clone(),
446            self.env.meta_client(),
447            actor_config,
448            self.env.clone(),
449        );
450        let vnode_bitmap = actor.vnode_bitmap.as_ref().map(|b| b.into());
451        let expr_context = actor.expr_context.clone().unwrap();
452
453        let (executor, subtasks) = self
454            .create_nodes(
455                fragment_id,
456                &node,
457                self.env.clone(),
458                &actor_context,
459                vnode_bitmap,
460                &local_barrier_manager,
461            )
462            .await?;
463
464        let dispatcher = DispatchExecutor::new(
465            executor,
466            new_output_request_rx,
467            actor.dispatchers,
468            actor_id,
469            fragment_id,
470            local_barrier_manager.clone(),
471            self.streaming_metrics.clone(),
472        )
473        .await?;
474        let actor = Actor::new(
475            dispatcher,
476            subtasks,
477            self.streaming_metrics.clone(),
478            actor_context.clone(),
479            expr_context,
480            local_barrier_manager,
481        );
482        Ok(actor)
483    }
484
    /// Spawn the actor onto the streaming runtime, returning its join handle and,
    /// if `enable_actor_tokio_metrics` is set, the handle of a companion task that
    /// periodically exports the actor's tokio task metrics.
    pub(super) fn spawn_actor(
        self: &Arc<Self>,
        actor: BuildActorInfo,
        fragment_id: FragmentId,
        node: Arc<StreamNode>,
        local_barrier_manager: LocalBarrierManager,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    ) -> (JoinHandle<()>, Option<JoinHandle<()>>) {
        let monitor = tokio_metrics::TaskMonitor::new();
        let stream_actor_ref = &actor;
        let actor_id = stream_actor_ref.actor_id;
        let handle = {
            let trace_span = format!("Actor {actor_id}: `{}`", stream_actor_ref.mview_definition);
            let barrier_manager = local_barrier_manager;
            // wrap the future of `create_actor` with `boxed` to avoid stack overflow
            let actor = self
                .clone()
                .create_actor(
                    actor,
                    fragment_id,
                    node,
                    barrier_manager.clone(),
                    new_output_request_rx,
                )
                .boxed()
                .and_then(|actor| actor.run())
                .map(move |result| {
                    if let Err(err) = result {
                        // TODO: check error type and panic if it's unexpected.
                        // Intentionally use `?` on the report to also include the backtrace.
                        tracing::error!(%actor_id, error = ?err.as_report(), "actor exit with error");
                        // Report the failure to the barrier manager so recovery can kick in.
                        barrier_manager.notify_failure(actor_id, err);
                    }
                });
            // Register into the await-tree registry (if enabled) for async-stack diagnostics.
            let traced = match &self.await_tree_reg {
                Some(m) => m
                    .register(await_tree_key::Actor(actor_id), trace_span)
                    .instrument(actor)
                    .left_future(),
                None => actor.right_future(),
            };
            // Wrap with the tokio task monitor so the metrics task below can observe it.
            let instrumented = monitor.instrument(traced);
            // If hummock tracing is not enabled, it directly returns wrapped future.
            let may_track_hummock = instrumented.may_trace_hummock();

            self.runtime.spawn(may_track_hummock)
        };

        // Poll/idle/scheduled *count* metrics are only collected at debug metric level.
        let enable_count_metrics = self.streaming_metrics.level >= MetricLevel::Debug;
        let monitor_handle = if self
            .env
            .global_config()
            .developer
            .enable_actor_tokio_metrics
        {
            let streaming_metrics = self.streaming_metrics.clone();
            let actor_monitor_task = self.runtime.spawn(async move {
                let metrics = streaming_metrics.new_actor_metrics(actor_id, fragment_id);
                let mut interval = tokio::time::interval(Duration::from_secs(15));
                // Each item from `monitor.intervals()` carries the task metrics
                // accumulated since the previous iteration; fold them into counters.
                for task_metrics in monitor.intervals() {
                    interval.tick().await; // tick at the start since the first interval tick is at 0s.
                    metrics
                        .actor_poll_duration
                        .inc_by(task_metrics.total_poll_duration.as_nanos() as u64);
                    metrics
                        .actor_idle_duration
                        .inc_by(task_metrics.total_idle_duration.as_nanos() as u64);
                    metrics
                        .actor_scheduled_duration
                        .inc_by(task_metrics.total_scheduled_duration.as_nanos() as u64);

                    if enable_count_metrics {
                        metrics.actor_poll_cnt.inc_by(task_metrics.total_poll_count);
                        metrics
                            .actor_idle_cnt
                            .inc_by(task_metrics.total_idled_count);
                        metrics
                            .actor_scheduled_cnt
                            .inc_by(task_metrics.total_scheduled_count);
                    }
                }
            });
            Some(actor_monitor_task)
        } else {
            None
        };
        (handle, monitor_handle)
    }
573}
574
/// Parameters to construct executors.
/// - [`crate::from_proto::create_executor`]
/// - [`StreamActorManager::create_nodes`]
pub struct ExecutorParams {
    /// The shared stream environment.
    pub env: StreamEnvironment,

    /// Basic information about the executor.
    pub info: ExecutorInfo,

    /// Executor id, unique across all actors.
    pub executor_id: u64,

    /// Operator id, unique for each operator in fragment.
    pub operator_id: u64,

    /// Information of the operator from plan node, like `StreamHashJoin { .. }`.
    // TODO: use it for `identity`
    pub op_info: String,

    /// The input executor.
    pub input: Vec<Executor>,

    /// `FragmentId` of the actor
    pub fragment_id: FragmentId,

    /// Metrics
    pub executor_stats: Arc<StreamingMetrics>,

    /// Actor context
    pub actor_context: ActorContextRef,

    /// Vnodes owned by this executor. Represented in bitmap.
    pub vnode_bitmap: Option<Bitmap>,

    /// Used for reporting expression evaluation errors.
    pub eval_error_report: ActorEvalErrorReport,

    /// `watermark_epoch` field in `MemoryManager`
    pub watermark_epoch: AtomicU64Ref,

    /// Handle to the local barrier manager this actor registers with.
    pub local_barrier_manager: LocalBarrierManager,

    /// The local streaming configuration for this specific actor. Same as `actor_context.config`.
    ///
    /// Compared to `stream_env.global_config`, this config can have some entries overridden by the user.
    pub config: Arc<StreamingConfig>,
}
622
623impl Debug for ExecutorParams {
624    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
625        f.debug_struct("ExecutorParams")
626            .field("info", &self.info)
627            .field("executor_id", &self.executor_id)
628            .field("operator_id", &self.operator_id)
629            .field("op_info", &self.op_info)
630            .field("input", &self.input.len())
631            .field("actor_id", &self.actor_context.id)
632            .finish_non_exhaustive()
633    }
634}