risingwave_stream/task/
actor_manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::time::Duration;
16use std::fmt::Debug;
17use std::sync::Arc;
18
19use async_recursion::async_recursion;
20use futures::{FutureExt, TryFutureExt};
21use itertools::Itertools;
22use risingwave_common::bail;
23use risingwave_common::bitmap::Bitmap;
24use risingwave_common::catalog::{ColumnId, Field, Schema};
25use risingwave_common::config::{MetricLevel, StreamingConfig, merge_streaming_config_section};
26use risingwave_common::operator::{unique_executor_id, unique_operator_id};
27use risingwave_common::util::runtime::BackgroundShutdownRuntime;
28use risingwave_pb::id::{ExecutorId, GlobalOperatorId};
29use risingwave_pb::stream_plan::stream_node::NodeBody;
30use risingwave_pb::stream_plan::{
31    self, StreamNode, StreamScanNode, StreamScanType, SyncLogStoreNode,
32};
33use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo;
34use risingwave_storage::monitor::HummockTraceFutureExt;
35use risingwave_storage::table::batch_table::BatchTable;
36use risingwave_storage::{StateStore, dispatch_state_store};
37use thiserror_ext::AsReport;
38use tokio::sync::mpsc::UnboundedReceiver;
39use tokio::task::JoinHandle;
40
41use crate::common::table::state_table::StateTableBuilder;
42use crate::error::StreamResult;
43use crate::executor::monitor::StreamingMetrics;
44use crate::executor::subtask::SubtaskHandle;
45use crate::executor::{
46    Actor, ActorContext, ActorContextRef, DispatchExecutor, Execute, Executor, ExecutorInfo,
47    SnapshotBackfillExecutor, SyncLogStoreDispatchExecutor, TroublemakerExecutor, WrapperExecutor,
48};
49use crate::from_proto::{MergeExecutorBuilder, create_executor};
50use crate::task::{
51    ActorEvalErrorReport, ActorId, AtomicU64Ref, FragmentId, LocalBarrierManager, NewOutputRequest,
52    StreamEnvironment, await_tree_key,
53};
54
/// Default capacity for `ConfigOverrideCache`.
/// Since we only support per-job config override right now, 256 jobs should be sufficient on a single node.
pub const CONFIG_OVERRIDE_CACHE_DEFAULT_CAPACITY: u64 = 256;
/// Cache keyed by the `config_override` string, holding the shared [`StreamingConfig`] resolved
/// for that override.
pub type ConfigOverrideCache = moka::sync::Cache<String, Arc<StreamingConfig>>;
59
/// [Spawning actors](`Self::spawn_actor`), called by [`crate::task::barrier_worker::managed_state::PartialGraphState`].
///
/// See [`crate::task`] for architecture overview.
pub(crate) struct StreamActorManager {
    /// Streaming environment. It is cloned into every actor context and executor built by this
    /// manager, and provides the state store and the global streaming config.
    pub(super) env: StreamEnvironment,
    /// Streaming metrics, shared with the actors and executors created by this manager.
    pub(super) streaming_metrics: Arc<StreamingMetrics>,

    /// Watermark epoch number.
    pub(super) watermark_epoch: AtomicU64Ref,

    /// Manages the await-trees of all actors.
    ///
    /// When `None`, actor futures are spawned without await-tree instrumentation.
    pub(super) await_tree_reg: Option<await_tree::Registry>,

    /// Runtime for the streaming actors.
    pub(super) runtime: BackgroundShutdownRuntime,

    /// Cache for overridden configuration: `config_override` -> `StreamingConfig`.
    ///
    /// Since the override is based on `env.global_config`, which won't change after creation,
    /// we can use the `config_override` as the only key.
    pub(super) config_override_cache: ConfigOverrideCache,
}
82
83impl StreamActorManager {
84    fn get_executor_id(actor_context: &ActorContext, node: &StreamNode) -> ExecutorId {
85        // We assume that the operator_id of different instances from the same RelNode will be the
86        // same.
87        unique_executor_id(actor_context.id, node.operator_id)
88    }
89
90    fn get_executor_info(node: &StreamNode, executor_id: ExecutorId) -> ExecutorInfo {
91        let schema: Schema = node.fields.iter().map(Field::from).collect();
92
93        let stream_key = node
94            .get_stream_key()
95            .iter()
96            .map(|idx| *idx as usize)
97            .collect::<Vec<_>>();
98
99        let stream_kind = node.stream_kind();
100
101        let identity = format!("{} {:X}", node.get_node_body().unwrap(), executor_id);
102
103        ExecutorInfo {
104            schema,
105            stream_key,
106            stream_kind,
107            identity,
108            id: executor_id,
109        }
110    }
111
112    async fn create_snapshot_backfill_node(
113        &self,
114        stream_node: &StreamNode,
115        node: &StreamScanNode,
116        actor_context: &ActorContextRef,
117        vnode_bitmap: Option<Bitmap>,
118        local_barrier_manager: &LocalBarrierManager,
119        state_store: impl StateStore,
120    ) -> StreamResult<Executor> {
121        let [upstream_node, _]: &[_; 2] = stream_node.input.as_slice().try_into().unwrap();
122        let chunk_size = actor_context.config.developer.chunk_size;
123
124        let upstream_info = Self::get_executor_info(
125            upstream_node,
126            Self::get_executor_id(actor_context, upstream_node),
127        );
128
129        let NodeBody::Merge(upstream_merge) = upstream_node.get_node_body()? else {
130            bail!("expect Merge as input of SnapshotBackfill");
131        };
132
133        let upstream = MergeExecutorBuilder::new_input(
134            local_barrier_manager.clone(),
135            self.streaming_metrics.clone(),
136            actor_context.clone(),
137            upstream_info,
138            upstream_merge,
139            chunk_size,
140        )
141        .await?;
142
143        let table_desc = node.get_table_desc()?;
144
145        let output_indices = node
146            .output_indices
147            .iter()
148            .map(|&i| i as usize)
149            .collect_vec();
150        let info = Self::get_executor_info(
151            stream_node,
152            Self::get_executor_id(actor_context, stream_node),
153        );
154        let stream_key = info.stream_key.clone();
155
156        let column_ids = node
157            .upstream_column_ids
158            .iter()
159            .map(ColumnId::from)
160            .collect_vec();
161
162        let progress = local_barrier_manager.register_create_mview_progress(actor_context);
163
164        let vnodes = vnode_bitmap.map(Arc::new);
165        let barrier_rx = local_barrier_manager.subscribe_barrier(actor_context.id);
166
167        let upstream_table =
168            BatchTable::new_partial(state_store.clone(), column_ids, vnodes.clone(), table_desc);
169
170        let state_table = node.get_state_table()?;
171        let state_table = StateTableBuilder::new(state_table, state_store.clone(), vnodes)
172            .enable_preload_all_rows_by_config(&actor_context.config)
173            .build()
174            .await;
175
176        let executor = SnapshotBackfillExecutor::new(
177            upstream_table,
178            state_table,
179            upstream,
180            node.pk_scan_range.as_ref(),
181            output_indices,
182            stream_key,
183            actor_context.clone(),
184            progress,
185            chunk_size,
186            node.rate_limit.into(),
187            barrier_rx,
188            self.streaming_metrics.clone(),
189            node.snapshot_backfill_epoch,
190        )?
191        .boxed();
192
193        if crate::consistency::insane() {
194            let mut troubled_info = info.clone();
195            troubled_info.identity = format!("{} (troubled)", info.identity);
196            Ok((
197                info,
198                TroublemakerExecutor::new((troubled_info, executor).into(), chunk_size),
199            )
200                .into())
201        } else {
202            Ok((info, executor).into())
203        }
204    }
205
206    /// Create a chain(tree) of nodes, with given `store`.
207    #[expect(clippy::too_many_arguments)]
208    #[async_recursion]
209    async fn create_nodes_inner(
210        &self,
211        fragment_id: FragmentId,
212        node: &stream_plan::StreamNode,
213        env: StreamEnvironment,
214        store: impl StateStore,
215        actor_context: &ActorContextRef,
216        vnode_bitmap: Option<Bitmap>,
217        has_stateful: bool,
218        subtasks: &mut Vec<SubtaskHandle>,
219        local_barrier_manager: &LocalBarrierManager,
220    ) -> StreamResult<Executor> {
221        // The "stateful" here means that the executor may issue read operations to the state store
222        // massively and continuously. Used to decide whether to apply the optimization of subtasks.
223        fn is_stateful_executor(stream_node: &StreamNode) -> bool {
224            matches!(
225                stream_node.get_node_body().unwrap(),
226                NodeBody::HashAgg(_)
227                    | NodeBody::HashJoin(_)
228                    | NodeBody::DeltaIndexJoin(_)
229                    | NodeBody::Lookup(_)
230                    | NodeBody::StreamScan(_)
231                    | NodeBody::StreamCdcScan(_)
232                    | NodeBody::DynamicFilter(_)
233                    | NodeBody::GroupTopN(_)
234                    | NodeBody::Now(_)
235            )
236        }
237        let is_stateful = is_stateful_executor(node);
238
239        let executor = if let NodeBody::StreamScan(stream_scan) = node.get_node_body().unwrap()
240            && let Ok(StreamScanType::SnapshotBackfill) = stream_scan.get_stream_scan_type()
241        {
242            dispatch_state_store!(env.state_store(), store, {
243                self.create_snapshot_backfill_node(
244                    node,
245                    stream_scan,
246                    actor_context,
247                    vnode_bitmap,
248                    local_barrier_manager,
249                    store,
250                )
251                .await
252            })?
253        } else {
254            // Create the input executor before creating itself
255            let mut input = Vec::with_capacity(node.input.iter().len());
256            for input_stream_node in &node.input {
257                input.push(
258                    self.create_nodes_inner(
259                        fragment_id,
260                        input_stream_node,
261                        env.clone(),
262                        store.clone(),
263                        actor_context,
264                        vnode_bitmap.clone(),
265                        has_stateful || is_stateful,
266                        subtasks,
267                        local_barrier_manager,
268                    )
269                    .await?,
270                );
271            }
272
273            self.generate_executor_from_inputs(
274                fragment_id,
275                node,
276                env,
277                store,
278                actor_context,
279                vnode_bitmap,
280                local_barrier_manager,
281                input,
282            )
283            .await?
284        };
285        Ok(Self::wrap_executor(
286            executor,
287            actor_context,
288            has_stateful || is_stateful,
289            subtasks,
290        ))
291    }
292
293    #[expect(clippy::too_many_arguments)]
294    async fn generate_executor_from_inputs(
295        &self,
296        fragment_id: FragmentId,
297        node: &stream_plan::StreamNode,
298        env: StreamEnvironment,
299        store: impl StateStore,
300        actor_context: &ActorContextRef,
301        vnode_bitmap: Option<Bitmap>,
302        local_barrier_manager: &LocalBarrierManager,
303        input: Vec<Executor>,
304    ) -> StreamResult<Executor> {
305        let op_info = node.get_identity().clone();
306
307        // We assume that the operator_id of different instances from the same RelNode will be the
308        // same.
309        let executor_id = Self::get_executor_id(actor_context, node);
310        let operator_id = unique_operator_id(fragment_id, node.operator_id);
311
312        let info = Self::get_executor_info(node, executor_id);
313
314        let eval_error_report = ActorEvalErrorReport {
315            actor_context: actor_context.clone(),
316            identity: info.identity.clone().into(),
317        };
318
319        // Build the executor with params.
320        let executor_params = ExecutorParams {
321            env: env.clone(),
322
323            info: info.clone(),
324            executor_id,
325            operator_id,
326            op_info,
327            input,
328            fragment_id,
329            executor_stats: self.streaming_metrics.clone(),
330            actor_context: actor_context.clone(),
331            vnode_bitmap,
332            eval_error_report,
333            watermark_epoch: self.watermark_epoch.clone(),
334            local_barrier_manager: local_barrier_manager.clone(),
335            config: actor_context.config.clone(),
336        };
337
338        create_executor(executor_params, node, store).await
339    }
340
341    fn wrap_executor(
342        executor: Executor,
343        actor_context: &ActorContextRef,
344        has_stateful: bool,
345        subtasks: &mut Vec<SubtaskHandle>,
346    ) -> Executor {
347        let info = executor.info().clone();
348        // Wrap the executor for debug purpose.
349        let wrapped = WrapperExecutor::new(executor, actor_context.clone());
350        let executor = (info, wrapped).into();
351
352        // If there're multiple stateful executors in this actor, we will wrap it into a subtask.
353        if has_stateful {
354            // TODO(bugen): subtask does not work with tracing spans.
355            // let (subtask, executor) = subtask::wrap(executor, actor_context.id);
356            // subtasks.push(subtask);
357            // executor.boxed()
358
359            let _ = subtasks;
360        }
361        executor
362    }
363
364    /// Create a chain(tree) of nodes and return the head executor.
365    async fn create_nodes(
366        &self,
367        fragment_id: FragmentId,
368        node: &stream_plan::StreamNode,
369        env: StreamEnvironment,
370        actor_context: &ActorContextRef,
371        vnode_bitmap: Option<Bitmap>,
372        local_barrier_manager: &LocalBarrierManager,
373    ) -> StreamResult<(Executor, Vec<SubtaskHandle>)> {
374        let mut subtasks = vec![];
375
376        let executor = dispatch_state_store!(env.state_store(), store, {
377            self.create_nodes_inner(
378                fragment_id,
379                node,
380                env,
381                store,
382                actor_context,
383                vnode_bitmap,
384                false,
385                &mut subtasks,
386                local_barrier_manager,
387            )
388            .await
389        })?;
390
391        Ok((executor, subtasks))
392    }
393
394    /// Get the overridden configuration for the given `config_override`.
395    fn get_overridden_config(
396        &self,
397        config_override: &str,
398        actor_id: ActorId,
399    ) -> Arc<StreamingConfig> {
400        self.config_override_cache
401            .get_with_by_ref(config_override, || {
402                let global = self.env.global_config();
403                match merge_streaming_config_section(global.as_ref(), config_override) {
404                    Ok(Some(config)) => {
405                        tracing::info!(%actor_id, "applied configuration override");
406                        Arc::new(config)
407                    }
408                    Ok(None) => global.clone(), // nothing to override
409                    Err(e) => {
410                        // We should have validated the config override when user specified it for the job.
411                        // However, we still tolerate invalid config override here in case there's
412                        // any compatibility issue.
413                        tracing::error!(
414                            error = %e.as_report(),
415                            %actor_id,
416                            "failed to apply configuration override, use global config instead",
417                        );
418                        global.clone()
419                    }
420                }
421            })
422    }
423
424    async fn create_actor(
425        self: Arc<Self>,
426        actor: BuildActorInfo,
427        fragment_id: FragmentId,
428        node: Arc<StreamNode>,
429        local_barrier_manager: LocalBarrierManager,
430        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
431        actor_config: Arc<StreamingConfig>,
432    ) -> StreamResult<Actor<DispatchExecutor>> {
433        let actor_context = ActorContext::create(
434            &actor,
435            fragment_id,
436            self.env.total_mem_usage(),
437            self.streaming_metrics.clone(),
438            self.env.meta_client(),
439            actor_config,
440            self.env.clone(),
441        );
442        let vnode_bitmap = actor.vnode_bitmap.as_ref().map(|b| b.into());
443        let expr_context = actor.expr_context.clone().unwrap();
444
445        let (executor, subtasks) = self
446            .create_nodes(
447                fragment_id,
448                &node,
449                self.env.clone(),
450                &actor_context,
451                vnode_bitmap,
452                &local_barrier_manager,
453            )
454            .await?;
455
456        let dispatcher = DispatchExecutor::new(
457            executor,
458            new_output_request_rx,
459            actor.dispatchers,
460            &actor_context,
461        )
462        .await?;
463
464        let actor = Actor::new(
465            dispatcher,
466            subtasks,
467            self.streaming_metrics.clone(),
468            actor_context.clone(),
469            expr_context,
470            local_barrier_manager,
471        );
472        Ok(actor)
473    }
474
475    #[expect(clippy::too_many_arguments)]
476    async fn create_actor_with_log_store_dispatcher<S: StateStore>(
477        self: Arc<Self>,
478        actor: BuildActorInfo,
479        fragment_id: FragmentId,
480        node: Arc<StreamNode>,
481        local_barrier_manager: LocalBarrierManager,
482        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
483        actor_config: Arc<StreamingConfig>,
484        sync: Box<SyncLogStoreNode>,
485        state_store: S,
486    ) -> StreamResult<Actor<SyncLogStoreDispatchExecutor<S>>> {
487        let actor_context = ActorContext::create(
488            &actor,
489            fragment_id,
490            self.env.total_mem_usage(),
491            self.streaming_metrics.clone(),
492            self.env.meta_client(),
493            actor_config,
494            self.env.clone(),
495        );
496        let vnode_bitmap = actor.vnode_bitmap.as_ref().map(|b| b.into());
497        let expr_context = actor.expr_context.clone().unwrap();
498
499        let [input] = node.input.as_slice() else {
500            bail!("SyncLogStoreNode should have exactly one input");
501        };
502
503        let (executor, subtasks) = self
504            .create_nodes(
505                fragment_id,
506                input,
507                self.env.clone(),
508                &actor_context,
509                vnode_bitmap.clone(),
510                &local_barrier_manager,
511            )
512            .await?;
513
514        let dispatcher = SyncLogStoreDispatchExecutor::new(
515            executor,
516            new_output_request_rx,
517            actor.dispatchers,
518            &actor_context,
519            sync.as_ref(),
520            vnode_bitmap,
521            state_store,
522        )
523        .await?;
524
525        let actor = Actor::new(
526            dispatcher,
527            subtasks,
528            self.streaming_metrics.clone(),
529            actor_context.clone(),
530            expr_context,
531            local_barrier_manager,
532        );
533        Ok(actor)
534    }
535
    /// Builds the future that creates and runs an actor, and spawns it on `self.runtime`.
    ///
    /// The actor's configuration is resolved from `actor.config_override` through
    /// [`Self::get_overridden_config`]. If the root node is a `SyncLogStore` and
    /// `developer.disable_sync_log_store_dispatcher` is not set, the actor is created with
    /// [`Self::create_actor_with_log_store_dispatcher`]; otherwise with [`Self::create_actor`].
    /// An error from creating or running the actor is logged and reported through
    /// `notify_failure` on the barrier manager.
    ///
    /// Returns the join handle of the actor task and, when `developer.enable_actor_tokio_metrics`
    /// is set, the join handle of a second task that periodically exports the actor's tokio task
    /// metrics.
    ///
    /// # Panics
    ///
    /// Panics if `node.get_node_body()` returns an error.
    pub(super) fn spawn_actor(
        self: &Arc<Self>,
        actor: BuildActorInfo,
        fragment_id: FragmentId,
        node: Arc<StreamNode>,
        local_barrier_manager: LocalBarrierManager,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    ) -> (JoinHandle<()>, Option<JoinHandle<()>>) {
        let stream_actor_ref = &actor;
        let actor_id = stream_actor_ref.actor_id;
        let actor_config = self.get_overridden_config(&actor.config_override, actor_id);

        // A task monitor is only created when tokio metrics are enabled for this actor.
        let monitor = actor_config
            .developer
            .enable_actor_tokio_metrics
            .then(tokio_metrics::TaskMonitor::new);

        let handle = {
            let trace_span = format!("Actor {actor_id}: `{}`", stream_actor_ref.mview_definition);
            let barrier_manager = local_barrier_manager;
            let node_body = node.get_node_body().unwrap().clone();
            let use_sync_log_store_dispatcher =
                !actor_config.developer.disable_sync_log_store_dispatcher;
            // wrap the future of `create_actor` with `boxed` to avoid stack overflow
            let actor = match node_body {
                NodeBody::SyncLogStore(sync) if use_sync_log_store_dispatcher => {
                    tracing::info!(
                        "SyncLogStoreDispatchExecutor is created for fragment {}",
                        fragment_id
                    );
                    dispatch_state_store!(self.env.state_store(), store, {
                        self
                        .clone()
                        .create_actor_with_log_store_dispatcher(
                            actor,
                            fragment_id,
                            node,
                            barrier_manager.clone(),
                            new_output_request_rx,
                            actor_config,
                            sync,
                            store,
                        )
                        .and_then(|actor| actor.run())
                        .map(move |result| {
                            if let Err(err) = result {
                                tracing::error!(%actor_id, error = ?err.as_report(), "actor exit with error");
                                barrier_manager.notify_failure(actor_id, err);
                            }
                        })
                        .boxed()
                    })
                }
                _ => {
                    self
                    .clone()
                    .create_actor(
                        actor,
                        fragment_id,
                        node,
                        barrier_manager.clone(),
                        new_output_request_rx,
                        actor_config,
                    )
                    .and_then(|actor| actor.run())
                    .map(move |result| {
                        if let Err(err) = result {
                        // TODO: check error type and panic if it's unexpected.
                        // Intentionally use `?` on the report to also include the backtrace.
                        tracing::error!(%actor_id, error = ?err.as_report(), "actor exit with error");
                        barrier_manager.notify_failure(actor_id, err);
                        }
                    })
                    .boxed()
                }
            };
            // Instrument with the await-tree registry if one is configured.
            let traced = match &self.await_tree_reg {
                Some(m) => m
                    .register(await_tree_key::Actor(actor_id), trace_span)
                    .instrument(actor)
                    .left_future(),
                None => actor.right_future(),
            };
            // Instrument with the tokio task monitor if tokio metrics are enabled.
            let instrumented = match &monitor {
                Some(m) => m.instrument(traced).left_future(),
                None => traced.right_future(),
            };
            // If hummock tracing is not enabled, it directly returns wrapped future.
            let may_track_hummock = instrumented.may_trace_hummock();

            self.runtime.spawn(may_track_hummock)
        };

        // Count metrics are only exported when the metric level is at least `Debug`.
        let enable_count_metrics = self.streaming_metrics.level >= MetricLevel::Debug;
        let monitor_handle = if let Some(monitor) = monitor {
            let streaming_metrics = self.streaming_metrics.clone();
            // Background task that exports one sample of the monitor's interval metrics per
            // 15-second tick.
            let actor_monitor_task = self.runtime.spawn(async move {
                let metrics = streaming_metrics.new_actor_metrics(actor_id, fragment_id);
                let mut interval = tokio::time::interval(Duration::from_secs(15));
                for task_metrics in monitor.intervals() {
                    interval.tick().await; // tick at the start since the first interval tick is at 0s.
                    metrics
                        .actor_poll_duration
                        .inc_by(task_metrics.total_poll_duration.as_nanos() as u64);
                    metrics
                        .actor_idle_duration
                        .inc_by(task_metrics.total_idle_duration.as_nanos() as u64);
                    metrics
                        .actor_scheduled_duration
                        .inc_by(task_metrics.total_scheduled_duration.as_nanos() as u64);

                    if enable_count_metrics {
                        metrics.actor_poll_cnt.inc_by(task_metrics.total_poll_count);
                        metrics
                            .actor_idle_cnt
                            .inc_by(task_metrics.total_idled_count);
                        metrics
                            .actor_scheduled_cnt
                            .inc_by(task_metrics.total_scheduled_count);
                    }
                }
            });
            Some(actor_monitor_task)
        } else {
            None
        };
        (handle, monitor_handle)
    }
664}
665
/// Parameters to construct executors.
/// - [`crate::from_proto::create_executor`]
/// - [`StreamActorManager::create_nodes`]
pub struct ExecutorParams {
    /// The streaming environment the executor is built in.
    pub env: StreamEnvironment,

    /// Basic information about the executor.
    pub info: ExecutorInfo,

    /// Executor id, unique across all actors.
    pub executor_id: ExecutorId,

    /// Operator id, unique for each operator in fragment.
    pub operator_id: GlobalOperatorId,

    /// Information of the operator from plan node, like `StreamHashJoin { .. }`.
    // TODO: use it for `identity`
    pub op_info: String,

    /// The input executor.
    pub input: Vec<Executor>,

    /// `FragmentId` of the actor
    pub fragment_id: FragmentId,

    /// Metrics
    pub executor_stats: Arc<StreamingMetrics>,

    /// Actor context
    pub actor_context: ActorContextRef,

    /// Vnodes owned by this executor. Represented in bitmap.
    pub vnode_bitmap: Option<Bitmap>,

    /// Used for reporting expression evaluation errors.
    pub eval_error_report: ActorEvalErrorReport,

    /// `watermark_epoch` field in `MemoryManager`
    pub watermark_epoch: AtomicU64Ref,

    /// Clone of the local barrier manager that the actor is being built with.
    pub local_barrier_manager: LocalBarrierManager,

    /// The local streaming configuration for this specific actor. Same as `actor_context.config`.
    ///
    /// Compared to `stream_env.global_config`, this config can have some entries overridden by the user.
    pub config: Arc<StreamingConfig>,
}
713
714impl Debug for ExecutorParams {
715    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
716        f.debug_struct("ExecutorParams")
717            .field("info", &self.info)
718            .field("executor_id", &self.executor_id)
719            .field("operator_id", &self.operator_id)
720            .field("op_info", &self.op_info)
721            .field("input", &self.input.len())
722            .field("actor_id", &self.actor_context.id)
723            .finish_non_exhaustive()
724    }
725}