risingwave_stream/task/
actor_manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::time::Duration;
16use std::collections::{HashMap, HashSet};
17use std::fmt::Debug;
18use std::sync::Arc;
19
20use async_recursion::async_recursion;
21use futures::{FutureExt, TryFutureExt};
22use itertools::Itertools;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{ColumnId, Field, Schema, TableId};
25use risingwave_common::config::MetricLevel;
26use risingwave_common::must_match;
27use risingwave_common::operator::{unique_executor_id, unique_operator_id};
28use risingwave_common::util::runtime::BackgroundShutdownRuntime;
29use risingwave_expr::expr::build_non_strict_from_prost;
30use risingwave_pb::plan_common::StorageTableDesc;
31use risingwave_pb::stream_plan;
32use risingwave_pb::stream_plan::stream_node::NodeBody;
33use risingwave_pb::stream_plan::{StreamNode, StreamScanNode, StreamScanType};
34use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo;
35use risingwave_storage::monitor::HummockTraceFutureExt;
36use risingwave_storage::table::batch_table::BatchTable;
37use risingwave_storage::{StateStore, dispatch_state_store};
38use thiserror_ext::AsReport;
39use tokio::sync::mpsc::UnboundedReceiver;
40use tokio::task::JoinHandle;
41
42use crate::common::table::state_table::StateTable;
43use crate::error::StreamResult;
44use crate::executor::monitor::StreamingMetrics;
45use crate::executor::subtask::SubtaskHandle;
46use crate::executor::{
47    Actor, ActorContext, ActorContextRef, DispatchExecutor, Execute, Executor, ExecutorInfo,
48    MergeExecutorInput, SnapshotBackfillExecutor, TroublemakerExecutor, UpstreamSinkUnionExecutor,
49    WrapperExecutor,
50};
51use crate::from_proto::{MergeExecutorBuilder, create_executor};
52use crate::task::{
53    ActorEvalErrorReport, ActorId, AtomicU64Ref, FragmentId, LocalBarrierManager, NewOutputRequest,
54    StreamEnvironment, await_tree_key,
55};
56
57/// [Spawning actors](`Self::spawn_actor`), called by [`crate::task::barrier_worker::managed_state::DatabaseManagedBarrierState`].
58///
59/// See [`crate::task`] for architecture overview.
60pub(crate) struct StreamActorManager {
61    pub(super) env: StreamEnvironment,
62    pub(super) streaming_metrics: Arc<StreamingMetrics>,
63
64    /// Watermark epoch number.
65    pub(super) watermark_epoch: AtomicU64Ref,
66
67    /// Manages the await-trees of all actors.
68    pub(super) await_tree_reg: Option<await_tree::Registry>,
69
70    /// Runtime for the streaming actors.
71    pub(super) runtime: BackgroundShutdownRuntime,
72}
73
74struct SinkIntoTableUnion<'a> {
75    prefix_nodes: Vec<&'a stream_plan::StreamNode>,
76    merge_projects: Vec<(&'a stream_plan::StreamNode, &'a stream_plan::StreamNode)>,
77    union_node: &'a stream_plan::StreamNode,
78}
79
80impl StreamActorManager {
81    fn get_executor_id(actor_context: &ActorContext, node: &StreamNode) -> u64 {
82        // We assume that the operator_id of different instances from the same RelNode will be the
83        // same.
84        unique_executor_id(actor_context.id, node.operator_id)
85    }
86
87    fn get_executor_info(node: &StreamNode, executor_id: u64) -> ExecutorInfo {
88        let schema: Schema = node.fields.iter().map(Field::from).collect();
89
90        let pk_indices = node
91            .get_stream_key()
92            .iter()
93            .map(|idx| *idx as usize)
94            .collect::<Vec<_>>();
95
96        let identity = format!("{} {:X}", node.get_node_body().unwrap(), executor_id);
97        ExecutorInfo {
98            schema,
99            pk_indices,
100            identity,
101            id: executor_id,
102        }
103    }
104
105    async fn create_snapshot_backfill_input(
106        &self,
107        upstream_node: &StreamNode,
108        actor_context: &ActorContextRef,
109        local_barrier_manager: &LocalBarrierManager,
110        chunk_size: usize,
111    ) -> StreamResult<MergeExecutorInput> {
112        let info = Self::get_executor_info(
113            upstream_node,
114            Self::get_executor_id(actor_context, upstream_node),
115        );
116
117        let upstream_merge = must_match!(upstream_node.get_node_body().unwrap(), NodeBody::Merge(upstream_merge) => {
118            upstream_merge
119        });
120
121        MergeExecutorBuilder::new_input(
122            local_barrier_manager.clone(),
123            self.streaming_metrics.clone(),
124            actor_context.clone(),
125            info,
126            upstream_merge,
127            chunk_size,
128        )
129        .await
130    }
131
132    #[expect(clippy::too_many_arguments)]
133    async fn create_snapshot_backfill_node(
134        &self,
135        stream_node: &StreamNode,
136        node: &StreamScanNode,
137        actor_context: &ActorContextRef,
138        vnode_bitmap: Option<Bitmap>,
139        env: StreamEnvironment,
140        local_barrier_manager: &LocalBarrierManager,
141        state_store: impl StateStore,
142    ) -> StreamResult<Executor> {
143        let [upstream_node, _]: &[_; 2] = stream_node.input.as_slice().try_into().unwrap();
144        let chunk_size = env.config().developer.chunk_size;
145        let upstream = self
146            .create_snapshot_backfill_input(
147                upstream_node,
148                actor_context,
149                local_barrier_manager,
150                chunk_size,
151            )
152            .await?;
153
154        let table_desc: &StorageTableDesc = node.get_table_desc()?;
155
156        let output_indices = node
157            .output_indices
158            .iter()
159            .map(|&i| i as usize)
160            .collect_vec();
161
162        let column_ids = node
163            .upstream_column_ids
164            .iter()
165            .map(ColumnId::from)
166            .collect_vec();
167
168        let progress = local_barrier_manager.register_create_mview_progress(actor_context.id);
169
170        let vnodes = vnode_bitmap.map(Arc::new);
171        let barrier_rx = local_barrier_manager.subscribe_barrier(actor_context.id);
172
173        let upstream_table =
174            BatchTable::new_partial(state_store.clone(), column_ids, vnodes.clone(), table_desc);
175
176        let state_table = node.get_state_table()?;
177        let state_table =
178            StateTable::from_table_catalog(state_table, state_store.clone(), vnodes).await;
179
180        let executor = SnapshotBackfillExecutor::new(
181            upstream_table,
182            state_table,
183            upstream,
184            output_indices,
185            actor_context.clone(),
186            progress,
187            chunk_size,
188            node.rate_limit.into(),
189            barrier_rx,
190            self.streaming_metrics.clone(),
191            node.snapshot_backfill_epoch,
192        )
193        .boxed();
194
195        let info = Self::get_executor_info(
196            stream_node,
197            Self::get_executor_id(actor_context, stream_node),
198        );
199
200        if crate::consistency::insane() {
201            let mut troubled_info = info.clone();
202            troubled_info.identity = format!("{} (troubled)", info.identity);
203            Ok((
204                info,
205                TroublemakerExecutor::new((troubled_info, executor).into(), chunk_size),
206            )
207                .into())
208        } else {
209            Ok((info, executor).into())
210        }
211    }
212
213    #[expect(clippy::too_many_arguments)]
214    async fn create_sink_into_table_union(
215        &self,
216        fragment_id: FragmentId,
217        union_node: &stream_plan::StreamNode,
218        env: StreamEnvironment,
219        store: impl StateStore,
220        actor_context: &ActorContextRef,
221        vnode_bitmap: Option<Bitmap>,
222        has_stateful: bool,
223        subtasks: &mut Vec<SubtaskHandle>,
224        local_barrier_manager: &LocalBarrierManager,
225        prefix_nodes: Vec<&stream_plan::StreamNode>,
226        merge_projects: Vec<(&stream_plan::StreamNode, &stream_plan::StreamNode)>,
227    ) -> StreamResult<Executor> {
228        let mut input = Vec::with_capacity(union_node.get_input().len());
229
230        for input_stream_node in prefix_nodes {
231            input.push(
232                self.create_nodes_inner(
233                    fragment_id,
234                    input_stream_node,
235                    env.clone(),
236                    store.clone(),
237                    actor_context,
238                    vnode_bitmap.clone(),
239                    has_stateful,
240                    subtasks,
241                    local_barrier_manager,
242                )
243                .await?,
244            );
245        }
246
247        // Use the first MergeNode to fill in the info of the new node.
248        let first_merge = merge_projects.first().unwrap().0;
249        let executor_id = Self::get_executor_id(actor_context, first_merge);
250        let mut info = Self::get_executor_info(first_merge, executor_id);
251        info.identity = format!("UpstreamSinkUnion {:X}", executor_id);
252        let eval_error_report = ActorEvalErrorReport {
253            actor_context: actor_context.clone(),
254            identity: info.identity.clone().into(),
255        };
256
257        let upstream_infos = merge_projects
258            .into_iter()
259            .map(|(merge_node, project_node)| {
260                let upstream_fragment_id = merge_node
261                    .get_node_body()
262                    .unwrap()
263                    .as_merge()
264                    .unwrap()
265                    .upstream_fragment_id;
266                let merge_schema: Schema =
267                    merge_node.get_fields().iter().map(Field::from).collect();
268                let project_exprs = project_node
269                    .get_node_body()
270                    .unwrap()
271                    .as_project()
272                    .unwrap()
273                    .get_select_list()
274                    .iter()
275                    .map(|e| build_non_strict_from_prost(e, eval_error_report.clone()))
276                    .try_collect()
277                    .unwrap();
278                (upstream_fragment_id, merge_schema, project_exprs)
279            })
280            .collect();
281
282        let upstream_sink_union_executor = UpstreamSinkUnionExecutor::new(
283            actor_context.clone(),
284            local_barrier_manager.clone(),
285            self.streaming_metrics.clone(),
286            env.config().developer.chunk_size,
287            upstream_infos,
288        );
289        let executor = (info, upstream_sink_union_executor).into();
290        input.push(executor);
291
292        self.generate_executor_from_inputs(
293            fragment_id,
294            union_node,
295            env,
296            store,
297            actor_context,
298            vnode_bitmap,
299            has_stateful,
300            subtasks,
301            local_barrier_manager,
302            input,
303        )
304        .await
305    }
306
307    fn as_sink_into_table_union(node: &StreamNode) -> Option<SinkIntoTableUnion<'_>> {
308        let NodeBody::Union(_) = node.get_node_body().unwrap() else {
309            return None;
310        };
311
312        let mut merge_projects = Vec::new();
313        let mut remaining_nodes = Vec::new();
314
315        let mut rev_iter = node.get_input().iter().rev();
316        for union_input in rev_iter.by_ref() {
317            let mut is_sink_into = false;
318            if let NodeBody::Project(project) = union_input.get_node_body().unwrap() {
319                let project_input = union_input.get_input().first().unwrap();
320                // Check project conditions
321                let project_check = project.get_watermark_input_cols().is_empty()
322                    && project.get_watermark_output_cols().is_empty()
323                    && project.get_nondecreasing_exprs().is_empty()
324                    && !project.noop_update_hint;
325                if project_check
326                    && let NodeBody::Merge(merge) = project_input.get_node_body().unwrap()
327                {
328                    let merge_check = merge.upstream_dispatcher_type()
329                        == risingwave_pb::stream_plan::DispatcherType::Hash;
330                    if merge_check {
331                        is_sink_into = true;
332                        tracing::debug!(
333                            "replace sink into table union, merge: {:?}, project: {:?}",
334                            merge,
335                            project
336                        );
337                        merge_projects.push((project_input, union_input));
338                    }
339                }
340            }
341            if !is_sink_into {
342                remaining_nodes.push(union_input);
343                break;
344            }
345        }
346
347        if merge_projects.is_empty() {
348            return None;
349        }
350
351        remaining_nodes.extend(rev_iter);
352
353        merge_projects.reverse();
354        remaining_nodes.reverse();
355
356        // complete StreamNode structure is needed here, to provide some necessary fields.
357        Some(SinkIntoTableUnion {
358            prefix_nodes: remaining_nodes,
359            merge_projects,
360            union_node: node,
361        })
362    }
363
364    /// Create a chain(tree) of nodes, with given `store`.
365    #[expect(clippy::too_many_arguments)]
366    #[async_recursion]
367    async fn create_nodes_inner(
368        &self,
369        fragment_id: FragmentId,
370        node: &stream_plan::StreamNode,
371        env: StreamEnvironment,
372        store: impl StateStore,
373        actor_context: &ActorContextRef,
374        vnode_bitmap: Option<Bitmap>,
375        has_stateful: bool,
376        subtasks: &mut Vec<SubtaskHandle>,
377        local_barrier_manager: &LocalBarrierManager,
378    ) -> StreamResult<Executor> {
379        if let NodeBody::StreamScan(stream_scan) = node.get_node_body().unwrap()
380            && let Ok(StreamScanType::SnapshotBackfill) = stream_scan.get_stream_scan_type()
381        {
382            return dispatch_state_store!(env.state_store(), store, {
383                self.create_snapshot_backfill_node(
384                    node,
385                    stream_scan,
386                    actor_context,
387                    vnode_bitmap,
388                    env,
389                    local_barrier_manager,
390                    store,
391                )
392                .await
393            });
394        }
395
396        if let Some(SinkIntoTableUnion {
397            prefix_nodes: remaining_nodes,
398            merge_projects,
399            union_node,
400        }) = Self::as_sink_into_table_union(node)
401        {
402            return self
403                .create_sink_into_table_union(
404                    fragment_id,
405                    union_node,
406                    env,
407                    store,
408                    actor_context,
409                    vnode_bitmap,
410                    has_stateful,
411                    subtasks,
412                    local_barrier_manager,
413                    remaining_nodes,
414                    merge_projects,
415                )
416                .await;
417        }
418
419        // The "stateful" here means that the executor may issue read operations to the state store
420        // massively and continuously. Used to decide whether to apply the optimization of subtasks.
421        fn is_stateful_executor(stream_node: &StreamNode) -> bool {
422            matches!(
423                stream_node.get_node_body().unwrap(),
424                NodeBody::HashAgg(_)
425                    | NodeBody::HashJoin(_)
426                    | NodeBody::DeltaIndexJoin(_)
427                    | NodeBody::Lookup(_)
428                    | NodeBody::StreamScan(_)
429                    | NodeBody::StreamCdcScan(_)
430                    | NodeBody::DynamicFilter(_)
431                    | NodeBody::GroupTopN(_)
432                    | NodeBody::Now(_)
433            )
434        }
435        let is_stateful = is_stateful_executor(node);
436
437        // Create the input executor before creating itself
438        let mut input = Vec::with_capacity(node.input.iter().len());
439        for input_stream_node in &node.input {
440            input.push(
441                self.create_nodes_inner(
442                    fragment_id,
443                    input_stream_node,
444                    env.clone(),
445                    store.clone(),
446                    actor_context,
447                    vnode_bitmap.clone(),
448                    has_stateful || is_stateful,
449                    subtasks,
450                    local_barrier_manager,
451                )
452                .await?,
453            );
454        }
455
456        self.generate_executor_from_inputs(
457            fragment_id,
458            node,
459            env,
460            store,
461            actor_context,
462            vnode_bitmap,
463            has_stateful || is_stateful,
464            subtasks,
465            local_barrier_manager,
466            input,
467        )
468        .await
469    }
470
471    #[expect(clippy::too_many_arguments)]
472    async fn generate_executor_from_inputs(
473        &self,
474        fragment_id: FragmentId,
475        node: &stream_plan::StreamNode,
476        env: StreamEnvironment,
477        store: impl StateStore,
478        actor_context: &ActorContextRef,
479        vnode_bitmap: Option<Bitmap>,
480        has_stateful: bool,
481        subtasks: &mut Vec<SubtaskHandle>,
482        local_barrier_manager: &LocalBarrierManager,
483        input: Vec<Executor>,
484    ) -> StreamResult<Executor> {
485        let op_info = node.get_identity().clone();
486
487        // We assume that the operator_id of different instances from the same RelNode will be the
488        // same.
489        let executor_id = Self::get_executor_id(actor_context, node);
490        let operator_id = unique_operator_id(fragment_id, node.operator_id);
491
492        let info = Self::get_executor_info(node, executor_id);
493
494        let eval_error_report = ActorEvalErrorReport {
495            actor_context: actor_context.clone(),
496            identity: info.identity.clone().into(),
497        };
498
499        // Build the executor with params.
500        let executor_params = ExecutorParams {
501            env: env.clone(),
502
503            info: info.clone(),
504            executor_id,
505            operator_id,
506            op_info,
507            input,
508            fragment_id,
509            executor_stats: self.streaming_metrics.clone(),
510            actor_context: actor_context.clone(),
511            vnode_bitmap,
512            eval_error_report,
513            watermark_epoch: self.watermark_epoch.clone(),
514            local_barrier_manager: local_barrier_manager.clone(),
515        };
516
517        let executor = create_executor(executor_params, node, store).await?;
518
519        // Wrap the executor for debug purpose.
520        let wrapped = WrapperExecutor::new(
521            executor,
522            actor_context.clone(),
523            env.config().developer.enable_executor_row_count,
524            env.config().developer.enable_explain_analyze_stats,
525        );
526        let executor = (info, wrapped).into();
527
528        // If there're multiple stateful executors in this actor, we will wrap it into a subtask.
529        let executor = if has_stateful {
530            // TODO(bugen): subtask does not work with tracing spans.
531            // let (subtask, executor) = subtask::wrap(executor, actor_context.id);
532            // subtasks.push(subtask);
533            // executor.boxed()
534
535            let _ = subtasks;
536            executor
537        } else {
538            executor
539        };
540
541        Ok(executor)
542    }
543
544    /// Create a chain(tree) of nodes and return the head executor.
545    async fn create_nodes(
546        &self,
547        fragment_id: FragmentId,
548        node: &stream_plan::StreamNode,
549        env: StreamEnvironment,
550        actor_context: &ActorContextRef,
551        vnode_bitmap: Option<Bitmap>,
552        local_barrier_manager: &LocalBarrierManager,
553    ) -> StreamResult<(Executor, Vec<SubtaskHandle>)> {
554        let mut subtasks = vec![];
555
556        let executor = dispatch_state_store!(env.state_store(), store, {
557            self.create_nodes_inner(
558                fragment_id,
559                node,
560                env,
561                store,
562                actor_context,
563                vnode_bitmap,
564                false,
565                &mut subtasks,
566                local_barrier_manager,
567            )
568            .await
569        })?;
570
571        Ok((executor, subtasks))
572    }
573
574    async fn create_actor(
575        self: Arc<Self>,
576        actor: BuildActorInfo,
577        fragment_id: FragmentId,
578        node: Arc<StreamNode>,
579        related_subscriptions: Arc<HashMap<TableId, HashSet<u32>>>,
580        local_barrier_manager: LocalBarrierManager,
581        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
582    ) -> StreamResult<Actor<DispatchExecutor>> {
583        {
584            let actor_id = actor.actor_id;
585            let streaming_config = self.env.config().clone();
586            let actor_context = ActorContext::create(
587                &actor,
588                fragment_id,
589                self.env.total_mem_usage(),
590                self.streaming_metrics.clone(),
591                related_subscriptions,
592                self.env.meta_client().clone(),
593                streaming_config,
594            );
595            let vnode_bitmap = actor.vnode_bitmap.as_ref().map(|b| b.into());
596            let expr_context = actor.expr_context.clone().unwrap();
597
598            let (executor, subtasks) = self
599                .create_nodes(
600                    fragment_id,
601                    &node,
602                    self.env.clone(),
603                    &actor_context,
604                    vnode_bitmap,
605                    &local_barrier_manager,
606                )
607                .await?;
608
609            let dispatcher = DispatchExecutor::new(
610                executor,
611                new_output_request_rx,
612                actor.dispatchers,
613                actor_id,
614                fragment_id,
615                local_barrier_manager.clone(),
616                self.streaming_metrics.clone(),
617            )
618            .await?;
619            let actor = Actor::new(
620                dispatcher,
621                subtasks,
622                self.streaming_metrics.clone(),
623                actor_context.clone(),
624                expr_context,
625                local_barrier_manager,
626            );
627            Ok(actor)
628        }
629    }
630
631    pub(super) fn spawn_actor(
632        self: &Arc<Self>,
633        actor: BuildActorInfo,
634        fragment_id: FragmentId,
635        node: Arc<StreamNode>,
636        related_subscriptions: Arc<HashMap<TableId, HashSet<u32>>>,
637        local_barrier_manager: LocalBarrierManager,
638        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
639    ) -> (JoinHandle<()>, Option<JoinHandle<()>>) {
640        {
641            let monitor = tokio_metrics::TaskMonitor::new();
642            let stream_actor_ref = &actor;
643            let actor_id = stream_actor_ref.actor_id;
644            let handle = {
645                let trace_span =
646                    format!("Actor {actor_id}: `{}`", stream_actor_ref.mview_definition);
647                let barrier_manager = local_barrier_manager.clone();
648                // wrap the future of `create_actor` with `boxed` to avoid stack overflow
649                let actor = self
650                    .clone()
651                    .create_actor(
652                        actor,
653                        fragment_id,
654                        node,
655                        related_subscriptions,
656                        barrier_manager.clone(),
657                        new_output_request_rx
658                    ).boxed().and_then(|actor| actor.run()).map(move |result| {
659                    if let Err(err) = result {
660                        // TODO: check error type and panic if it's unexpected.
661                        // Intentionally use `?` on the report to also include the backtrace.
662                        tracing::error!(actor_id, error = ?err.as_report(), "actor exit with error");
663                        barrier_manager.notify_failure(actor_id, err);
664                    }
665                });
666                let traced = match &self.await_tree_reg {
667                    Some(m) => m
668                        .register(await_tree_key::Actor(actor_id), trace_span)
669                        .instrument(actor)
670                        .left_future(),
671                    None => actor.right_future(),
672                };
673                let instrumented = monitor.instrument(traced);
674                let with_config = crate::CONFIG.scope(self.env.config().clone(), instrumented);
675                // If hummock tracing is not enabled, it directly returns wrapped future.
676                let may_track_hummock = with_config.may_trace_hummock();
677
678                self.runtime.spawn(may_track_hummock)
679            };
680
681            let monitor_handle = if self.streaming_metrics.level >= MetricLevel::Debug
682                || self.env.config().developer.enable_actor_tokio_metrics
683            {
684                tracing::info!("Tokio metrics are enabled.");
685                let streaming_metrics = self.streaming_metrics.clone();
686                let actor_monitor_task = self.runtime.spawn(async move {
687                    let metrics = streaming_metrics.new_actor_metrics(actor_id);
688                    loop {
689                        let task_metrics = monitor.cumulative();
690                        metrics
691                            .actor_execution_time
692                            .set(task_metrics.total_poll_duration.as_secs_f64());
693                        metrics
694                            .actor_fast_poll_duration
695                            .set(task_metrics.total_fast_poll_duration.as_secs_f64());
696                        metrics
697                            .actor_fast_poll_cnt
698                            .set(task_metrics.total_fast_poll_count as i64);
699                        metrics
700                            .actor_slow_poll_duration
701                            .set(task_metrics.total_slow_poll_duration.as_secs_f64());
702                        metrics
703                            .actor_slow_poll_cnt
704                            .set(task_metrics.total_slow_poll_count as i64);
705                        metrics
706                            .actor_poll_duration
707                            .set(task_metrics.total_poll_duration.as_secs_f64());
708                        metrics
709                            .actor_poll_cnt
710                            .set(task_metrics.total_poll_count as i64);
711                        metrics
712                            .actor_idle_duration
713                            .set(task_metrics.total_idle_duration.as_secs_f64());
714                        metrics
715                            .actor_idle_cnt
716                            .set(task_metrics.total_idled_count as i64);
717                        metrics
718                            .actor_scheduled_duration
719                            .set(task_metrics.total_scheduled_duration.as_secs_f64());
720                        metrics
721                            .actor_scheduled_cnt
722                            .set(task_metrics.total_scheduled_count as i64);
723                        tokio::time::sleep(Duration::from_secs(1)).await;
724                    }
725                });
726                Some(actor_monitor_task)
727            } else {
728                None
729            };
730            (handle, monitor_handle)
731        }
732    }
733}
734
735/// Parameters to construct executors.
736/// - [`crate::from_proto::create_executor`]
737/// - [`StreamActorManager::create_nodes`]
738pub struct ExecutorParams {
739    pub env: StreamEnvironment,
740
741    /// Basic information about the executor.
742    pub info: ExecutorInfo,
743
744    /// Executor id, unique across all actors.
745    pub executor_id: u64,
746
747    /// Operator id, unique for each operator in fragment.
748    pub operator_id: u64,
749
750    /// Information of the operator from plan node, like `StreamHashJoin { .. }`.
751    // TODO: use it for `identity`
752    pub op_info: String,
753
754    /// The input executor.
755    pub input: Vec<Executor>,
756
757    /// `FragmentId` of the actor
758    pub fragment_id: FragmentId,
759
760    /// Metrics
761    pub executor_stats: Arc<StreamingMetrics>,
762
763    /// Actor context
764    pub actor_context: ActorContextRef,
765
766    /// Vnodes owned by this executor. Represented in bitmap.
767    pub vnode_bitmap: Option<Bitmap>,
768
769    /// Used for reporting expression evaluation errors.
770    pub eval_error_report: ActorEvalErrorReport,
771
772    /// `watermark_epoch` field in `MemoryManager`
773    pub watermark_epoch: AtomicU64Ref,
774
775    pub local_barrier_manager: LocalBarrierManager,
776}
777
778impl Debug for ExecutorParams {
779    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
780        f.debug_struct("ExecutorParams")
781            .field("info", &self.info)
782            .field("executor_id", &self.executor_id)
783            .field("operator_id", &self.operator_id)
784            .field("op_info", &self.op_info)
785            .field("input", &self.input.len())
786            .field("actor_id", &self.actor_context.id)
787            .finish_non_exhaustive()
788    }
789}