risingwave_frontend/handler/
explain_analyze_stream_job.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use pgwire::pg_response::StatementType;
18use risingwave_common::types::Fields;
19use risingwave_sqlparser::ast::AnalyzeTarget;
20use tokio::time::Duration;
21
22use crate::error::Result;
23use crate::handler::explain_analyze_stream_job::graph::{
24    extract_executor_infos, extract_stream_node_infos, render_graph_with_metrics,
25};
26use crate::handler::{HandlerArgs, RwPgResponse, RwPgResponseBuilder, RwPgResponseBuilderExt};
27
28#[derive(Fields)]
29struct ExplainAnalyzeStreamJobOutput {
30    identity: String,
31    actor_ids: String,
32    output_rows_per_second: Option<String>,
33    downstream_backpressure_ratio: Option<String>,
34}
35
36pub async fn handle_explain_analyze_stream_job(
37    handler_args: HandlerArgs,
38    target: AnalyzeTarget,
39    duration_secs: Option<u64>,
40) -> Result<RwPgResponse> {
41    let profiling_duration = Duration::from_secs(duration_secs.unwrap_or(10));
42    let job_id = bind::bind_relation(&target, &handler_args)?;
43
44    let meta_client = handler_args.session.env().meta_client();
45    let fragments = net::get_fragments(meta_client, job_id).await?;
46    let dispatcher_fragment_ids = fragments.iter().map(|f| f.id).collect::<Vec<_>>();
47    let fragment_parallelisms = fragments
48        .iter()
49        .map(|f| (f.id, f.actors.len()))
50        .collect::<HashMap<_, _>>();
51    let (root_node, adjacency_list) = extract_stream_node_infos(fragments);
52    let (executor_ids, operator_to_executor) = extract_executor_infos(&adjacency_list);
53
54    let worker_nodes = net::list_stream_worker_nodes(handler_args.session.env()).await?;
55
56    let executor_stats = net::get_executor_stats(
57        &handler_args,
58        &worker_nodes,
59        &executor_ids,
60        &dispatcher_fragment_ids,
61        profiling_duration,
62    )
63    .await?;
64    tracing::debug!(?executor_stats, "collected executor stats");
65    let aggregated_stats = metrics::OperatorStats::aggregate(
66        operator_to_executor,
67        &executor_stats,
68        &fragment_parallelisms,
69    );
70    tracing::debug!(?aggregated_stats, "collected aggregated stats");
71
72    // Render graph with metrics
73    let rows = render_graph_with_metrics(
74        &adjacency_list,
75        root_node,
76        &aggregated_stats,
77        &profiling_duration,
78    );
79    let builder = RwPgResponseBuilder::empty(StatementType::EXPLAIN);
80    let builder = builder.rows(rows);
81    Ok(builder.into())
82}
83
84/// Binding pass, since we don't go through the binder.
85/// TODO(noel): Should this be in binder? But it may make compilation slower and doesn't require any binder logic...
86mod bind {
87    use risingwave_sqlparser::ast::AnalyzeTarget;
88
89    use crate::Binder;
90    use crate::catalog::root_catalog::SchemaPath;
91    use crate::error::Result;
92    use crate::handler::HandlerArgs;
93
94    /// Bind the analyze target relation to its actual id.
95    pub(super) fn bind_relation(
96        target_relation: &AnalyzeTarget,
97        handler_args: &HandlerArgs,
98    ) -> Result<u32> {
99        let job_id = match &target_relation {
100            AnalyzeTarget::Id(id) => *id,
101            AnalyzeTarget::Index(name)
102            | AnalyzeTarget::Table(name)
103            | AnalyzeTarget::Sink(name)
104            | AnalyzeTarget::MaterializedView(name) => {
105                let session = &handler_args.session;
106                let db_name = session.database();
107                let (schema_name, name) =
108                    Binder::resolve_schema_qualified_name(&db_name, name.clone())?;
109                let search_path = session.config().search_path();
110                let user_name = &session.user_name();
111                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
112
113                let catalog_reader = handler_args.session.env().catalog_reader();
114                let catalog = catalog_reader.read_guard();
115
116                match target_relation {
117                    AnalyzeTarget::Index(_) => {
118                        let (catalog, _schema_name) =
119                            catalog.get_index_by_name(&db_name, schema_path, &name)?;
120                        catalog.id.index_id
121                    }
122                    AnalyzeTarget::Table(_) => {
123                        let (catalog, _schema_name) =
124                            catalog.get_any_table_by_name(&db_name, schema_path, &name)?;
125                        catalog.id.table_id
126                    }
127                    AnalyzeTarget::Sink(_) => {
128                        let (catalog, _schema_name) =
129                            catalog.get_sink_by_name(&db_name, schema_path, &name)?;
130                        catalog.id.sink_id
131                    }
132                    AnalyzeTarget::MaterializedView(_) => {
133                        let (catalog, _schema_name) =
134                            catalog.get_any_table_by_name(&db_name, schema_path, &name)?;
135                        catalog.id.table_id
136                    }
137                    AnalyzeTarget::Id(_) => unreachable!(),
138                }
139            }
140        };
141        Ok(job_id)
142    }
143}
144
145/// Utilities for fetching stats from CN
146mod net {
147    use std::collections::HashSet;
148
149    use risingwave_pb::common::WorkerNode;
150    use risingwave_pb::meta::list_table_fragments_response::FragmentInfo;
151    use risingwave_pb::monitor_service::GetProfileStatsRequest;
152    use tokio::time::{Duration, sleep};
153
154    use crate::error::Result;
155    use crate::handler::HandlerArgs;
156    use crate::handler::explain_analyze_stream_job::graph::ExecutorId;
157    use crate::handler::explain_analyze_stream_job::metrics::ExecutorStats;
158    use crate::meta_client::FrontendMetaClient;
159    use crate::session::FrontendEnv;
160
161    pub(super) async fn list_stream_worker_nodes(env: &FrontendEnv) -> Result<Vec<WorkerNode>> {
162        let worker_nodes = env.meta_client().list_all_nodes().await?;
163        let stream_worker_nodes = worker_nodes
164            .into_iter()
165            .filter(|node| {
166                node.property
167                    .as_ref()
168                    .map(|p| p.is_streaming)
169                    .unwrap_or_else(|| false)
170            })
171            .collect::<Vec<_>>();
172        Ok(stream_worker_nodes)
173    }
174
175    // TODO(kwannoel): Only fetch the names, actor_ids and graph of the fragments
176    pub(super) async fn get_fragments(
177        meta_client: &dyn FrontendMetaClient,
178        job_id: u32,
179    ) -> Result<Vec<FragmentInfo>> {
180        let mut fragment_map = meta_client.list_table_fragments(&[job_id]).await?;
181        assert_eq!(fragment_map.len(), 1, "expected only one fragment");
182        let (fragment_job_id, table_fragment_info) = fragment_map.drain().next().unwrap();
183        assert_eq!(fragment_job_id, job_id);
184        Ok(table_fragment_info.fragments)
185    }
186
187    pub(super) async fn get_executor_stats(
188        handler_args: &HandlerArgs,
189        worker_nodes: &[WorkerNode],
190        executor_ids: &HashSet<ExecutorId>,
191        dispatcher_fragment_ids: &[u32],
192        profiling_duration: Duration,
193    ) -> Result<ExecutorStats> {
194        let mut aggregated_stats = ExecutorStats::new();
195        for node in worker_nodes {
196            let mut compute_client = handler_args.session.env().client_pool().get(node).await?;
197            let stats = compute_client
198                .monitor_client
199                .get_profile_stats(GetProfileStatsRequest {
200                    executor_ids: executor_ids.iter().copied().collect(),
201                    dispatcher_fragment_ids: dispatcher_fragment_ids.into(),
202                })
203                .await
204                .expect("get profiling stats failed");
205            aggregated_stats.start_record(
206                executor_ids,
207                dispatcher_fragment_ids,
208                &stats.into_inner(),
209            );
210        }
211
212        sleep(profiling_duration).await;
213
214        for node in worker_nodes {
215            let mut compute_client = handler_args.session.env().client_pool().get(node).await?;
216            let stats = compute_client
217                .monitor_client
218                .get_profile_stats(GetProfileStatsRequest {
219                    executor_ids: executor_ids.iter().copied().collect(),
220                    dispatcher_fragment_ids: dispatcher_fragment_ids.into(),
221                })
222                .await
223                .expect("get profiling stats failed");
224            aggregated_stats.finish_record(
225                executor_ids,
226                dispatcher_fragment_ids,
227                &stats.into_inner(),
228            );
229        }
230
231        Ok(aggregated_stats)
232    }
233}
234
235/// Profiling metrics data structure and utilities
236/// We have 2 stages of metric collection:
237/// 1. Collect the stream node metrics at the **Executor** level.
238/// 2. Merge the stream node metrics into **Operator** level, avg, max, min, etc...
239mod metrics {
240    use std::collections::{HashMap, HashSet};
241
242    use risingwave_pb::monitor_service::GetProfileStatsResponse;
243
244    use crate::catalog::FragmentId;
245    use crate::handler::explain_analyze_stream_job::graph::{ExecutorId, OperatorId};
246    use crate::handler::explain_analyze_stream_job::utils::operator_id_for_dispatch;
247
248    #[expect(dead_code)]
249    #[derive(Default, Debug)]
250    pub(super) struct ExecutorMetrics {
251        pub executor_id: ExecutorId,
252        pub epoch: u32,
253        pub total_output_throughput: u64,
254        pub total_output_pending_ns: u64,
255    }
256
257    #[derive(Default, Debug)]
258    pub(super) struct DispatchMetrics {
259        pub fragment_id: FragmentId,
260        pub epoch: u32,
261        pub total_output_throughput: u64,
262        pub total_output_pending_ns: u64,
263    }
264
265    #[derive(Debug)]
266    pub(super) struct ExecutorStats {
267        executor_stats: HashMap<ExecutorId, ExecutorMetrics>,
268        dispatch_stats: HashMap<FragmentId, DispatchMetrics>,
269    }
270
271    impl ExecutorStats {
272        pub(super) fn new() -> Self {
273            ExecutorStats {
274                executor_stats: HashMap::new(),
275                dispatch_stats: HashMap::new(),
276            }
277        }
278
279        pub fn get(&self, executor_id: &ExecutorId) -> Option<&ExecutorMetrics> {
280            self.executor_stats.get(executor_id)
281        }
282
283        /// Establish metrics baseline for profiling
284        pub(super) fn start_record<'a>(
285            &mut self,
286            executor_ids: &'a HashSet<ExecutorId>,
287            dispatch_fragment_ids: &'a [FragmentId],
288            metrics: &'a GetProfileStatsResponse,
289        ) {
290            for executor_id in executor_ids {
291                let Some(total_output_throughput) =
292                    metrics.stream_node_output_row_count.get(executor_id)
293                else {
294                    continue;
295                };
296                let Some(total_output_pending_ns) = metrics
297                    .stream_node_output_blocking_duration_ns
298                    .get(executor_id)
299                else {
300                    continue;
301                };
302                let stats = ExecutorMetrics {
303                    executor_id: *executor_id,
304                    epoch: 0,
305                    total_output_throughput: *total_output_throughput,
306                    total_output_pending_ns: *total_output_pending_ns,
307                };
308                // An executor should be scheduled on a single worker node,
309                // it should not be inserted multiple times.
310                assert!(self.executor_stats.insert(*executor_id, stats).is_none());
311            }
312
313            for fragment_id in dispatch_fragment_ids {
314                let Some(total_output_throughput) =
315                    metrics.dispatch_fragment_output_row_count.get(fragment_id)
316                else {
317                    continue;
318                };
319                let Some(total_output_pending_ns) = metrics
320                    .dispatch_fragment_output_blocking_duration_ns
321                    .get(fragment_id)
322                else {
323                    continue;
324                };
325                let stats = self.dispatch_stats.entry(*fragment_id).or_default();
326                stats.fragment_id = *fragment_id;
327                stats.epoch = 0;
328                // do a sum rather than insert
329                // because dispatchers are
330                // distributed across worker nodes.
331                stats.total_output_throughput += *total_output_throughput;
332                stats.total_output_pending_ns += *total_output_pending_ns;
333            }
334        }
335
336        /// Compute the deltas for reporting
337        pub(super) fn finish_record<'a>(
338            &mut self,
339            executor_ids: &'a HashSet<ExecutorId>,
340            dispatch_fragment_ids: &'a [FragmentId],
341            metrics: &'a GetProfileStatsResponse,
342        ) {
343            for executor_id in executor_ids {
344                let Some(stats) = self.executor_stats.get_mut(executor_id) else {
345                    continue;
346                };
347                let Some(total_output_throughput) =
348                    metrics.stream_node_output_row_count.get(executor_id)
349                else {
350                    continue;
351                };
352                let Some(total_output_pending_ns) = metrics
353                    .stream_node_output_blocking_duration_ns
354                    .get(executor_id)
355                else {
356                    continue;
357                };
358                let Some(throughput_delta) =
359                    total_output_throughput.checked_sub(stats.total_output_throughput)
360                else {
361                    continue;
362                };
363                let Some(output_ns_delta) =
364                    total_output_pending_ns.checked_sub(stats.total_output_pending_ns)
365                else {
366                    continue;
367                };
368                stats.total_output_throughput = throughput_delta;
369                stats.total_output_pending_ns = output_ns_delta;
370            }
371
372            for fragment_id in dispatch_fragment_ids {
373                let Some(stats) = self.dispatch_stats.get_mut(fragment_id) else {
374                    continue;
375                };
376                let Some(total_output_throughput) =
377                    metrics.dispatch_fragment_output_row_count.get(fragment_id)
378                else {
379                    continue;
380                };
381                let Some(total_output_pending_ns) = metrics
382                    .dispatch_fragment_output_blocking_duration_ns
383                    .get(fragment_id)
384                else {
385                    continue;
386                };
387                let Some(throughput_delta) =
388                    total_output_throughput.checked_sub(stats.total_output_throughput)
389                else {
390                    continue;
391                };
392                let Some(output_ns_delta) =
393                    total_output_pending_ns.checked_sub(stats.total_output_pending_ns)
394                else {
395                    continue;
396                };
397                stats.total_output_throughput = throughput_delta;
398                stats.total_output_pending_ns = output_ns_delta;
399            }
400        }
401    }
402
403    #[expect(dead_code)]
404    #[derive(Debug)]
405    pub(super) struct OperatorMetrics {
406        pub operator_id: OperatorId,
407        pub epoch: u32,
408        pub total_output_throughput: u64,
409        pub total_output_pending_ns: u64,
410    }
411
412    #[derive(Debug)]
413    pub(super) struct OperatorStats {
414        inner: HashMap<OperatorId, OperatorMetrics>,
415    }
416
417    impl OperatorStats {
418        /// Aggregates executor-level stats into operator-level stats
419        pub(super) fn aggregate(
420            operator_map: HashMap<OperatorId, HashSet<ExecutorId>>,
421            executor_stats: &ExecutorStats,
422            fragment_parallelisms: &HashMap<FragmentId, usize>,
423        ) -> Self {
424            let mut operator_stats = HashMap::new();
425            'operator_loop: for (operator_id, executor_ids) in operator_map {
426                let num_executors = executor_ids.len() as u64;
427                let mut total_output_throughput = 0;
428                let mut total_output_pending_ns = 0;
429                for executor_id in executor_ids {
430                    if let Some(stats) = executor_stats.get(&executor_id) {
431                        total_output_throughput += stats.total_output_throughput;
432                        total_output_pending_ns += stats.total_output_pending_ns;
433                    } else {
434                        // skip this operator if it doesn't have executor stats for any of its executors
435                        continue 'operator_loop;
436                    }
437                }
438                let total_output_throughput = total_output_throughput;
439                let total_output_pending_ns = total_output_pending_ns / num_executors;
440
441                operator_stats.insert(
442                    operator_id,
443                    OperatorMetrics {
444                        operator_id,
445                        epoch: 0,
446                        total_output_throughput,
447                        total_output_pending_ns,
448                    },
449                );
450            }
451
452            for (fragment_id, dispatch_metrics) in &executor_stats.dispatch_stats {
453                let operator_id = operator_id_for_dispatch(*fragment_id);
454                let total_output_throughput = dispatch_metrics.total_output_throughput;
455                let fragment_parallelism = fragment_parallelisms
456                    .get(fragment_id)
457                    .copied()
458                    .expect("should have fragment parallelism");
459                let total_output_pending_ns =
460                    dispatch_metrics.total_output_pending_ns / fragment_parallelism as u64;
461
462                operator_stats.insert(
463                    operator_id,
464                    OperatorMetrics {
465                        operator_id,
466                        epoch: 0,
467                        total_output_throughput,
468                        total_output_pending_ns,
469                    },
470                );
471            }
472
473            OperatorStats {
474                inner: operator_stats,
475            }
476        }
477
478        pub fn get(&self, operator_id: &OperatorId) -> Option<&OperatorMetrics> {
479            self.inner.get(operator_id)
480        }
481    }
482}
483
484/// Utilities for the stream node graph:
485/// rendering, extracting, etc.
486mod graph {
487    use std::collections::{HashMap, HashSet};
488    use std::time::Duration;
489
490    use itertools::Itertools;
491    use risingwave_common::operator::{
492        unique_executor_id_from_unique_operator_id, unique_operator_id,
493    };
494    use risingwave_pb::meta::list_table_fragments_response::FragmentInfo;
495    use risingwave_pb::stream_plan::stream_node::{NodeBody, NodeBodyDiscriminants};
496    use risingwave_pb::stream_plan::{MergeNode, StreamNode as PbStreamNode};
497
498    use crate::handler::explain_analyze_stream_job::ExplainAnalyzeStreamJobOutput;
499    use crate::handler::explain_analyze_stream_job::metrics::OperatorStats;
500    use crate::handler::explain_analyze_stream_job::utils::operator_id_for_dispatch;
501    pub(super) type OperatorId = u64;
502    pub(super) type ExecutorId = u64;
503
504    /// This is an internal struct used ONLY for explain analyze stream job.
505    #[derive(Debug)]
506    pub(super) struct StreamNode {
507        operator_id: OperatorId,
508        fragment_id: u32,
509        identity: NodeBodyDiscriminants,
510        actor_ids: HashSet<u32>,
511        dependencies: Vec<u64>,
512    }
513
514    impl StreamNode {
515        fn new_for_dispatcher(fragment_id: u32) -> Self {
516            StreamNode {
517                operator_id: operator_id_for_dispatch(fragment_id),
518                fragment_id,
519                identity: NodeBodyDiscriminants::Exchange,
520                actor_ids: Default::default(),
521                dependencies: Default::default(),
522            }
523        }
524    }
525
526    /// Extracts the root node of the plan, as well as the adjacency list
527    pub(super) fn extract_stream_node_infos(
528        fragments: Vec<FragmentInfo>,
529    ) -> (OperatorId, HashMap<OperatorId, StreamNode>) {
530        // Finds root nodes of the graph
531
532        fn find_root_nodes(stream_nodes: &HashMap<u64, StreamNode>) -> HashSet<u64> {
533            let mut all_nodes = stream_nodes.keys().copied().collect::<HashSet<_>>();
534            for node in stream_nodes.values() {
535                for dependency in &node.dependencies {
536                    all_nodes.remove(dependency);
537                }
538            }
539            all_nodes
540        }
541
542        // Recursively extracts stream node info, and builds an adjacency list between stream nodes
543        // and their dependencies
544        fn extract_stream_node_info(
545            fragment_id: u32,
546            fragment_id_to_merge_operator_id: &mut HashMap<u32, OperatorId>,
547            operator_id_to_stream_node: &mut HashMap<OperatorId, StreamNode>,
548            node: &PbStreamNode,
549            actor_ids: &HashSet<u32>,
550        ) {
551            let identity = node
552                .node_body
553                .as_ref()
554                .expect("should have node body")
555                .into();
556            let operator_id = unique_operator_id(fragment_id, node.operator_id);
557            if let Some(merge_node) = node.node_body.as_ref()
558                && let NodeBody::Merge(box MergeNode {
559                    upstream_fragment_id,
560                    ..
561                }) = merge_node
562            {
563                fragment_id_to_merge_operator_id.insert(*upstream_fragment_id, operator_id);
564            }
565            let dependencies = &node.input;
566            let dependency_ids = dependencies
567                .iter()
568                .map(|input| unique_operator_id(fragment_id, input.operator_id))
569                .collect::<Vec<_>>();
570            operator_id_to_stream_node.insert(
571                operator_id,
572                StreamNode {
573                    operator_id,
574                    fragment_id,
575                    identity,
576                    actor_ids: actor_ids.clone(),
577                    dependencies: dependency_ids,
578                },
579            );
580            for dependency in dependencies {
581                extract_stream_node_info(
582                    fragment_id,
583                    fragment_id_to_merge_operator_id,
584                    operator_id_to_stream_node,
585                    dependency,
586                    actor_ids,
587                );
588            }
589        }
590
591        // build adjacency list and hanging merge edges.
592        // hanging merge edges will be filled in the following section.
593        let mut operator_id_to_stream_node = HashMap::new();
594        let mut fragment_id_to_merge_operator_id = HashMap::new();
595        for fragment in fragments {
596            let actors = fragment.actors;
597            assert!(
598                !actors.is_empty(),
599                "fragment {} should have at least one actor",
600                fragment.id
601            );
602            let actor_ids = actors.iter().map(|actor| actor.id).collect::<HashSet<_>>();
603            let node = actors[0].node.as_ref().expect("should have stream node");
604            extract_stream_node_info(
605                fragment.id,
606                &mut fragment_id_to_merge_operator_id,
607                &mut operator_id_to_stream_node,
608                node,
609                &actor_ids,
610            );
611        }
612
613        // find root node, and fill in dispatcher edges + nodes.
614        let root_or_dispatch_nodes = find_root_nodes(&operator_id_to_stream_node);
615        let mut root_node = None;
616        for operator_id in root_or_dispatch_nodes {
617            let node = operator_id_to_stream_node.get_mut(&operator_id).unwrap();
618            let fragment_id = node.fragment_id;
619            if let Some(merge_operator_id) = fragment_id_to_merge_operator_id.get(&fragment_id) {
620                let mut dispatcher = StreamNode::new_for_dispatcher(fragment_id);
621                let operator_id_for_dispatch = dispatcher.operator_id;
622                dispatcher.dependencies.push(operator_id);
623                assert!(
624                    operator_id_to_stream_node
625                        .insert(operator_id_for_dispatch as _, dispatcher)
626                        .is_none()
627                );
628                operator_id_to_stream_node
629                    .get_mut(merge_operator_id)
630                    .unwrap()
631                    .dependencies
632                    .push(operator_id_for_dispatch as _)
633            } else {
634                root_node = Some(operator_id);
635            }
636        }
637
638        (root_node.unwrap(), operator_id_to_stream_node)
639    }
640
641    pub(super) fn extract_executor_infos(
642        adjacency_list: &HashMap<OperatorId, StreamNode>,
643    ) -> (HashSet<u64>, HashMap<u64, HashSet<u64>>) {
644        let mut executor_ids = HashSet::new();
645        let mut operator_to_executor = HashMap::new();
646        for (operator_id, node) in adjacency_list {
647            assert_eq!(*operator_id, node.operator_id);
648            let operator_id = node.operator_id;
649            for actor_id in &node.actor_ids {
650                let executor_id =
651                    unique_executor_id_from_unique_operator_id(*actor_id, operator_id);
652                assert!(executor_ids.insert(executor_id));
653                assert!(
654                    operator_to_executor
655                        .entry(operator_id)
656                        .or_insert_with(HashSet::new)
657                        .insert(executor_id)
658                );
659            }
660        }
661        (executor_ids, operator_to_executor)
662    }
663
664    // Do a DFS based rendering. Each node will occupy its own row.
665    // Schema:
666    // | Operator ID | Identity | Actor IDs | Metrics ... |
667    // Each node will be indented based on its depth in the graph.
668    pub(super) fn render_graph_with_metrics(
669        adjacency_list: &HashMap<u64, StreamNode>,
670        root_node: u64,
671        stats: &OperatorStats,
672        profiling_duration: &Duration,
673    ) -> Vec<ExplainAnalyzeStreamJobOutput> {
674        let profiling_duration_secs = profiling_duration.as_secs_f64();
675        let mut rows = vec![];
676        let mut stack = vec![(String::new(), true, root_node)];
677        while let Some((prefix, last_child, node_id)) = stack.pop() {
678            let Some(node) = adjacency_list.get(&node_id) else {
679                continue;
680            };
681            let is_root = node_id == root_node;
682
683            let identity_rendered = if is_root {
684                node.identity.to_string()
685            } else {
686                let connector = if last_child { "└─ " } else { "├─ " };
687                format!("{}{}{}", prefix, connector, node.identity)
688            };
689
690            let child_prefix = if is_root {
691                ""
692            } else if last_child {
693                "   "
694            } else {
695                "│  "
696            };
697            let child_prefix = format!("{}{}", prefix, child_prefix);
698
699            let stats = stats.get(&node_id);
700            let (output_rows_per_second, downstream_backpressure_ratio) = match stats {
701                Some(stats) => (
702                    Some(
703                        (stats.total_output_throughput as f64 / profiling_duration_secs)
704                            .to_string(),
705                    ),
706                    Some(
707                        (Duration::from_nanos(stats.total_output_pending_ns).as_secs_f64()
708                            / usize::max(node.actor_ids.len(), 1) as f64
709                            / profiling_duration_secs)
710                            .to_string(),
711                    ),
712                ),
713                None => (None, None),
714            };
715            let row = ExplainAnalyzeStreamJobOutput {
716                identity: identity_rendered,
717                actor_ids: node
718                    .actor_ids
719                    .iter()
720                    .sorted()
721                    .map(|id| id.to_string())
722                    .collect::<Vec<_>>()
723                    .join(","),
724                output_rows_per_second,
725                downstream_backpressure_ratio,
726            };
727            rows.push(row);
728            for (position, dependency) in node.dependencies.iter().enumerate() {
729                stack.push((child_prefix.clone(), position == 0, *dependency));
730            }
731        }
732        rows
733    }
734}
735
736mod utils {
737    use risingwave_common::operator::unique_operator_id;
738
739    use crate::handler::explain_analyze_stream_job::graph::OperatorId;
740
741    pub(super) fn operator_id_for_dispatch(fragment_id: u32) -> OperatorId {
742        unique_operator_id(fragment_id, u32::MAX as u64)
743    }
744}