risingwave_stream/task/
stream_manager.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::time::Duration;
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::time::Instant;

use async_recursion::async_recursion;
use await_tree::{InstrumentAwait, SpanExt};
use futures::future::join_all;
use futures::stream::BoxStream;
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnId, DatabaseId, Field, Schema, TableId};
use risingwave_common::config::MetricLevel;
use risingwave_common::must_match;
use risingwave_common::operator::{unique_executor_id, unique_operator_id};
use risingwave_pb::plan_common::StorageTableDesc;
use risingwave_pb::stream_plan;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{StreamNode, StreamScanNode, StreamScanType};
use risingwave_pb::stream_service::inject_barrier_request::BuildActorInfo;
use risingwave_pb::stream_service::streaming_control_stream_request::InitRequest;
use risingwave_pb::stream_service::{
    StreamingControlStreamRequest, StreamingControlStreamResponse,
};
use risingwave_storage::monitor::HummockTraceFutureExt;
use risingwave_storage::table::batch_table::BatchTable;
use risingwave_storage::{StateStore, dispatch_state_store};
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tokio::task::JoinHandle;
use tonic::Status;

use crate::common::table::state_table::StateTable;
use crate::error::StreamResult;
use crate::executor::exchange::permit::Receiver;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::subtask::SubtaskHandle;
use crate::executor::{
    Actor, ActorContext, ActorContextRef, DispatchExecutor, Execute, Executor, ExecutorInfo,
    MergeExecutorInput, SnapshotBackfillExecutor, TroublemakerExecutor, WrapperExecutor,
};
use crate::from_proto::{MergeExecutorBuilder, create_executor};
use crate::task::barrier_manager::{
    ControlStreamHandle, EventSender, LocalActorOperation, LocalBarrierWorker,
};
use crate::task::{
    ActorId, FragmentId, LocalBarrierManager, NewOutputRequest, StreamActorManager,
    StreamEnvironment, UpDownActorIds,
};

#[cfg(test)]
pub static LOCAL_TEST_ADDR: std::sync::LazyLock<risingwave_common::util::addr::HostAddr> =
    std::sync::LazyLock::new(|| "127.0.0.1:2333".parse().unwrap());

pub type ActorHandle = JoinHandle<()>;

pub type AtomicU64Ref = Arc<AtomicU64>;

pub mod await_tree_key {
    /// Await-tree key type for actors.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
    pub struct Actor(pub crate::task::ActorId);

    /// Await-tree key type for barriers.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
    pub struct BarrierAwait {
        pub prev_epoch: u64,
    }
}

/// `LocalStreamManager` manages all stream executors on this compute node.
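///
/// All operations are forwarded as `LocalActorOperation` events to the background
/// `LocalBarrierWorker` task through `actor_op_tx`, so this handle can be cloned
/// cheaply and shared across threads.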
#[derive(Clone)]
pub struct LocalStreamManager {
    await_tree_reg: Option<await_tree::Registry>,

    pub env: StreamEnvironment,

    actor_op_tx: EventSender<LocalActorOperation>,
}

/// Report expression evaluation errors to the actor context.
///
/// The struct can be cheaply cloned.
#[derive(Clone)]
pub struct ActorEvalErrorReport {
    pub actor_context: ActorContextRef,
    pub identity: Arc<str>,
}

impl risingwave_expr::expr::EvalErrorReport for ActorEvalErrorReport {
    fn report(&self, err: risingwave_expr::ExprError) {
        self.actor_context.on_compute_error(err, &self.identity);
    }
}

pub struct ExecutorParams {
    pub env: StreamEnvironment,

    /// Basic information about the executor.
    pub info: ExecutorInfo,

    /// Executor id, unique across all actors.
    pub executor_id: u64,

    /// Operator id, unique for each operator within a fragment.
    pub operator_id: u64,

    /// Information about the operator from the plan node, like `StreamHashJoin { .. }`.
    // TODO: use it for `identity`
    pub op_info: String,

    /// The input executors.
    pub input: Vec<Executor>,

    /// `FragmentId` of the actor.
    pub fragment_id: FragmentId,

    /// Metrics.
    pub executor_stats: Arc<StreamingMetrics>,

    /// Actor context.
    pub actor_context: ActorContextRef,

    /// Vnodes owned by this executor. Represented as a bitmap.
    pub vnode_bitmap: Option<Bitmap>,

    /// Used for reporting expression evaluation errors.
    pub eval_error_report: ActorEvalErrorReport,

    /// `watermark_epoch` field in `MemoryManager`.
    pub watermark_epoch: AtomicU64Ref,

    pub local_barrier_manager: LocalBarrierManager,
}

impl Debug for ExecutorParams {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ExecutorParams")
            .field("info", &self.info)
            .field("executor_id", &self.executor_id)
            .field("operator_id", &self.operator_id)
            .field("op_info", &self.op_info)
            .field("input", &self.input.len())
            .field("actor_id", &self.actor_context.id)
            .finish_non_exhaustive()
    }
}

impl LocalStreamManager {
    pub fn new(
        env: StreamEnvironment,
        streaming_metrics: Arc<StreamingMetrics>,
        await_tree_config: Option<await_tree::Config>,
        watermark_epoch: AtomicU64Ref,
    ) -> Self {
        if !env.config().unsafe_enable_strict_consistency {
            // If strict consistency is disabled, we should also disable the storage sanity check.
            // Since this is a special config, we have to check it here.
            risingwave_storage::hummock::utils::disable_sanity_check();
        }

        let await_tree_reg = await_tree_config.clone().map(await_tree::Registry::new);

        let (actor_op_tx, actor_op_rx) = unbounded_channel();

        let _join_handle = LocalBarrierWorker::spawn(
            env.clone(),
            streaming_metrics,
            await_tree_reg.clone(),
            watermark_epoch,
            actor_op_rx,
        );
        Self {
            await_tree_reg,
            env,
            actor_op_tx: EventSender(actor_op_tx),
        }
    }

    /// Get the registry of await-trees.
    pub fn await_tree_reg(&self) -> Option<&await_tree::Registry> {
        self.await_tree_reg.as_ref()
    }

    /// Receive a new control stream request from meta. Notify the barrier worker to reset the
    /// compute node and use the new control stream to receive control messages from meta.
    pub fn handle_new_control_stream(
        &self,
        sender: UnboundedSender<Result<StreamingControlStreamResponse, Status>>,
        request_stream: BoxStream<'static, Result<StreamingControlStreamRequest, Status>>,
        init_request: InitRequest,
    ) {
        self.actor_op_tx
            .send_event(LocalActorOperation::NewControlStream {
                handle: ControlStreamHandle::new(sender, request_stream),
                init_request,
            })
    }

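    /// Take the receiver of the exchange channel between the given pair of upstream and
    /// downstream actors, forwarding the request to the barrier worker.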
    pub async fn take_receiver(
        &self,
        database_id: DatabaseId,
        term_id: String,
        ids: UpDownActorIds,
    ) -> StreamResult<Receiver> {
        self.actor_op_tx
            .send_and_await(|result_sender| LocalActorOperation::TakeReceiver {
                database_id,
                term_id,
                ids,
                result_sender,
            })
            .await?
    }

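    /// Dump the state of the barrier worker as a human-readable string, for debugging.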
    pub async fn inspect_barrier_state(&self) -> StreamResult<String> {
        info!("start inspecting barrier state");
        let start = Instant::now();
        self.actor_op_tx
            .send_and_await(|result_sender| LocalActorOperation::InspectState { result_sender })
            .inspect(|result| {
                info!(
                    ok = result.is_ok(),
                    time = ?start.elapsed(),
                    "finish inspecting barrier state"
                );
            })
            .await
    }

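    /// Ask the barrier worker to shut down and wait until it acknowledges.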
    pub async fn shutdown(&self) -> StreamResult<()> {
        self.actor_op_tx
            .send_and_await(|result_sender| LocalActorOperation::Shutdown { result_sender })
            .await
    }
}

impl LocalBarrierWorker {
    /// Force stop all actors on this worker, and then drop their resources.
    pub(super) async fn reset(&mut self, init_request: InitRequest) {
        join_all(
            self.state
                .databases
                .values_mut()
                .map(|database| database.abort()),
        )
        .await;
        if let Some(m) = self.actor_manager.await_tree_reg.as_ref() {
            m.clear();
        }

        if let Some(hummock) = self.actor_manager.env.state_store().as_hummock() {
            hummock
                .clear_shared_buffer()
                .instrument_await("store_clear_shared_buffer".verbose())
                .await
        }
        self.actor_manager.env.dml_manager_ref().clear();
        *self = Self::new(
            self.actor_manager.clone(),
            init_request.databases,
            init_request.term_id,
        );
        self.actor_manager.env.client_pool().invalidate_all();
    }
}

impl StreamActorManager {
    fn get_executor_id(actor_context: &ActorContext, node: &StreamNode) -> u64 {
        // We assume that the operator_id of different instances from the same RelNode will be the
        // same.
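        // `unique_executor_id` combines the actor id and the per-actor operator id into a
        // single u64 (with the actor id in the high 32 bits, in the current implementation),
        // so the result is unique across all actors on this node.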
        unique_executor_id(actor_context.id, node.operator_id)
    }

    fn get_executor_info(node: &StreamNode, executor_id: u64) -> ExecutorInfo {
        let schema: Schema = node.fields.iter().map(Field::from).collect();

        let pk_indices = node
            .get_stream_key()
            .iter()
            .map(|idx| *idx as usize)
            .collect::<Vec<_>>();

        let identity = format!("{} {:X}", node.get_node_body().unwrap(), executor_id);
        ExecutorInfo {
            schema,
            pk_indices,
            identity,
            id: executor_id,
        }
    }

    async fn create_snapshot_backfill_input(
        &self,
        upstream_node: &StreamNode,
        actor_context: &ActorContextRef,
        local_barrier_manager: &LocalBarrierManager,
        chunk_size: usize,
    ) -> StreamResult<MergeExecutorInput> {
        let info = Self::get_executor_info(
            upstream_node,
            Self::get_executor_id(actor_context, upstream_node),
        );

        let upstream_merge = must_match!(upstream_node.get_node_body().unwrap(), NodeBody::Merge(upstream_merge) => {
            upstream_merge
        });

        MergeExecutorBuilder::new_input(
            local_barrier_manager.clone(),
            self.streaming_metrics.clone(),
            actor_context.clone(),
            info,
            upstream_merge,
            chunk_size,
        )
        .await
    }

    #[expect(clippy::too_many_arguments)]
    async fn create_snapshot_backfill_node(
        &self,
        stream_node: &StreamNode,
        node: &StreamScanNode,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        env: StreamEnvironment,
        local_barrier_manager: &LocalBarrierManager,
        state_store: impl StateStore,
    ) -> StreamResult<Executor> {
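        // A snapshot-backfill scan node is expected to have exactly two inputs: the upstream
        // merge node and a batch-plan placeholder. Only the upstream input is used here.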
        let [upstream_node, _]: &[_; 2] = stream_node.input.as_slice().try_into().unwrap();
        let chunk_size = env.config().developer.chunk_size;
        let upstream = self
            .create_snapshot_backfill_input(
                upstream_node,
                actor_context,
                local_barrier_manager,
                chunk_size,
            )
            .await?;

        let table_desc: &StorageTableDesc = node.get_table_desc()?;

        let output_indices = node
            .output_indices
            .iter()
            .map(|&i| i as usize)
            .collect_vec();

        let column_ids = node
            .upstream_column_ids
            .iter()
            .map(ColumnId::from)
            .collect_vec();

        let progress = local_barrier_manager.register_create_mview_progress(actor_context.id);

        let vnodes = vnode_bitmap.map(Arc::new);
        let barrier_rx = local_barrier_manager.subscribe_barrier(actor_context.id);

        let upstream_table =
            BatchTable::new_partial(state_store.clone(), column_ids, vnodes.clone(), table_desc);

        let state_table = node.get_state_table()?;
        let state_table =
            StateTable::from_table_catalog(state_table, state_store.clone(), vnodes).await;

        let executor = SnapshotBackfillExecutor::new(
            upstream_table,
            state_table,
            upstream,
            output_indices,
            actor_context.clone(),
            progress,
            chunk_size,
            node.rate_limit.into(),
            barrier_rx,
            self.streaming_metrics.clone(),
            node.snapshot_backfill_epoch,
        )
        .boxed();

        let info = Self::get_executor_info(
            stream_node,
            Self::get_executor_id(actor_context, stream_node),
        );

        if crate::consistency::insane() {
            let mut troubled_info = info.clone();
            troubled_info.identity = format!("{} (troubled)", info.identity);
            Ok((
                info,
                TroublemakerExecutor::new((troubled_info, executor).into(), chunk_size),
            )
                .into())
        } else {
            Ok((info, executor).into())
        }
    }

    /// Create a chain (tree) of nodes with the given `store`.
    #[expect(clippy::too_many_arguments)]
    #[async_recursion]
    async fn create_nodes_inner(
        &self,
        fragment_id: FragmentId,
        node: &stream_plan::StreamNode,
        env: StreamEnvironment,
        store: impl StateStore,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        has_stateful: bool,
        subtasks: &mut Vec<SubtaskHandle>,
        local_barrier_manager: &LocalBarrierManager,
    ) -> StreamResult<Executor> {
        if let NodeBody::StreamScan(stream_scan) = node.get_node_body().unwrap()
            && let Ok(StreamScanType::SnapshotBackfill) = stream_scan.get_stream_scan_type()
        {
            return dispatch_state_store!(env.state_store(), store, {
                self.create_snapshot_backfill_node(
                    node,
                    stream_scan,
                    actor_context,
                    vnode_bitmap,
                    env,
                    local_barrier_manager,
                    store,
                )
                .await
            });
        }

        // "Stateful" here means that the executor may issue read operations to the state store
        // intensively and continuously. This is used to decide whether to apply the subtask optimization.
        fn is_stateful_executor(stream_node: &StreamNode) -> bool {
            matches!(
                stream_node.get_node_body().unwrap(),
                NodeBody::HashAgg(_)
                    | NodeBody::HashJoin(_)
                    | NodeBody::DeltaIndexJoin(_)
                    | NodeBody::Lookup(_)
                    | NodeBody::StreamScan(_)
                    | NodeBody::StreamCdcScan(_)
                    | NodeBody::DynamicFilter(_)
                    | NodeBody::GroupTopN(_)
                    | NodeBody::Now(_)
            )
        }
        let is_stateful = is_stateful_executor(node);

        // Create the input executors before creating the executor itself.
        let mut input = Vec::with_capacity(node.input.len());
        for input_stream_node in &node.input {
            input.push(
                self.create_nodes_inner(
                    fragment_id,
                    input_stream_node,
                    env.clone(),
                    store.clone(),
                    actor_context,
                    vnode_bitmap.clone(),
                    has_stateful || is_stateful,
                    subtasks,
                    local_barrier_manager,
                )
                .await?,
            );
        }

        let op_info = node.get_identity().clone();

        // We assume that the operator_id of different instances from the same RelNode will be the
        // same.
        let executor_id = Self::get_executor_id(actor_context, node);
        let operator_id = unique_operator_id(fragment_id, node.operator_id);

        let info = Self::get_executor_info(node, executor_id);

        let eval_error_report = ActorEvalErrorReport {
            actor_context: actor_context.clone(),
            identity: info.identity.clone().into(),
        };

        // Build the executor with params.
        let executor_params = ExecutorParams {
            env: env.clone(),

            info: info.clone(),
            executor_id,
            operator_id,
            op_info,
            input,
            fragment_id,
            executor_stats: self.streaming_metrics.clone(),
            actor_context: actor_context.clone(),
            vnode_bitmap,
            eval_error_report,
            watermark_epoch: self.watermark_epoch.clone(),
            local_barrier_manager: local_barrier_manager.clone(),
        };

        let executor = create_executor(executor_params, node, store).await?;

        // Wrap the executor for debugging purposes.
        let wrapped = WrapperExecutor::new(
            executor,
            actor_context.clone(),
            env.config().developer.enable_executor_row_count,
            env.config().developer.enable_explain_analyze_stats,
        );
        let executor = (info, wrapped).into();

        // If there are multiple stateful executors in this actor, wrap them into subtasks.
        let executor = if has_stateful && is_stateful {
            // TODO(bugen): subtask does not work with tracing spans.
            // let (subtask, executor) = subtask::wrap(executor, actor_context.id);
            // subtasks.push(subtask);
            // executor.boxed()

            let _ = subtasks;
            executor
        } else {
            executor
        };

        Ok(executor)
    }

    /// Create a chain (tree) of nodes and return the head executor.
    async fn create_nodes(
        &self,
        fragment_id: FragmentId,
        node: &stream_plan::StreamNode,
        env: StreamEnvironment,
        actor_context: &ActorContextRef,
        vnode_bitmap: Option<Bitmap>,
        local_barrier_manager: &LocalBarrierManager,
    ) -> StreamResult<(Executor, Vec<SubtaskHandle>)> {
        let mut subtasks = vec![];

        let executor = dispatch_state_store!(env.state_store(), store, {
            self.create_nodes_inner(
                fragment_id,
                node,
                env,
                store,
                actor_context,
                vnode_bitmap,
                false,
                &mut subtasks,
                local_barrier_manager,
            )
            .await
        })?;

        Ok((executor, subtasks))
    }

    async fn create_actor(
        self: Arc<Self>,
        actor: BuildActorInfo,
        fragment_id: FragmentId,
        node: Arc<StreamNode>,
        related_subscriptions: Arc<HashMap<TableId, HashSet<u32>>>,
        local_barrier_manager: LocalBarrierManager,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    ) -> StreamResult<Actor<DispatchExecutor>> {
        {
            let actor_id = actor.actor_id;
            let streaming_config = self.env.config().clone();
            let actor_context = ActorContext::create(
                &actor,
                fragment_id,
                self.env.total_mem_usage(),
                self.streaming_metrics.clone(),
                related_subscriptions,
                self.env.meta_client().clone(),
                streaming_config,
            );
            let vnode_bitmap = actor.vnode_bitmap.as_ref().map(|b| b.into());
            let expr_context = actor.expr_context.clone().unwrap();

            let (executor, subtasks) = self
                .create_nodes(
                    fragment_id,
                    &node,
                    self.env.clone(),
                    &actor_context,
                    vnode_bitmap,
                    &local_barrier_manager,
                )
                .await?;

            let dispatcher = DispatchExecutor::new(
                executor,
                new_output_request_rx,
                actor.dispatchers,
                actor_id,
                fragment_id,
                local_barrier_manager.clone(),
                self.streaming_metrics.clone(),
            )
            .await?;
            let actor = Actor::new(
                dispatcher,
                subtasks,
                self.streaming_metrics.clone(),
                actor_context.clone(),
                expr_context,
                local_barrier_manager,
            );
            Ok(actor)
        }
    }
}

impl StreamActorManager {
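    /// Spawn the actor on the actor runtime and return its join handle, together with an
    /// optional handle of a side task that periodically reports Tokio task metrics for
    /// this actor.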
    pub(super) fn spawn_actor(
        self: &Arc<Self>,
        actor: BuildActorInfo,
        fragment_id: FragmentId,
        node: Arc<StreamNode>,
        related_subscriptions: Arc<HashMap<TableId, HashSet<u32>>>,
        local_barrier_manager: LocalBarrierManager,
        new_output_request_rx: UnboundedReceiver<(ActorId, NewOutputRequest)>,
    ) -> (JoinHandle<()>, Option<JoinHandle<()>>) {
        {
            let monitor = tokio_metrics::TaskMonitor::new();
            let stream_actor_ref = &actor;
            let actor_id = stream_actor_ref.actor_id;
            let handle = {
                let trace_span =
                    format!("Actor {actor_id}: `{}`", stream_actor_ref.mview_definition);
                let barrier_manager = local_barrier_manager.clone();
                // Wrap the future of `create_actor` with `boxed` to avoid stack overflow.
                let actor = self
                    .clone()
                    .create_actor(
                        actor,
                        fragment_id,
                        node,
                        related_subscriptions,
                        barrier_manager.clone(),
                        new_output_request_rx,
                    )
                    .boxed()
                    .and_then(|actor| actor.run())
                    .map(move |result| {
                        if let Err(err) = result {
                            // TODO: check error type and panic if it's unexpected.
                            // Intentionally use `?` on the report to also include the backtrace.
                            tracing::error!(actor_id, error = ?err.as_report(), "actor exit with error");
                            barrier_manager.notify_failure(actor_id, err);
                        }
                    });
                let traced = match &self.await_tree_reg {
                    Some(m) => m
                        .register(await_tree_key::Actor(actor_id), trace_span)
                        .instrument(actor)
                        .left_future(),
                    None => actor.right_future(),
                };
                let instrumented = monitor.instrument(traced);
                let with_config = crate::CONFIG.scope(self.env.config().clone(), instrumented);
                // If hummock tracing is not enabled, this directly returns the wrapped future.
                let may_track_hummock = with_config.may_trace_hummock();

                self.runtime.spawn(may_track_hummock)
            };

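            // Optionally spawn a side task that exports Tokio task metrics for this actor
            // once per second.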
            let monitor_handle = if self.streaming_metrics.level >= MetricLevel::Debug
                || self.env.config().developer.enable_actor_tokio_metrics
            {
                tracing::info!("Tokio metrics are enabled.");
                let streaming_metrics = self.streaming_metrics.clone();
                let actor_monitor_task = self.runtime.spawn(async move {
                    let metrics = streaming_metrics.new_actor_metrics(actor_id);
                    loop {
                        let task_metrics = monitor.cumulative();
                        metrics
                            .actor_execution_time
                            .set(task_metrics.total_poll_duration.as_secs_f64());
                        metrics
                            .actor_fast_poll_duration
                            .set(task_metrics.total_fast_poll_duration.as_secs_f64());
                        metrics
                            .actor_fast_poll_cnt
                            .set(task_metrics.total_fast_poll_count as i64);
                        metrics
                            .actor_slow_poll_duration
                            .set(task_metrics.total_slow_poll_duration.as_secs_f64());
                        metrics
                            .actor_slow_poll_cnt
                            .set(task_metrics.total_slow_poll_count as i64);
                        metrics
                            .actor_poll_duration
                            .set(task_metrics.total_poll_duration.as_secs_f64());
                        metrics
                            .actor_poll_cnt
                            .set(task_metrics.total_poll_count as i64);
                        metrics
                            .actor_idle_duration
                            .set(task_metrics.total_idle_duration.as_secs_f64());
                        metrics
                            .actor_idle_cnt
                            .set(task_metrics.total_idled_count as i64);
                        metrics
                            .actor_scheduled_duration
                            .set(task_metrics.total_scheduled_duration.as_secs_f64());
                        metrics
                            .actor_scheduled_cnt
                            .set(task_metrics.total_scheduled_count as i64);
                        tokio::time::sleep(Duration::from_secs(1)).await;
                    }
                });
                Some(actor_monitor_task)
            } else {
                None
            };
            (handle, monitor_handle)
        }
    }
}

#[cfg(test)]
pub mod test_utils {
    use risingwave_pb::common::{ActorInfo, HostAddress};

    use super::*;

    pub fn helper_make_local_actor(actor_id: u32) -> ActorInfo {
        ActorInfo {
            actor_id,
            host: Some(HostAddress {
                host: LOCAL_TEST_ADDR.host.clone(),
                port: LOCAL_TEST_ADDR.port as i32,
            }),
        }
    }
}