risingwave_stream/task/
stream_manager.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::time::Duration;
16use std::collections::{HashMap, HashSet};
17use std::fmt::Debug;
18use std::sync::Arc;
19use std::sync::atomic::AtomicU64;
20use std::time::Instant;
21
22use async_recursion::async_recursion;
23use await_tree::{InstrumentAwait, SpanExt};
24use futures::future::join_all;
25use futures::stream::BoxStream;
26use futures::{FutureExt, TryFutureExt};
27use itertools::Itertools;
28use risingwave_common::bitmap::Bitmap;
29use risingwave_common::catalog::{ColumnId, DatabaseId, Field, Schema, TableId};
30use risingwave_common::config::MetricLevel;
31use risingwave_common::must_match;
32use risingwave_common::operator::{unique_executor_id, unique_operator_id};
33use risingwave_pb::common::ActorInfo;
34use risingwave_pb::plan_common::StorageTableDesc;
35use risingwave_pb::stream_plan;
36use risingwave_pb::stream_plan::stream_node::NodeBody;
37use risingwave_pb::stream_plan::{StreamNode, StreamScanNode, StreamScanType};
38use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo;
39use risingwave_pb::stream_service::streaming_control_stream_request::InitRequest;
40use risingwave_pb::stream_service::{
41    StreamingControlStreamRequest, StreamingControlStreamResponse,
42};
43use risingwave_storage::monitor::HummockTraceFutureExt;
44use risingwave_storage::table::batch_table::BatchTable;
45use risingwave_storage::{StateStore, dispatch_state_store};
46use thiserror_ext::AsReport;
47use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
48use tokio::task::JoinHandle;
49use tonic::Status;
50
51use crate::common::table::state_table::StateTable;
52use crate::error::StreamResult;
53use crate::executor::exchange::permit::Receiver;
54use crate::executor::monitor::StreamingMetrics;
55use crate::executor::subtask::SubtaskHandle;
56use crate::executor::{
57    Actor, ActorContext, ActorContextRef, DispatchExecutor, DispatcherImpl, Execute, Executor,
58    ExecutorInfo, MergeExecutorInput, SnapshotBackfillExecutor, TroublemakerExecutor,
59    WrapperExecutor,
60};
61use crate::from_proto::{MergeExecutorBuilder, create_executor};
62use crate::task::barrier_manager::{
63    ControlStreamHandle, EventSender, LocalActorOperation, LocalBarrierWorker,
64};
65use crate::task::{
66    ActorId, FragmentId, LocalBarrierManager, SharedContext, StreamActorManager, StreamEnvironment,
67    UpDownActorIds,
68};
69
/// Fixed local host address used by unit tests (see `test_utils` below).
#[cfg(test)]
pub static LOCAL_TEST_ADDR: std::sync::LazyLock<risingwave_common::util::addr::HostAddr> =
    std::sync::LazyLock::new(|| "127.0.0.1:2333".parse().unwrap());
73
/// Join handle of a spawned actor task; resolves when the actor exits.
pub type ActorHandle = JoinHandle<()>;

/// Shared atomic `u64`, e.g. the `watermark_epoch` passed to executors.
pub type AtomicU64Ref = Arc<AtomicU64>;
77
/// Key types used to register spans in the await-tree registry.
pub mod await_tree_key {
    /// Await-tree key type for actors.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
    pub struct Actor(pub crate::task::ActorId);

    /// Await-tree key type for barriers.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
    pub struct BarrierAwait {
        pub prev_epoch: u64,
    }
}
89
/// `LocalStreamManager` manages all stream executors in this project.
#[derive(Clone)]
pub struct LocalStreamManager {
    // Registry for await-tree instrumentation; `None` when await-tree is not configured.
    await_tree_reg: Option<await_tree::Registry>,

    // Streaming environment of this compute node.
    pub env: StreamEnvironment,

    // Channel to the background `LocalBarrierWorker` that executes all actor operations.
    actor_op_tx: EventSender<LocalActorOperation>,
}
99
/// Report expression evaluation errors to the actor context.
///
/// The struct can be cheaply cloned.
#[derive(Clone)]
pub struct ActorEvalErrorReport {
    // Context of the actor the erroring expression belongs to.
    pub actor_context: ActorContextRef,
    // Identity string of the executor, used to tag the reported error.
    pub identity: Arc<str>,
}
108
impl risingwave_expr::expr::EvalErrorReport for ActorEvalErrorReport {
    fn report(&self, err: risingwave_expr::ExprError) {
        // Delegate to the actor context, tagging the error with this executor's identity.
        self.actor_context.on_compute_error(err, &self.identity);
    }
}
114
/// Parameters passed to `create_executor` when building a single executor node.
pub struct ExecutorParams {
    /// The streaming environment of this compute node.
    pub env: StreamEnvironment,

    /// Basic information about the executor.
    pub info: ExecutorInfo,

    /// Executor id, unique across all actors.
    pub executor_id: u64,

    /// Operator id, unique for each operator in fragment.
    pub operator_id: u64,

    /// Information of the operator from plan node, like `StreamHashJoin { .. }`.
    // TODO: use it for `identity`
    pub op_info: String,

    /// The input executor.
    pub input: Vec<Executor>,

    /// `FragmentId` of the actor
    pub fragment_id: FragmentId,

    /// Metrics
    pub executor_stats: Arc<StreamingMetrics>,

    /// Actor context
    pub actor_context: ActorContextRef,

    /// Vnodes owned by this executor. Represented in bitmap.
    pub vnode_bitmap: Option<Bitmap>,

    /// Used for reporting expression evaluation errors.
    pub eval_error_report: ActorEvalErrorReport,

    /// `watermark_epoch` field in `MemoryManager`
    pub watermark_epoch: AtomicU64Ref,

    /// Context shared between local actors; used when building merge/dispatch executors.
    pub shared_context: Arc<SharedContext>,

    /// Barrier manager used for barrier subscription and progress reporting.
    pub local_barrier_manager: LocalBarrierManager,
}
156
157impl Debug for ExecutorParams {
158    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159        f.debug_struct("ExecutorParams")
160            .field("info", &self.info)
161            .field("executor_id", &self.executor_id)
162            .field("operator_id", &self.operator_id)
163            .field("op_info", &self.op_info)
164            .field("input", &self.input.len())
165            .field("actor_id", &self.actor_context.id)
166            .finish_non_exhaustive()
167    }
168}
169
170impl LocalStreamManager {
171    pub fn new(
172        env: StreamEnvironment,
173        streaming_metrics: Arc<StreamingMetrics>,
174        await_tree_config: Option<await_tree::Config>,
175        watermark_epoch: AtomicU64Ref,
176    ) -> Self {
177        if !env.config().unsafe_enable_strict_consistency {
178            // If strict consistency is disabled, should disable storage sanity check.
179            // Since this is a special config, we have to check it here.
180            risingwave_storage::hummock::utils::disable_sanity_check();
181        }
182
183        let await_tree_reg = await_tree_config.clone().map(await_tree::Registry::new);
184
185        let (actor_op_tx, actor_op_rx) = unbounded_channel();
186
187        let _join_handle = LocalBarrierWorker::spawn(
188            env.clone(),
189            streaming_metrics,
190            await_tree_reg.clone(),
191            watermark_epoch,
192            actor_op_rx,
193        );
194        Self {
195            await_tree_reg,
196            env,
197            actor_op_tx: EventSender(actor_op_tx),
198        }
199    }
200
201    /// Get the registry of await-trees.
202    pub fn await_tree_reg(&self) -> Option<&await_tree::Registry> {
203        self.await_tree_reg.as_ref()
204    }
205
206    /// Receive a new control stream request from meta. Notify the barrier worker to reset the CN and use the new control stream
207    /// to receive control message from meta
208    pub fn handle_new_control_stream(
209        &self,
210        sender: UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
211        request_stream: BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
212        init_request: InitRequest,
213    ) {
214        self.actor_op_tx
215            .send_event(LocalActorOperation::NewControlStream {
216                handle: ControlStreamHandle::new(sender, request_stream),
217                init_request,
218            })
219    }
220
221    pub async fn take_receiver(
222        &self,
223        database_id: DatabaseId,
224        term_id: String,
225        ids: UpDownActorIds,
226    ) -> StreamResult<Receiver> {
227        self.actor_op_tx
228            .send_and_await(|result_sender| LocalActorOperation::TakeReceiver {
229                database_id,
230                term_id,
231                ids,
232                result_sender,
233            })
234            .await?
235    }
236
237    pub async fn inspect_barrier_state(&self) -> StreamResult<String> {
238        info!("start inspecting barrier state");
239        let start = Instant::now();
240        self.actor_op_tx
241            .send_and_await(|result_sender| LocalActorOperation::InspectState { result_sender })
242            .inspect(|result| {
243                info!(
244                    ok = result.is_ok(),
245                    time = ?start.elapsed(),
246                    "finish inspecting barrier state"
247                );
248            })
249            .await
250    }
251
252    pub async fn shutdown(&self) -> StreamResult<()> {
253        self.actor_op_tx
254            .send_and_await(|result_sender| LocalActorOperation::Shutdown { result_sender })
255            .await
256    }
257}
258
impl LocalBarrierWorker {
    /// Force stop all actors on this worker, and then drop their resources.
    ///
    /// The tear-down order matters: abort all database actors first, then clear
    /// per-worker resources, and finally rebuild `self` from the init request.
    pub(super) async fn reset(&mut self, init_request: InitRequest) {
        // Abort all databases concurrently and wait for all of them to finish.
        join_all(
            self.state
                .databases
                .values_mut()
                .map(|database| database.abort()),
        )
        .await;
        // Drop all await-tree spans registered by the aborted actors.
        if let Some(m) = self.actor_manager.await_tree_reg.as_ref() {
            m.clear();
        }

        // Discard uncommitted state in the hummock shared buffer, if the state
        // store is hummock-backed.
        if let Some(hummock) = self.actor_manager.env.state_store().as_hummock() {
            hummock
                .clear_shared_buffer()
                .instrument_await("store_clear_shared_buffer".verbose())
                .await
        }
        self.actor_manager.env.dml_manager_ref().clear();
        // Replace the whole worker state with a fresh one built from the init request.
        *self = Self::new(
            self.actor_manager.clone(),
            init_request.databases,
            init_request.term_id,
        );
    }
}
287
impl StreamActorManager {
    /// Create dispatchers with downstream information registered before
    fn create_dispatcher(
        &self,
        env: StreamEnvironment,
        input: Executor,
        dispatchers: &[stream_plan::Dispatcher],
        actor_id: ActorId,
        fragment_id: FragmentId,
        shared_context: &Arc<SharedContext>,
    ) -> StreamResult<DispatchExecutor> {
        // Build one `DispatcherImpl` per dispatcher proto; fail fast on the first error.
        let dispatcher_impls = dispatchers
            .iter()
            .map(|dispatcher| DispatcherImpl::new(shared_context, actor_id, dispatcher))
            .try_collect()?;

        Ok(DispatchExecutor::new(
            input,
            dispatcher_impls,
            actor_id,
            fragment_id,
            shared_context.clone(),
            self.streaming_metrics.clone(),
            env.config().developer.chunk_size,
        ))
    }

    /// Derive the executor id, unique across all actors, from the actor id and
    /// the plan node's operator id.
    fn get_executor_id(actor_context: &ActorContext, node: &StreamNode) -> u64 {
        // We assume that the operator_id of different instances from the same RelNode will be the
        // same.
        unique_executor_id(actor_context.id, node.operator_id)
    }

    /// Assemble the `ExecutorInfo` (schema, stream key, identity string) for a
    /// plan node and its computed executor id.
    fn get_executor_info(node: &StreamNode, executor_id: u64) -> ExecutorInfo {
        let schema: Schema = node.fields.iter().map(Field::from).collect();

        let pk_indices = node
            .get_stream_key()
            .iter()
            .map(|idx| *idx as usize)
            .collect::<Vec<_>>();

        // Identity combines the node body name with the executor id in hex.
        let identity = format!("{} {:X}", node.get_node_body().unwrap(), executor_id);
        ExecutorInfo {
            schema,
            pk_indices,
            identity,
            id: executor_id,
        }
    }

    /// Build the merge input of a snapshot-backfill executor from its upstream
    /// `Merge` plan node.
    fn create_snapshot_backfill_input(
        &self,
        upstream_node: &StreamNode,
        actor_context: &ActorContextRef,
        shared_context: &Arc<SharedContext>,
        chunk_size: usize,
    ) -> StreamResult<MergeExecutorInput> {
        let info = Self::get_executor_info(
            upstream_node,
            Self::get_executor_id(actor_context, upstream_node),
        );

        // The upstream of a snapshot backfill must be a `Merge` node.
        let upstream_merge = must_match!(upstream_node.get_node_body().unwrap(), NodeBody::Merge(upstream_merge) => {
            upstream_merge
        });

        MergeExecutorBuilder::new_input(
            shared_context.clone(),
            self.streaming_metrics.clone(),
            actor_context.clone(),
            info,
            upstream_merge,
            chunk_size,
        )
    }

    /// Build a `SnapshotBackfillExecutor` for a `StreamScan` node of type
    /// `SnapshotBackfill`, wiring up the upstream merge input, the batch table
    /// for the snapshot side, and the backfill progress state table.
    #[expect(clippy::too_many_arguments)]
    async fn create_snapshot_backfill_node(
        &self,
        stream_node: &StreamNode,
        node: &StreamScanNode,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        shared_context: &Arc<SharedContext>,
        env: StreamEnvironment,
        local_barrier_manager: &LocalBarrierManager,
        state_store: impl StateStore,
    ) -> StreamResult<Executor> {
        // A snapshot-backfill scan node has exactly two inputs; only the first
        // (the upstream merge) is used here.
        let [upstream_node, _]: &[_; 2] = stream_node.input.as_slice().try_into().unwrap();
        let chunk_size = env.config().developer.chunk_size;
        let upstream = self.create_snapshot_backfill_input(
            upstream_node,
            actor_context,
            shared_context,
            chunk_size,
        )?;

        let table_desc: &StorageTableDesc = node.get_table_desc()?;

        let output_indices = node
            .output_indices
            .iter()
            .map(|&i| i as usize)
            .collect_vec();

        let column_ids = node
            .upstream_column_ids
            .iter()
            .map(ColumnId::from)
            .collect_vec();

        // Register this actor so that backfill progress is reported for the
        // materialized-view creation.
        let progress = local_barrier_manager.register_create_mview_progress(actor_context.id);

        let vnodes = vnode_bitmap.map(Arc::new);
        let barrier_rx = local_barrier_manager.subscribe_barrier(actor_context.id);

        // Batch table over the upstream storage table, restricted to the needed columns.
        let upstream_table =
            BatchTable::new_partial(state_store.clone(), column_ids, vnodes.clone(), table_desc);

        let state_table = node.get_state_table()?;
        let state_table =
            StateTable::from_table_catalog(state_table, state_store.clone(), vnodes).await;

        let executor = SnapshotBackfillExecutor::new(
            upstream_table,
            state_table,
            upstream,
            output_indices,
            actor_context.clone(),
            progress,
            chunk_size,
            node.rate_limit.into(),
            barrier_rx,
            self.streaming_metrics.clone(),
            node.snapshot_backfill_epoch,
        )
        .boxed();

        let info = Self::get_executor_info(
            stream_node,
            Self::get_executor_id(actor_context, stream_node),
        );

        // Under insane-mode consistency testing, wrap the executor with a
        // troublemaker that injects inconsistencies.
        if crate::consistency::insane() {
            let mut troubled_info = info.clone();
            troubled_info.identity = format!("{} (troubled)", info.identity);
            Ok((
                info,
                TroublemakerExecutor::new((troubled_info, executor).into(), chunk_size),
            )
                .into())
        } else {
            Ok((info, executor).into())
        }
    }

    /// Create a chain(tree) of nodes, with given `store`.
    #[allow(clippy::too_many_arguments)]
    #[async_recursion]
    async fn create_nodes_inner(
        &self,
        fragment_id: FragmentId,
        node: &stream_plan::StreamNode,
        env: StreamEnvironment,
        store: impl StateStore,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        has_stateful: bool,
        subtasks: &mut Vec<SubtaskHandle>,
        shared_context: &Arc<SharedContext>,
        local_barrier_manager: &LocalBarrierManager,
    ) -> StreamResult<Executor> {
        // Snapshot-backfill scans take a dedicated construction path and do not
        // recurse into the generic node building below.
        if let NodeBody::StreamScan(stream_scan) = node.get_node_body().unwrap()
            && let Ok(StreamScanType::SnapshotBackfill) = stream_scan.get_stream_scan_type()
        {
            return dispatch_state_store!(env.state_store(), store, {
                self.create_snapshot_backfill_node(
                    node,
                    stream_scan,
                    actor_context,
                    vnode_bitmap,
                    shared_context,
                    env,
                    local_barrier_manager,
                    store,
                )
                .await
            });
        }

        // The "stateful" here means that the executor may issue read operations to the state store
        // massively and continuously. Used to decide whether to apply the optimization of subtasks.
        fn is_stateful_executor(stream_node: &StreamNode) -> bool {
            matches!(
                stream_node.get_node_body().unwrap(),
                NodeBody::HashAgg(_)
                    | NodeBody::HashJoin(_)
                    | NodeBody::DeltaIndexJoin(_)
                    | NodeBody::Lookup(_)
                    | NodeBody::StreamScan(_)
                    | NodeBody::StreamCdcScan(_)
                    | NodeBody::DynamicFilter(_)
                    | NodeBody::GroupTopN(_)
                    | NodeBody::Now(_)
            )
        }
        let is_stateful = is_stateful_executor(node);

        // Create the input executor before creating itself
        let mut input = Vec::with_capacity(node.input.iter().len());
        for input_stream_node in &node.input {
            input.push(
                self.create_nodes_inner(
                    fragment_id,
                    input_stream_node,
                    env.clone(),
                    store.clone(),
                    actor_context,
                    vnode_bitmap.clone(),
                    has_stateful || is_stateful,
                    subtasks,
                    shared_context,
                    local_barrier_manager,
                )
                .await?,
            );
        }

        let op_info = node.get_identity().clone();

        // We assume that the operator_id of different instances from the same RelNode will be the
        // same.
        let executor_id = Self::get_executor_id(actor_context, node);
        let operator_id = unique_operator_id(fragment_id, node.operator_id);

        let info = Self::get_executor_info(node, executor_id);

        let eval_error_report = ActorEvalErrorReport {
            actor_context: actor_context.clone(),
            identity: info.identity.clone().into(),
        };

        // Build the executor with params.
        let executor_params = ExecutorParams {
            env: env.clone(),

            info: info.clone(),
            executor_id,
            operator_id,
            op_info,
            input,
            fragment_id,
            executor_stats: self.streaming_metrics.clone(),
            actor_context: actor_context.clone(),
            vnode_bitmap,
            eval_error_report,
            watermark_epoch: self.watermark_epoch.clone(),
            shared_context: shared_context.clone(),
            local_barrier_manager: local_barrier_manager.clone(),
        };

        let executor = create_executor(executor_params, node, store).await?;

        // Wrap the executor for debug purpose.
        let wrapped = WrapperExecutor::new(
            executor,
            actor_context.clone(),
            env.config().developer.enable_executor_row_count,
            env.config().developer.enable_explain_analyze_stats,
        );
        let executor = (info, wrapped).into();

        // If there're multiple stateful executors in this actor, we will wrap it into a subtask.
        let executor = if has_stateful && is_stateful {
            // TODO(bugen): subtask does not work with tracing spans.
            // let (subtask, executor) = subtask::wrap(executor, actor_context.id);
            // subtasks.push(subtask);
            // executor.boxed()

            let _ = subtasks;
            executor
        } else {
            executor
        };

        Ok(executor)
    }

    /// Create a chain(tree) of nodes and return the head executor.
    #[expect(clippy::too_many_arguments)]
    async fn create_nodes(
        &self,
        fragment_id: FragmentId,
        node: &stream_plan::StreamNode,
        env: StreamEnvironment,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        shared_context: &Arc<SharedContext>,
        local_barrier_manager: &LocalBarrierManager,
    ) -> StreamResult<(Executor, Vec<SubtaskHandle>)> {
        let mut subtasks = vec![];

        // Dispatch on the concrete state-store implementation once, at the root
        // of the recursion.
        let executor = dispatch_state_store!(env.state_store(), store, {
            self.create_nodes_inner(
                fragment_id,
                node,
                env,
                store,
                actor_context,
                vnode_bitmap,
                false,
                &mut subtasks,
                shared_context,
                local_barrier_manager,
            )
            .await
        })?;

        Ok((executor, subtasks))
    }

    /// Build a complete `Actor` from its proto description: create the executor
    /// tree, attach the dispatcher, and wrap everything with the actor context.
    async fn create_actor(
        self: Arc<Self>,
        actor: BuildActorInfo,
        fragment_id: FragmentId,
        node: Arc<StreamNode>,
        shared_context: Arc<SharedContext>,
        related_subscriptions: Arc<HashMap<TableId, HashSet<u32>>>,
        local_barrier_manager: LocalBarrierManager,
    ) -> StreamResult<Actor<DispatchExecutor>> {
        {
            let actor_id = actor.actor_id;
            let streaming_config = self.env.config().clone();
            let actor_context = ActorContext::create(
                &actor,
                fragment_id,
                self.env.total_mem_usage(),
                self.streaming_metrics.clone(),
                related_subscriptions,
                self.env.meta_client().clone(),
                streaming_config,
            );
            let vnode_bitmap = actor.vnode_bitmap.as_ref().map(|b| b.into());
            let expr_context = actor.expr_context.clone().unwrap();

            let (executor, subtasks) = self
                .create_nodes(
                    fragment_id,
                    &node,
                    self.env.clone(),
                    &actor_context,
                    vnode_bitmap,
                    &shared_context,
                    &local_barrier_manager,
                )
                .await?;

            let dispatcher = self.create_dispatcher(
                self.env.clone(),
                executor,
                &actor.dispatchers,
                actor_id,
                fragment_id,
                &shared_context,
            )?;
            let actor = Actor::new(
                dispatcher,
                subtasks,
                self.streaming_metrics.clone(),
                actor_context.clone(),
                expr_context,
                local_barrier_manager,
            );
            Ok(actor)
        }
    }
}
666
impl StreamActorManager {
    /// Build and spawn an actor on the actor runtime.
    ///
    /// Returns the join handle of the actor task and, when tokio task metrics
    /// are enabled, the join handle of a companion task that periodically
    /// exports the actor's task metrics.
    pub(super) fn spawn_actor(
        self: &Arc<Self>,
        actor: BuildActorInfo,
        fragment_id: FragmentId,
        node: Arc<StreamNode>,
        related_subscriptions: Arc<HashMap<TableId, HashSet<u32>>>,
        current_shared_context: Arc<SharedContext>,
        local_barrier_manager: LocalBarrierManager,
    ) -> (JoinHandle<()>, Option<JoinHandle<()>>) {
        {
            let monitor = tokio_metrics::TaskMonitor::new();
            let stream_actor_ref = &actor;
            let actor_id = stream_actor_ref.actor_id;
            let handle = {
                let trace_span =
                    format!("Actor {actor_id}: `{}`", stream_actor_ref.mview_definition);
                let barrier_manager = local_barrier_manager.clone();
                // wrap the future of `create_actor` with `boxed` to avoid stack overflow
                let actor = self
                    .clone()
                    .create_actor(
                        actor,
                        fragment_id,
                        node,
                        current_shared_context,
                        related_subscriptions,
                        barrier_manager.clone()
                    ).boxed().and_then(|actor| actor.run()).map(move |result| {
                    if let Err(err) = result {
                        // TODO: check error type and panic if it's unexpected.
                        // Intentionally use `?` on the report to also include the backtrace.
                        tracing::error!(actor_id, error = ?err.as_report(), "actor exit with error");
                        barrier_manager.notify_failure(actor_id, err);
                    }
                });
                // Register the actor future in the await-tree registry, if enabled.
                let traced = match &self.await_tree_reg {
                    Some(m) => m
                        .register(await_tree_key::Actor(actor_id), trace_span)
                        .instrument(actor)
                        .left_future(),
                    None => actor.right_future(),
                };
                // Attach the tokio task monitor and scope the streaming config.
                let instrumented = monitor.instrument(traced);
                let with_config = crate::CONFIG.scope(self.env.config().clone(), instrumented);
                // If hummock tracing is not enabled, it directly returns wrapped future.
                let may_track_hummock = with_config.may_trace_hummock();

                self.runtime.spawn(may_track_hummock)
            };

            let monitor_handle = if self.streaming_metrics.level >= MetricLevel::Debug
                || self.env.config().developer.enable_actor_tokio_metrics
            {
                tracing::info!("Tokio metrics are enabled.");
                let streaming_metrics = self.streaming_metrics.clone();
                // Companion task: export the actor's cumulative tokio task
                // metrics once per second.
                let actor_monitor_task = self.runtime.spawn(async move {
                    let metrics = streaming_metrics.new_actor_metrics(actor_id);
                    loop {
                        let task_metrics = monitor.cumulative();
                        metrics
                            .actor_execution_time
                            .set(task_metrics.total_poll_duration.as_secs_f64());
                        metrics
                            .actor_fast_poll_duration
                            .set(task_metrics.total_fast_poll_duration.as_secs_f64());
                        metrics
                            .actor_fast_poll_cnt
                            .set(task_metrics.total_fast_poll_count as i64);
                        metrics
                            .actor_slow_poll_duration
                            .set(task_metrics.total_slow_poll_duration.as_secs_f64());
                        metrics
                            .actor_slow_poll_cnt
                            .set(task_metrics.total_slow_poll_count as i64);
                        metrics
                            .actor_poll_duration
                            .set(task_metrics.total_poll_duration.as_secs_f64());
                        metrics
                            .actor_poll_cnt
                            .set(task_metrics.total_poll_count as i64);
                        metrics
                            .actor_idle_duration
                            .set(task_metrics.total_idle_duration.as_secs_f64());
                        metrics
                            .actor_idle_cnt
                            .set(task_metrics.total_idled_count as i64);
                        metrics
                            .actor_scheduled_duration
                            .set(task_metrics.total_scheduled_duration.as_secs_f64());
                        metrics
                            .actor_scheduled_cnt
                            .set(task_metrics.total_scheduled_count as i64);
                        tokio::time::sleep(Duration::from_secs(1)).await;
                    }
                });
                Some(actor_monitor_task)
            } else {
                None
            };
            (handle, monitor_handle)
        }
    }
}
771
772impl LocalBarrierWorker {
773    /// This function could only be called once during the lifecycle of `LocalStreamManager` for
774    /// now.
775    pub fn update_actor_info(
776        &mut self,
777        database_id: DatabaseId,
778        new_actor_infos: impl Iterator<Item = ActorInfo>,
779    ) {
780        Self::get_or_insert_database_shared_context(
781            &mut self.state.current_shared_context,
782            database_id,
783            &self.actor_manager,
784            &self.term_id,
785        )
786        .add_actors(new_actor_infos);
787    }
788}
789
#[cfg(test)]
pub mod test_utils {
    use risingwave_pb::common::HostAddress;

    use super::*;

    /// Build an `ActorInfo` that points at the local test address
    /// [`LOCAL_TEST_ADDR`].
    pub fn helper_make_local_actor(actor_id: u32) -> ActorInfo {
        let host = HostAddress {
            host: LOCAL_TEST_ADDR.host.clone(),
            port: LOCAL_TEST_ADDR.port as i32,
        };
        ActorInfo {
            actor_id,
            host: Some(host),
        }
    }
}
805}