risingwave_stream/task/
actor_manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::time::Duration;
16use std::fmt::Debug;
17use std::sync::Arc;
18
19use async_recursion::async_recursion;
20use futures::{FutureExt, TryFutureExt};
21use itertools::Itertools;
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{ColumnId, Field, Schema};
25use risingwave_common::config::{MetricLevel, StreamingConfig, merge_streaming_config_section};
26use risingwave_common::operator::{unique_executor_id, unique_operator_id};
27use risingwave_common::util::runtime::BackgroundShutdownRuntime;
28use risingwave_pb::id::{ExecutorId, GlobalOperatorId};
29use risingwave_pb::plan_common::StorageTableDesc;
30use risingwave_pb::stream_plan::stream_node::NodeBody;
31use risingwave_pb::stream_plan::{self, StreamNode, StreamScanNode, StreamScanType};
32use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo;
33use risingwave_storage::monitor::HummockTraceFutureExt;
34use risingwave_storage::table::batch_table::BatchTable;
35use risingwave_storage::{StateStore, dispatch_state_store};
36use thiserror_ext::AsReport;
37use tokio::sync::mpsc::UnboundedReceiver;
38use tokio::task::JoinHandle;
39
40use crate::common::table::state_table::StateTableBuilder;
41use crate::error::StreamResult;
42use crate::executor::monitor::StreamingMetrics;
43use crate::executor::subtask::SubtaskHandle;
44use crate::executor::{
45    Actor, ActorContext, ActorContextRef, DispatchExecutor, Execute, Executor, ExecutorInfo,
46    SnapshotBackfillExecutor, TroublemakerExecutor, WrapperExecutor,
47};
48use crate::from_proto::{MergeExecutorBuilder, create_executor};
49use crate::task::{
50    ActorEvalErrorReport, ActorId, AtomicU64Ref, FragmentId, LocalBarrierManager, NewOutputRequest,
51    StreamEnvironment, await_tree_key,
52};
53
/// Default capacity for `ConfigOverrideCache`.
/// Since we only support per-job config override right now, 256 jobs should be sufficient on a single node.
pub const CONFIG_OVERRIDE_CACHE_DEFAULT_CAPACITY: u64 = 256;
/// Cache from the raw `config_override` string to the merged [`StreamingConfig`],
/// so the merge is performed once per distinct override rather than per actor.
pub type ConfigOverrideCache = moka::sync::Cache<String, Arc<StreamingConfig>>;
58
/// [Spawning actors](`Self::spawn_actor`), called by [`crate::task::barrier_worker::managed_state::DatabaseManagedBarrierState`].
///
/// See [`crate::task`] for architecture overview.
pub(crate) struct StreamActorManager {
    /// The streaming environment, providing e.g. the state store and global config.
    pub(super) env: StreamEnvironment,
    /// Metrics shared by all actors spawned by this manager.
    pub(super) streaming_metrics: Arc<StreamingMetrics>,

    /// Watermark epoch number.
    pub(super) watermark_epoch: AtomicU64Ref,

    /// Manages the await-trees of all actors.
    pub(super) await_tree_reg: Option<await_tree::Registry>,

    /// Runtime for the streaming actors.
    pub(super) runtime: BackgroundShutdownRuntime,

    /// Cache for overridden configuration: `config_override` -> `StreamingConfig`.
    ///
    /// Since the override is based on `env.global_config`, which won't change after creation,
    /// we can use the `config_override` as the only key.
    pub(super) config_override_cache: ConfigOverrideCache,
}
81
82impl StreamActorManager {
83    fn get_executor_id(actor_context: &ActorContext, node: &StreamNode) -> ExecutorId {
84        // We assume that the operator_id of different instances from the same RelNode will be the
85        // same.
86        unique_executor_id(actor_context.id, node.operator_id)
87    }
88
89    fn get_executor_info(node: &StreamNode, executor_id: ExecutorId) -> ExecutorInfo {
90        let schema: Schema = node.fields.iter().map(Field::from).collect();
91
92        let stream_key = node
93            .get_stream_key()
94            .iter()
95            .map(|idx| *idx as usize)
96            .collect::<Vec<_>>();
97
98        let stream_kind = node.stream_kind();
99
100        let identity = format!("{} {:X}", node.get_node_body().unwrap(), executor_id);
101
102        ExecutorInfo {
103            schema,
104            stream_key,
105            stream_kind,
106            identity,
107            id: executor_id,
108        }
109    }
110
    /// Build the executor for a `StreamScan` plan node of type `SnapshotBackfill`.
    ///
    /// Expects `stream_node` to have exactly two inputs, the first of which must be a
    /// `Merge` node (otherwise bails); the second input is ignored here. Constructs the
    /// upstream merge input, the batch table over the upstream storage table, and the
    /// backfill progress state table, then assembles a [`SnapshotBackfillExecutor`].
    ///
    /// In insane-consistency mode, the executor is additionally wrapped in a
    /// [`TroublemakerExecutor`] for fault injection.
    async fn create_snapshot_backfill_node(
        &self,
        stream_node: &StreamNode,
        node: &StreamScanNode,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        local_barrier_manager: &LocalBarrierManager,
        state_store: impl StateStore,
    ) -> StreamResult<Executor> {
        // Exactly two inputs are expected; only the first (the upstream Merge) is used.
        let [upstream_node, _]: &[_; 2] = stream_node.input.as_slice().try_into().unwrap();
        let chunk_size = actor_context.config.developer.chunk_size;

        let info = Self::get_executor_info(
            upstream_node,
            Self::get_executor_id(actor_context, upstream_node),
        );

        let NodeBody::Merge(upstream_merge) = upstream_node.get_node_body()? else {
            bail!("expect Merge as input of SnapshotBackfill");
        };

        let upstream = MergeExecutorBuilder::new_input(
            local_barrier_manager.clone(),
            self.streaming_metrics.clone(),
            actor_context.clone(),
            info,
            upstream_merge,
            chunk_size,
        )
        .await?;

        let table_desc: &StorageTableDesc = node.get_table_desc()?;

        let output_indices = node
            .output_indices
            .iter()
            .map(|&i| i as usize)
            .collect_vec();

        let column_ids = node
            .upstream_column_ids
            .iter()
            .map(ColumnId::from)
            .collect_vec();

        // Register so that barrier collection can track the creating MV's backfill progress.
        let progress = local_barrier_manager.register_create_mview_progress(actor_context);

        let vnodes = vnode_bitmap.map(Arc::new);
        let barrier_rx = local_barrier_manager.subscribe_barrier(actor_context.id);

        // Batch-read view over the upstream table, restricted to the owned vnodes.
        let upstream_table =
            BatchTable::new_partial(state_store.clone(), column_ids, vnodes.clone(), table_desc);

        let state_table = node.get_state_table()?;
        let state_table = StateTableBuilder::new(state_table, state_store.clone(), vnodes)
            .enable_preload_all_rows_by_config(&actor_context.config)
            .build()
            .await;

        let executor = SnapshotBackfillExecutor::new(
            upstream_table,
            state_table,
            upstream,
            output_indices,
            actor_context.clone(),
            progress,
            chunk_size,
            node.rate_limit.into(),
            barrier_rx,
            self.streaming_metrics.clone(),
            node.snapshot_backfill_epoch,
        )
        .boxed();

        // Info for the resulting executor is derived from the scan node itself,
        // not from the upstream merge.
        let info = Self::get_executor_info(
            stream_node,
            Self::get_executor_id(actor_context, stream_node),
        );

        if crate::consistency::insane() {
            // Wrap with a troublemaker to inject inconsistencies for testing.
            let mut troubled_info = info.clone();
            troubled_info.identity = format!("{} (troubled)", info.identity);
            Ok((
                info,
                TroublemakerExecutor::new((troubled_info, executor).into(), chunk_size),
            )
                .into())
        } else {
            Ok((info, executor).into())
        }
    }
202
203    /// Create a chain(tree) of nodes, with given `store`.
204    #[expect(clippy::too_many_arguments)]
205    #[async_recursion]
206    async fn create_nodes_inner(
207        &self,
208        fragment_id: FragmentId,
209        node: &stream_plan::StreamNode,
210        env: StreamEnvironment,
211        store: impl StateStore,
212        actor_context: &ActorContextRef,
213        vnode_bitmap: Option<Bitmap>,
214        has_stateful: bool,
215        subtasks: &mut Vec<SubtaskHandle>,
216        local_barrier_manager: &LocalBarrierManager,
217    ) -> StreamResult<Executor> {
218        // The "stateful" here means that the executor may issue read operations to the state store
219        // massively and continuously. Used to decide whether to apply the optimization of subtasks.
220        fn is_stateful_executor(stream_node: &StreamNode) -> bool {
221            matches!(
222                stream_node.get_node_body().unwrap(),
223                NodeBody::HashAgg(_)
224                    | NodeBody::HashJoin(_)
225                    | NodeBody::DeltaIndexJoin(_)
226                    | NodeBody::Lookup(_)
227                    | NodeBody::StreamScan(_)
228                    | NodeBody::StreamCdcScan(_)
229                    | NodeBody::DynamicFilter(_)
230                    | NodeBody::GroupTopN(_)
231                    | NodeBody::Now(_)
232            )
233        }
234        let is_stateful = is_stateful_executor(node);
235
236        let executor = if let NodeBody::StreamScan(stream_scan) = node.get_node_body().unwrap()
237            && let Ok(StreamScanType::SnapshotBackfill) = stream_scan.get_stream_scan_type()
238        {
239            dispatch_state_store!(env.state_store(), store, {
240                self.create_snapshot_backfill_node(
241                    node,
242                    stream_scan,
243                    actor_context,
244                    vnode_bitmap,
245                    local_barrier_manager,
246                    store,
247                )
248                .await
249            })?
250        } else {
251            // Create the input executor before creating itself
252            let mut input = Vec::with_capacity(node.input.iter().len());
253            for input_stream_node in &node.input {
254                input.push(
255                    self.create_nodes_inner(
256                        fragment_id,
257                        input_stream_node,
258                        env.clone(),
259                        store.clone(),
260                        actor_context,
261                        vnode_bitmap.clone(),
262                        has_stateful || is_stateful,
263                        subtasks,
264                        local_barrier_manager,
265                    )
266                    .await?,
267                );
268            }
269
270            self.generate_executor_from_inputs(
271                fragment_id,
272                node,
273                env,
274                store,
275                actor_context,
276                vnode_bitmap,
277                local_barrier_manager,
278                input,
279            )
280            .await?
281        };
282        Ok(Self::wrap_executor(
283            executor,
284            actor_context,
285            has_stateful || is_stateful,
286            subtasks,
287        ))
288    }
289
    /// Build the executor for `node` itself, given its already-built `input` executors.
    ///
    /// Collects the identifiers and context into an [`ExecutorParams`] and delegates to
    /// [`create_executor`], which dispatches on the node body.
    #[expect(clippy::too_many_arguments)]
    async fn generate_executor_from_inputs(
        &self,
        fragment_id: FragmentId,
        node: &stream_plan::StreamNode,
        env: StreamEnvironment,
        store: impl StateStore,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        local_barrier_manager: &LocalBarrierManager,
        input: Vec<Executor>,
    ) -> StreamResult<Executor> {
        let op_info = node.get_identity().clone();

        // We assume that the operator_id of different instances from the same RelNode will be the
        // same.
        let executor_id = Self::get_executor_id(actor_context, node);
        let operator_id = unique_operator_id(fragment_id, node.operator_id);

        let info = Self::get_executor_info(node, executor_id);

        // Expression evaluation errors are reported under this executor's identity.
        let eval_error_report = ActorEvalErrorReport {
            actor_context: actor_context.clone(),
            identity: info.identity.clone().into(),
        };

        // Build the executor with params.
        let executor_params = ExecutorParams {
            env: env.clone(),

            info: info.clone(),
            executor_id,
            operator_id,
            op_info,
            input,
            fragment_id,
            executor_stats: self.streaming_metrics.clone(),
            actor_context: actor_context.clone(),
            vnode_bitmap,
            eval_error_report,
            watermark_epoch: self.watermark_epoch.clone(),
            local_barrier_manager: local_barrier_manager.clone(),
            config: actor_context.config.clone(),
        };

        create_executor(executor_params, node, store).await
    }
337
338    fn wrap_executor(
339        executor: Executor,
340        actor_context: &ActorContextRef,
341        has_stateful: bool,
342        subtasks: &mut Vec<SubtaskHandle>,
343    ) -> Executor {
344        let info = executor.info().clone();
345        // Wrap the executor for debug purpose.
346        let wrapped = WrapperExecutor::new(executor, actor_context.clone());
347        let executor = (info, wrapped).into();
348
349        // If there're multiple stateful executors in this actor, we will wrap it into a subtask.
350        if has_stateful {
351            // TODO(bugen): subtask does not work with tracing spans.
352            // let (subtask, executor) = subtask::wrap(executor, actor_context.id);
353            // subtasks.push(subtask);
354            // executor.boxed()
355
356            let _ = subtasks;
357        }
358        executor
359    }
360
    /// Create a chain(tree) of nodes and return the head executor.
    ///
    /// Dispatches on the concrete state store type once, then delegates the recursive
    /// construction to [`Self::create_nodes_inner`]. Also returns the subtask handles
    /// collected during construction (currently always empty — see `wrap_executor`).
    async fn create_nodes(
        &self,
        fragment_id: FragmentId,
        node: &stream_plan::StreamNode,
        env: StreamEnvironment,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        local_barrier_manager: &LocalBarrierManager,
    ) -> StreamResult<(Executor, Vec<SubtaskHandle>)> {
        let mut subtasks = vec![];

        let executor = dispatch_state_store!(env.state_store(), store, {
            self.create_nodes_inner(
                fragment_id,
                node,
                env,
                store,
                actor_context,
                vnode_bitmap,
                // `has_stateful` starts as false at the root of the tree.
                false,
                &mut subtasks,
                local_barrier_manager,
            )
            .await
        })?;

        Ok((executor, subtasks))
    }
390
391    /// Get the overridden configuration for the given `config_override`.
392    fn get_overridden_config(
393        &self,
394        config_override: &str,
395        actor_id: ActorId,
396    ) -> Arc<StreamingConfig> {
397        self.config_override_cache
398            .get_with_by_ref(config_override, || {
399                let global = self.env.global_config();
400                match merge_streaming_config_section(global.as_ref(), config_override) {
401                    Ok(Some(config)) => {
402                        tracing::info!(%actor_id, "applied configuration override");
403                        Arc::new(config)
404                    }
405                    Ok(None) => global.clone(), // nothing to override
406                    Err(e) => {
407                        // We should have validated the config override when user specified it for the job.
408                        // However, we still tolerate invalid config override here in case there's
409                        // any compatibility issue.
410                        tracing::error!(
411                            error = %e.as_report(),
412                            %actor_id,
413                            "failed to apply configuration override, use global config instead",
414                        );
415                        global.clone()
416                    }
417                }
418            })
419    }
420
    /// Build a complete [`Actor`] for the given build info: create the actor context,
    /// the executor tree, and the dispatch executor that routes output downstream.
    ///
    /// `actor_config` is the (possibly overridden) streaming config resolved by
    /// [`Self::get_overridden_config`].
    async fn create_actor(
        self: Arc<Self>,
        actor: BuildActorInfo,
        fragment_id: FragmentId,
        node: Arc<StreamNode>,
        local_barrier_manager: LocalBarrierManager,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
        actor_config: Arc<StreamingConfig>,
    ) -> StreamResult<Actor<DispatchExecutor>> {
        let actor_context = ActorContext::create(
            &actor,
            fragment_id,
            self.env.total_mem_usage(),
            self.streaming_metrics.clone(),
            self.env.meta_client(),
            actor_config,
            self.env.clone(),
        );
        let vnode_bitmap = actor.vnode_bitmap.as_ref().map(|b| b.into());
        let expr_context = actor.expr_context.clone().unwrap();

        // Build the whole executor tree under this actor.
        let (executor, subtasks) = self
            .create_nodes(
                fragment_id,
                &node,
                self.env.clone(),
                &actor_context,
                vnode_bitmap,
                &local_barrier_manager,
            )
            .await?;

        // The dispatcher sits on top of the tree and sends output to downstream actors.
        let dispatcher = DispatchExecutor::new(
            executor,
            new_output_request_rx,
            actor.dispatchers,
            &actor_context,
        )
        .await?;

        let actor = Actor::new(
            dispatcher,
            subtasks,
            self.streaming_metrics.clone(),
            actor_context.clone(),
            expr_context,
            local_barrier_manager,
        );
        Ok(actor)
    }
471
    /// Spawn the actor onto the streaming runtime and return its join handle, plus an
    /// optional handle to a companion task that periodically reports tokio task metrics
    /// (present only when `enable_actor_tokio_metrics` is set in the actor's config).
    ///
    /// The actor future is layered with (inside-out): `create_actor` + `run`,
    /// failure reporting to the barrier manager, optional await-tree instrumentation,
    /// optional tokio task monitoring, and hummock tracing.
    pub(super) fn spawn_actor(
        self: &Arc<Self>,
        actor: BuildActorInfo,
        fragment_id: FragmentId,
        node: Arc<StreamNode>,
        local_barrier_manager: LocalBarrierManager,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    ) -> (JoinHandle<()>, Option<JoinHandle<()>>) {
        let stream_actor_ref = &actor;
        let actor_id = stream_actor_ref.actor_id;
        // Resolve the per-job config override (cached) before building the actor.
        let actor_config = self.get_overridden_config(&actor.config_override, actor_id);

        let monitor = actor_config
            .developer
            .enable_actor_tokio_metrics
            .then(tokio_metrics::TaskMonitor::new);

        let handle = {
            let trace_span = format!("Actor {actor_id}: `{}`", stream_actor_ref.mview_definition);
            let barrier_manager = local_barrier_manager;
            // wrap the future of `create_actor` with `boxed` to avoid stack overflow
            let actor = self
                .clone()
                .create_actor(
                    actor,
                    fragment_id,
                    node,
                    barrier_manager.clone(),
                    new_output_request_rx,
                    actor_config,
                )
                .boxed()
                .and_then(|actor| actor.run())
                .map(move |result| {
                    if let Err(err) = result {
                        // TODO: check error type and panic if it's unexpected.
                        // Intentionally use `?` on the report to also include the backtrace.
                        tracing::error!(%actor_id, error = ?err.as_report(), "actor exit with error");
                        barrier_manager.notify_failure(actor_id, err);
                    }
                });
            // Attach await-tree instrumentation if the registry is enabled.
            let traced = match &self.await_tree_reg {
                Some(m) => m
                    .register(await_tree_key::Actor(actor_id), trace_span)
                    .instrument(actor)
                    .left_future(),
                None => actor.right_future(),
            };
            // Attach tokio task-metrics instrumentation if enabled.
            let instrumented = match &monitor {
                Some(m) => m.instrument(traced).left_future(),
                None => traced.right_future(),
            };
            // If hummock tracing is not enabled, it directly returns wrapped future.
            let may_track_hummock = instrumented.may_trace_hummock();

            self.runtime.spawn(may_track_hummock)
        };

        // Poll/idle/scheduled *counts* are only reported at debug metric level.
        let enable_count_metrics = self.streaming_metrics.level >= MetricLevel::Debug;
        let monitor_handle = if let Some(monitor) = monitor {
            let streaming_metrics = self.streaming_metrics.clone();
            // Companion task: every 15s, fold the monitor's interval deltas into metrics.
            let actor_monitor_task = self.runtime.spawn(async move {
                let metrics = streaming_metrics.new_actor_metrics(actor_id, fragment_id);
                let mut interval = tokio::time::interval(Duration::from_secs(15));
                for task_metrics in monitor.intervals() {
                    interval.tick().await; // tick at the start since the first interval tick is at 0s.
                    metrics
                        .actor_poll_duration
                        .inc_by(task_metrics.total_poll_duration.as_nanos() as u64);
                    metrics
                        .actor_idle_duration
                        .inc_by(task_metrics.total_idle_duration.as_nanos() as u64);
                    metrics
                        .actor_scheduled_duration
                        .inc_by(task_metrics.total_scheduled_duration.as_nanos() as u64);

                    if enable_count_metrics {
                        metrics.actor_poll_cnt.inc_by(task_metrics.total_poll_count);
                        metrics
                            .actor_idle_cnt
                            .inc_by(task_metrics.total_idled_count);
                        metrics
                            .actor_scheduled_cnt
                            .inc_by(task_metrics.total_scheduled_count);
                    }
                }
            });
            Some(actor_monitor_task)
        } else {
            None
        };
        (handle, monitor_handle)
    }
565}
566
/// Parameters to construct executors.
/// - [`crate::from_proto::create_executor`]
/// - [`StreamActorManager::create_nodes`]
pub struct ExecutorParams {
    /// The streaming environment (state store, global config, etc.).
    pub env: StreamEnvironment,

    /// Basic information about the executor.
    pub info: ExecutorInfo,

    /// Executor id, unique across all actors.
    pub executor_id: ExecutorId,

    /// Operator id, unique for each operator in fragment.
    pub operator_id: GlobalOperatorId,

    /// Information of the operator from plan node, like `StreamHashJoin { .. }`.
    // TODO: use it for `identity`
    pub op_info: String,

    /// The input executor.
    pub input: Vec<Executor>,

    /// `FragmentId` of the actor
    pub fragment_id: FragmentId,

    /// Metrics
    pub executor_stats: Arc<StreamingMetrics>,

    /// Actor context
    pub actor_context: ActorContextRef,

    /// Vnodes owned by this executor. Represented in bitmap.
    pub vnode_bitmap: Option<Bitmap>,

    /// Used for reporting expression evaluation errors.
    pub eval_error_report: ActorEvalErrorReport,

    /// `watermark_epoch` field in `MemoryManager`
    pub watermark_epoch: AtomicU64Ref,

    /// Barrier manager local to this actor's worker, used e.g. to subscribe to
    /// barriers and report backfill progress.
    pub local_barrier_manager: LocalBarrierManager,

    /// The local streaming configuration for this specific actor. Same as `actor_context.config`.
    ///
    /// Compared to `stream_env.global_config`, this config can have some entries overridden by the user.
    pub config: Arc<StreamingConfig>,
}
614
615impl Debug for ExecutorParams {
616    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
617        f.debug_struct("ExecutorParams")
618            .field("info", &self.info)
619            .field("executor_id", &self.executor_id)
620            .field("operator_id", &self.operator_id)
621            .field("op_info", &self.op_info)
622            .field("input", &self.input.len())
623            .field("actor_id", &self.actor_context.id)
624            .finish_non_exhaustive()
625    }
626}