risingwave_frontend/handler/explain_analyze_stream_job.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use pgwire::pg_response::StatementType;
use risingwave_common::types::Fields;
use risingwave_sqlparser::ast::AnalyzeTarget;
use tokio::time::Duration;

use crate::error::Result;
use crate::handler::explain_analyze_stream_job::graph::{
    extract_executor_infos, extract_stream_node_infos, render_graph_with_metrics,
};
use crate::handler::{HandlerArgs, RwPgResponse, RwPgResponseBuilder, RwPgResponseBuilderExt};

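/// Panic in debug or madsim builds, but only emit a `tracing::warn!` in release builds.
///
/// Illustrative usage (hypothetical message):
/// `debug_panic_or_warn!("missing stats for executor {}", executor_id);`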
#[macro_export]
macro_rules! debug_panic_or_warn {
    ($($arg:tt)*) => {
        if cfg!(debug_assertions) || cfg!(madsim) {
            panic!($($arg)*);
        } else {
            tracing::warn!($($arg)*);
        }
    };
}

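/// One output row per rendered stream node; `identity` carries the tree-style
/// indentation produced by `render_graph_with_metrics`.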
#[derive(Fields)]
struct ExplainAnalyzeStreamJobOutput {
    identity: String,
    actor_ids: String,
    output_rows_per_second: Option<String>,
    downstream_backpressure_ratio: Option<String>,
}

pub async fn handle_explain_analyze_stream_job(
    handler_args: HandlerArgs,
    target: AnalyzeTarget,
    duration_secs: Option<u64>,
) -> Result<RwPgResponse> {
    let profiling_duration = Duration::from_secs(duration_secs.unwrap_or(10));
    let job_id = bind::bind_relation(&target, &handler_args)?;

    let meta_client = handler_args.session.env().meta_client();
    let fragments = net::get_fragments(meta_client, job_id).await?;
    let fragment_parallelisms = fragments
        .iter()
        .map(|f| (f.id, f.actors.len()))
        .collect::<HashMap<_, _>>();
    let (root_node, dispatcher_fragment_ids, adjacency_list) = extract_stream_node_infos(fragments);
    let (executor_ids, operator_to_executor) = extract_executor_infos(&adjacency_list);
    tracing::debug!(
        ?fragment_parallelisms,
        ?root_node,
        ?dispatcher_fragment_ids,
        ?adjacency_list,
        "explain analyze metadata"
    );

    let worker_nodes = net::list_stream_worker_nodes(handler_args.session.env()).await?;

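    // Sample profiling counters on every streaming compute node at the start and end of the
    // profiling window; the stats below are the deltas between the two samples.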
    let executor_stats = net::get_executor_stats(
        &handler_args,
        &worker_nodes,
        &executor_ids,
        &dispatcher_fragment_ids,
        profiling_duration,
    )
    .await?;
    tracing::debug!(?executor_stats, "collected executor stats");
    let aggregated_stats = metrics::OperatorStats::aggregate(
        operator_to_executor,
        &executor_stats,
        &fragment_parallelisms,
    );
    tracing::debug!(?aggregated_stats, "collected aggregated stats");

    // Render graph with metrics
    let rows = render_graph_with_metrics(
        &adjacency_list,
        root_node,
        &aggregated_stats,
        &profiling_duration,
    );
    let builder = RwPgResponseBuilder::empty(StatementType::EXPLAIN);
    let builder = builder.rows(rows);
    Ok(builder.into())
}

/// A lightweight binding pass of our own, since this handler doesn't go through the binder.
/// TODO(noel): Should this live in the binder? It may slow compilation down, and it doesn't require any binder logic...
mod bind {
    use risingwave_sqlparser::ast::AnalyzeTarget;

    use crate::Binder;
    use crate::catalog::root_catalog::SchemaPath;
    use crate::error::Result;
    use crate::handler::HandlerArgs;

    /// Bind the analyze target relation to its actual id.
    pub(super) fn bind_relation(
        target_relation: &AnalyzeTarget,
        handler_args: &HandlerArgs,
    ) -> Result<u32> {
        let job_id = match &target_relation {
            AnalyzeTarget::Id(id) => *id,
            AnalyzeTarget::Index(name)
            | AnalyzeTarget::Table(name)
            | AnalyzeTarget::Sink(name)
            | AnalyzeTarget::MaterializedView(name) => {
                let session = &handler_args.session;
                let db_name = session.database();
                let (schema_name, name) = Binder::resolve_schema_qualified_name(&db_name, name)?;
                let search_path = session.config().search_path();
                let user_name = &session.user_name();
                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

                let catalog_reader = handler_args.session.env().catalog_reader();
                let catalog = catalog_reader.read_guard();

                match target_relation {
                    AnalyzeTarget::Index(_) => {
                        let (catalog, _schema_name) =
                            catalog.get_index_by_name(&db_name, schema_path, &name)?;
                        catalog.id.index_id
                    }
                    AnalyzeTarget::Table(_) => {
                        let (catalog, _schema_name) =
                            catalog.get_any_table_by_name(&db_name, schema_path, &name)?;
                        catalog.id.table_id
                    }
                    AnalyzeTarget::Sink(_) => {
                        let (catalog, _schema_name) =
                            catalog.get_any_sink_by_name(&db_name, schema_path, &name)?;
                        catalog.id.sink_id
                    }
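                    // Materialized views are resolved through the table catalog as well.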
                    AnalyzeTarget::MaterializedView(_) => {
                        let (catalog, _schema_name) =
                            catalog.get_any_table_by_name(&db_name, schema_path, &name)?;
                        catalog.id.table_id
                    }
                    AnalyzeTarget::Id(_) => unreachable!(),
                }
            }
        };
        Ok(job_id)
    }
}

/// Utilities for fetching stats from compute nodes (CN).
mod net {
    use std::collections::HashSet;

    use risingwave_pb::common::WorkerNode;
    use risingwave_pb::meta::list_table_fragments_response::FragmentInfo;
    use risingwave_pb::monitor_service::GetProfileStatsRequest;
    use tokio::time::{Duration, sleep};

    use crate::error::Result;
    use crate::handler::HandlerArgs;
    use crate::handler::explain_analyze_stream_job::graph::ExecutorId;
    use crate::handler::explain_analyze_stream_job::metrics::ExecutorStats;
    use crate::meta_client::FrontendMetaClient;
    use crate::session::FrontendEnv;

    pub(super) async fn list_stream_worker_nodes(env: &FrontendEnv) -> Result<Vec<WorkerNode>> {
        let worker_nodes = env.meta_client().list_all_nodes().await?;
        let stream_worker_nodes = worker_nodes
            .into_iter()
            .filter(|node| {
                node.property
                    .as_ref()
                    .map(|p| p.is_streaming)
                    .unwrap_or(false)
            })
            .collect::<Vec<_>>();
        Ok(stream_worker_nodes)
    }

    // TODO(kwannoel): Only fetch the names, actor_ids and graph of the fragments
    pub(super) async fn get_fragments(
        meta_client: &dyn FrontendMetaClient,
        job_id: u32,
    ) -> Result<Vec<FragmentInfo>> {
        let mut fragment_map = meta_client.list_table_fragments(&[job_id]).await?;
        assert_eq!(fragment_map.len(), 1, "expected exactly one fragment entry for the job");
        let (fragment_job_id, table_fragment_info) = fragment_map.drain().next().unwrap();
        assert_eq!(fragment_job_id, job_id);
        Ok(table_fragment_info.fragments)
    }

    pub(super) async fn get_executor_stats(
        handler_args: &HandlerArgs,
        worker_nodes: &[WorkerNode],
        executor_ids: &HashSet<ExecutorId>,
        dispatcher_fragment_ids: &HashSet<u32>,
        profiling_duration: Duration,
    ) -> Result<ExecutorStats> {
        let dispatcher_fragment_ids = dispatcher_fragment_ids.iter().copied().collect::<Vec<_>>();
        let mut initial_aggregated_stats = ExecutorStats::new();
        for node in worker_nodes {
            let mut compute_client = handler_args.session.env().client_pool().get(node).await?;
            let stats = compute_client
                .monitor_client
                .get_profile_stats(GetProfileStatsRequest {
                    executor_ids: executor_ids.iter().copied().collect(),
                    dispatcher_fragment_ids: dispatcher_fragment_ids.clone(),
                })
                .await
                .expect("get profiling stats failed");
            initial_aggregated_stats.record(
                executor_ids,
                &dispatcher_fragment_ids,
                &stats.into_inner(),
            );
        }
        tracing::debug!(?initial_aggregated_stats, "initial aggregated stats");

        sleep(profiling_duration).await;

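        // Take a second sample after the profiling window; the stats reported to the user are
        // the per-counter deltas between the two samples.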
        let mut final_aggregated_stats = ExecutorStats::new();
        for node in worker_nodes {
            let mut compute_client = handler_args.session.env().client_pool().get(node).await?;
            let stats = compute_client
                .monitor_client
                .get_profile_stats(GetProfileStatsRequest {
                    executor_ids: executor_ids.iter().copied().collect(),
                    dispatcher_fragment_ids: dispatcher_fragment_ids.clone(),
                })
                .await
                .expect("get profiling stats failed");
            final_aggregated_stats.record(
                executor_ids,
                &dispatcher_fragment_ids,
                &stats.into_inner(),
            );
        }
        tracing::debug!(?final_aggregated_stats, "final aggregated stats");

        let delta_aggregated_stats = ExecutorStats::get_delta(
            &initial_aggregated_stats,
            &final_aggregated_stats,
            executor_ids,
            &dispatcher_fragment_ids,
        );

        Ok(delta_aggregated_stats)
    }
}

/// Profiling metrics data structures and utilities.
/// We have 2 stages of metric collection:
/// 1. Collect the stream node metrics at the **Executor** level.
/// 2. Aggregate the executor-level metrics to the **Operator** level (sum, average, etc.).
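///
/// For example (hypothetical numbers): if an operator runs as 2 executors that output
/// 300 and 500 rows over the profiling window, its operator-level throughput is the sum
/// (800 rows), while its output-blocking time is averaged over the 2 executors.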
mod metrics {
    use std::collections::{HashMap, HashSet};

    use risingwave_common::operator::unique_executor_id_into_parts;
    use risingwave_pb::monitor_service::GetProfileStatsResponse;

    use crate::catalog::FragmentId;
    use crate::handler::explain_analyze_stream_job::graph::{ExecutorId, OperatorId};
    use crate::handler::explain_analyze_stream_job::utils::operator_id_for_dispatch;

    #[expect(dead_code)]
    #[derive(Default, Debug)]
    pub(super) struct ExecutorMetrics {
        pub executor_id: ExecutorId,
        pub epoch: u32,
        pub total_output_throughput: u64,
        pub total_output_pending_ns: u64,
    }

    #[derive(Default, Debug)]
    pub(super) struct DispatchMetrics {
        pub fragment_id: FragmentId,
        pub epoch: u32,
        pub total_output_throughput: u64,
        pub total_output_pending_ns: u64,
    }

    #[derive(Debug)]
    pub(super) struct ExecutorStats {
        executor_stats: HashMap<ExecutorId, ExecutorMetrics>,
        dispatch_stats: HashMap<FragmentId, DispatchMetrics>,
    }

    impl ExecutorStats {
        pub(super) fn new() -> Self {
            ExecutorStats {
                executor_stats: HashMap::new(),
                dispatch_stats: HashMap::new(),
            }
        }

        pub fn get(&self, executor_id: &ExecutorId) -> Option<&ExecutorMetrics> {
            self.executor_stats.get(executor_id)
        }

        /// Record metrics for profiling
        pub(super) fn record<'a>(
            &mut self,
            executor_ids: &'a HashSet<ExecutorId>,
            dispatch_fragment_ids: &'a [FragmentId],
            metrics: &'a GetProfileStatsResponse,
        ) {
            for executor_id in executor_ids {
                let Some(total_output_throughput) =
                    metrics.stream_node_output_row_count.get(executor_id)
                else {
                    continue;
                };
                let Some(total_output_pending_ns) = metrics
                    .stream_node_output_blocking_duration_ns
                    .get(executor_id)
                else {
                    continue;
                };
                let stats = ExecutorMetrics {
                    executor_id: *executor_id,
                    epoch: 0,
                    total_output_throughput: *total_output_throughput,
                    total_output_pending_ns: *total_output_pending_ns,
                };
                // An executor is scheduled on a single worker node,
                // so it should not be inserted multiple times.
                if cfg!(madsim) {
                    // Under madsim, all worker nodes share the same process and the metrics
                    // are stored in a global object, so querying each worker node returns
                    // the same set of executor metrics. Hence we don't assert here.
                    self.executor_stats.insert(*executor_id, stats);
                } else {
                    assert!(self.executor_stats.insert(*executor_id, stats).is_none());
                }
            }

            for fragment_id in dispatch_fragment_ids {
                let Some(total_output_throughput) =
                    metrics.dispatch_fragment_output_row_count.get(fragment_id)
                else {
                    continue;
                };
                let Some(total_output_pending_ns) = metrics
                    .dispatch_fragment_output_blocking_duration_ns
                    .get(fragment_id)
                else {
                    continue;
                };
                let stats = self.dispatch_stats.entry(*fragment_id).or_default();
                stats.fragment_id = *fragment_id;
                stats.epoch = 0;
                // Sum rather than insert, because a fragment's dispatchers are
                // distributed across worker nodes.
                stats.total_output_throughput += *total_output_throughput;
                stats.total_output_pending_ns += *total_output_pending_ns;
            }
        }

        pub(super) fn get_delta(
            initial: &Self,
            end: &Self,
            executor_ids: &HashSet<ExecutorId>,
            dispatch_fragment_ids: &[FragmentId],
        ) -> Self {
            let mut delta_aggregated_stats = Self::new();
            for executor_id in executor_ids {
                let (actor_id, operator_id) = unique_executor_id_into_parts(*executor_id);
                let Some(initial_stats) = initial.executor_stats.get(executor_id) else {
                    debug_panic_or_warn!(
                        "missing initial stats for executor {} (actor {} operator {})",
                        executor_id,
                        actor_id,
                        operator_id
                    );
                    continue;
                };
                let Some(end_stats) = end.executor_stats.get(executor_id) else {
                    debug_panic_or_warn!(
                        "missing final stats for executor {} (actor {} operator {})",
                        executor_id,
                        actor_id,
                        operator_id
                    );
                    continue;
                };

                let initial_throughput = initial_stats.total_output_throughput;
                let end_throughput = end_stats.total_output_throughput;
                let Some(delta_throughput) = end_throughput.checked_sub(initial_throughput) else {
                    debug_panic_or_warn!(
                        "delta throughput is negative for actor {} operator {} (initial: {}, end: {})",
                        actor_id,
                        operator_id,
                        initial_throughput,
                        end_throughput
                    );
                    continue;
                };

                let initial_pending_ns = initial_stats.total_output_pending_ns;
                let end_pending_ns = end_stats.total_output_pending_ns;
                let Some(delta_pending_ns) = end_pending_ns.checked_sub(initial_pending_ns) else {
                    debug_panic_or_warn!(
                        "delta pending ns is negative for actor {} operator {} (initial: {}, end: {})",
                        actor_id,
                        operator_id,
                        initial_pending_ns,
                        end_pending_ns
                    );
                    continue;
                };

                let delta_stats = ExecutorMetrics {
                    executor_id: *executor_id,
                    epoch: 0,
                    total_output_throughput: delta_throughput,
                    total_output_pending_ns: delta_pending_ns,
                };
                delta_aggregated_stats
                    .executor_stats
                    .insert(*executor_id, delta_stats);
            }

            for fragment_id in dispatch_fragment_ids {
                let Some(initial_stats) = initial.dispatch_stats.get(fragment_id) else {
                    debug_panic_or_warn!("missing initial stats for fragment {}", fragment_id);
                    continue;
                };
                let Some(end_stats) = end.dispatch_stats.get(fragment_id) else {
                    debug_panic_or_warn!("missing final stats for fragment {}", fragment_id);
                    continue;
                };

                let initial_throughput = initial_stats.total_output_throughput;
                let end_throughput = end_stats.total_output_throughput;
                let Some(delta_throughput) = end_throughput.checked_sub(initial_throughput) else {
                    debug_panic_or_warn!(
                        "delta throughput is negative for fragment {} (initial: {}, end: {})",
                        fragment_id,
                        initial_throughput,
                        end_throughput
                    );
                    continue;
                };

                let initial_pending_ns = initial_stats.total_output_pending_ns;
                let end_pending_ns = end_stats.total_output_pending_ns;
                let Some(delta_pending_ns) = end_pending_ns.checked_sub(initial_pending_ns) else {
                    debug_panic_or_warn!(
                        "delta pending ns is negative for fragment {} (initial: {}, end: {})",
                        fragment_id,
                        initial_pending_ns,
                        end_pending_ns
                    );
                    continue;
                };

                let delta_stats = DispatchMetrics {
                    fragment_id: *fragment_id,
                    epoch: 0,
                    total_output_throughput: delta_throughput,
                    total_output_pending_ns: delta_pending_ns,
                };
                delta_aggregated_stats
                    .dispatch_stats
                    .insert(*fragment_id, delta_stats);
            }

            delta_aggregated_stats
        }
    }

    #[expect(dead_code)]
    #[derive(Debug)]
    pub(super) struct OperatorMetrics {
        pub operator_id: OperatorId,
        pub epoch: u32,
        pub total_output_throughput: u64,
        pub total_output_pending_ns: u64,
    }

    #[derive(Debug)]
    pub(super) struct OperatorStats {
        inner: HashMap<OperatorId, OperatorMetrics>,
    }

    impl OperatorStats {
        /// Aggregates executor-level stats into operator-level stats
        pub(super) fn aggregate(
            operator_map: HashMap<OperatorId, HashSet<ExecutorId>>,
            executor_stats: &ExecutorStats,
            fragment_parallelisms: &HashMap<FragmentId, usize>,
        ) -> Self {
            let mut operator_stats = HashMap::new();
            'operator_loop: for (operator_id, executor_ids) in operator_map {
                let num_executors = executor_ids.len() as u64;
                let mut total_output_throughput = 0;
                let mut total_output_pending_ns = 0;
                for executor_id in executor_ids {
                    if let Some(stats) = executor_stats.get(&executor_id) {
                        total_output_throughput += stats.total_output_throughput;
                        total_output_pending_ns += stats.total_output_pending_ns;
                    } else {
                        // Skip this operator if executor stats are missing for any of its executors.
                        continue 'operator_loop;
                    }
                }
                let total_output_throughput = total_output_throughput;
                let total_output_pending_ns = total_output_pending_ns / num_executors;

                operator_stats.insert(
                    operator_id,
                    OperatorMetrics {
                        operator_id,
                        epoch: 0,
                        total_output_throughput,
                        total_output_pending_ns,
                    },
                );
            }

            for (fragment_id, dispatch_metrics) in &executor_stats.dispatch_stats {
                let operator_id = operator_id_for_dispatch(*fragment_id);
                let total_output_throughput = dispatch_metrics.total_output_throughput;
                let Some(fragment_parallelism) = fragment_parallelisms.get(fragment_id) else {
                    debug_panic_or_warn!(
                        "missing fragment parallelism for fragment {}",
                        fragment_id
                    );
                    continue;
                };
                let total_output_pending_ns =
                    dispatch_metrics.total_output_pending_ns / *fragment_parallelism as u64;

                operator_stats.insert(
                    operator_id,
                    OperatorMetrics {
                        operator_id,
                        epoch: 0,
                        total_output_throughput,
                        total_output_pending_ns,
                    },
                );
            }

            OperatorStats {
                inner: operator_stats,
            }
        }

        pub fn get(&self, operator_id: &OperatorId) -> Option<&OperatorMetrics> {
            self.inner.get(operator_id)
        }
    }
}

/// Utilities for the stream node graph:
/// rendering, extracting, etc.
mod graph {
    use std::collections::{HashMap, HashSet};
    use std::fmt::Debug;
    use std::time::Duration;

    use itertools::Itertools;
    use risingwave_common::operator::{
        unique_executor_id_from_unique_operator_id, unique_operator_id,
        unique_operator_id_into_parts,
    };
    use risingwave_pb::meta::list_table_fragments_response::FragmentInfo;
    use risingwave_pb::stream_plan::stream_node::{NodeBody, NodeBodyDiscriminants};
    use risingwave_pb::stream_plan::{MergeNode, StreamNode as PbStreamNode};

    use crate::catalog::FragmentId;
    use crate::handler::explain_analyze_stream_job::ExplainAnalyzeStreamJobOutput;
    use crate::handler::explain_analyze_stream_job::metrics::OperatorStats;
    use crate::handler::explain_analyze_stream_job::utils::operator_id_for_dispatch;
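    // "Unique" ids combine a scope with a local id: an `OperatorId` here packs the fragment id
    // together with the fragment-local operator id (see `unique_operator_id`), and an
    // `ExecutorId` analogously pairs an actor id with an operator id
    // (see `unique_executor_id_from_unique_operator_id`).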
    pub(super) type OperatorId = u64;
    pub(super) type ExecutorId = u64;

    /// This is an internal struct used ONLY for explain analyze stream job.
    pub(super) struct StreamNode {
        operator_id: OperatorId,
        fragment_id: u32,
        identity: NodeBodyDiscriminants,
        actor_ids: HashSet<u32>,
        dependencies: Vec<u64>,
    }

    impl Debug for StreamNode {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            // `self.operator_id` was built via `unique_operator_id(fragment_id, operator_id)`,
            // so the first part is the fragment id, not an actor id.
            let (fragment_id, operator_id) = unique_operator_id_into_parts(self.operator_id);
            let operator_id_str = format!(
                "{}: (fragment_id: {}, operator_id: {})",
                self.operator_id, fragment_id, operator_id
            );
            write!(
                f,
                "StreamNode {{ operator_id: {}, fragment_id: {}, identity: {:?}, actor_ids: {:?}, dependencies: {:?} }}",
                operator_id_str, self.fragment_id, self.identity, self.actor_ids, self.dependencies
            )
        }
    }

    impl StreamNode {
        fn new_for_dispatcher(fragment_id: u32) -> Self {
            StreamNode {
                operator_id: operator_id_for_dispatch(fragment_id),
                fragment_id,
                identity: NodeBodyDiscriminants::Exchange,
                actor_ids: Default::default(),
                dependencies: Default::default(),
            }
        }
    }

    /// Extracts the root node of the plan, the ids of fragments that dispatch to other
    /// fragments within the job, and the adjacency list between stream nodes.
    pub(super) fn extract_stream_node_infos(
        fragments: Vec<FragmentInfo>,
    ) -> (
        OperatorId,
        HashSet<FragmentId>,
        HashMap<OperatorId, StreamNode>,
    ) {
        let job_fragment_ids = fragments.iter().map(|f| f.id).collect::<HashSet<_>>();

        // Finds root nodes of the graph, i.e. nodes that are not a dependency of any other node.
        fn find_root_nodes(stream_nodes: &HashMap<u64, StreamNode>) -> HashSet<u64> {
            let mut all_nodes = stream_nodes.keys().copied().collect::<HashSet<_>>();
            for node in stream_nodes.values() {
                for dependency in &node.dependencies {
                    all_nodes.remove(dependency);
                }
            }
            all_nodes
        }

        // Recursively extracts stream node info, and builds an adjacency list between stream nodes
        // and their dependencies
        fn extract_stream_node_info(
            fragment_id: u32,
            fragment_id_to_merge_operator_id: &mut HashMap<u32, OperatorId>,
            operator_id_to_stream_node: &mut HashMap<OperatorId, StreamNode>,
            node: &PbStreamNode,
            actor_ids: &HashSet<u32>,
        ) {
            let identity = node
                .node_body
                .as_ref()
                .expect("should have node body")
                .into();
            let operator_id = unique_operator_id(fragment_id, node.operator_id);
            if let Some(merge_node) = node.node_body.as_ref()
                && let NodeBody::Merge(box MergeNode {
                    upstream_fragment_id,
                    ..
                }) = merge_node
            {
                fragment_id_to_merge_operator_id.insert(*upstream_fragment_id, operator_id);
            }
            let dependencies = &node.input;
            let dependency_ids = dependencies
                .iter()
                .map(|input| unique_operator_id(fragment_id, input.operator_id))
                .collect::<Vec<_>>();
            operator_id_to_stream_node.insert(
                operator_id,
                StreamNode {
                    operator_id,
                    fragment_id,
                    identity,
                    actor_ids: actor_ids.clone(),
                    dependencies: dependency_ids,
                },
            );
            for dependency in dependencies {
                extract_stream_node_info(
                    fragment_id,
                    fragment_id_to_merge_operator_id,
                    operator_id_to_stream_node,
                    dependency,
                    actor_ids,
                );
            }
        }

        // Build the adjacency list, and record "hanging" merge edges (merge nodes whose upstream
        // dispatcher has not been materialized yet); those edges are filled in below.
        let mut operator_id_to_stream_node = HashMap::new();
        let mut fragment_id_to_merge_operator_id = HashMap::new();
        for fragment in fragments {
            let actors = fragment.actors;
            assert!(
                !actors.is_empty(),
                "fragment {} should have at least one actor",
                fragment.id
            );
            let actor_ids = actors.iter().map(|actor| actor.id).collect::<HashSet<_>>();
            let node = actors[0].node.as_ref().expect("should have stream node");
            extract_stream_node_info(
                fragment.id,
                &mut fragment_id_to_merge_operator_id,
                &mut operator_id_to_stream_node,
                node,
                &actor_ids,
            );
        }

        // find root node, and fill in dispatcher edges + nodes.
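        // A fragment root that feeds a downstream Merge gets a synthetic dispatch (Exchange)
        // node spliced in between it and the merge operator; the fragment root with no
        // downstream merge within the job is taken as the root of the whole graph.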
        let root_or_dispatch_nodes = find_root_nodes(&operator_id_to_stream_node);
        let mut root_node = None;
        for operator_id in root_or_dispatch_nodes {
            let node = operator_id_to_stream_node.get_mut(&operator_id).unwrap();
            let fragment_id = node.fragment_id;
            if let Some(merge_operator_id) = fragment_id_to_merge_operator_id.get(&fragment_id) {
                let mut dispatcher = StreamNode::new_for_dispatcher(fragment_id);
                let operator_id_for_dispatch = dispatcher.operator_id;
                dispatcher.dependencies.push(operator_id);
                assert!(
                    operator_id_to_stream_node
                        .insert(operator_id_for_dispatch as _, dispatcher)
                        .is_none()
                );
                operator_id_to_stream_node
                    .get_mut(merge_operator_id)
                    .unwrap()
                    .dependencies
                    .push(operator_id_for_dispatch as _)
            } else {
                root_node = Some(operator_id);
            }
        }

        let mut dispatcher_fragment_ids = HashSet::new();
        for dispatcher_fragment_id in fragment_id_to_merge_operator_id.keys() {
            if job_fragment_ids.contains(dispatcher_fragment_id) {
                dispatcher_fragment_ids.insert(*dispatcher_fragment_id);
            }
        }

        (
            root_node.unwrap(),
            dispatcher_fragment_ids,
            operator_id_to_stream_node,
        )
    }

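    /// Derives the executor ids to profile (a few operator types are skipped, see the FIXME
    /// below) and a map from each operator to its executors, one per actor of its fragment.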
    pub(super) fn extract_executor_infos(
        adjacency_list: &HashMap<OperatorId, StreamNode>,
    ) -> (HashSet<u64>, HashMap<u64, HashSet<u64>>) {
        let mut executor_ids = HashSet::new();
        let mut operator_to_executor = HashMap::new();
        for (operator_id, node) in adjacency_list {
            assert_eq!(*operator_id, node.operator_id);
            let operator_id = node.operator_id;
            for actor_id in &node.actor_ids {
                let executor_id =
                    unique_executor_id_from_unique_operator_id(*actor_id, operator_id);
                if node.identity != NodeBodyDiscriminants::BatchPlan
                // FIXME(kwannoel): Add back after https://github.com/risingwavelabs/risingwave/issues/22775 is resolved.
                && node.identity != NodeBodyDiscriminants::Merge
                && node.identity != NodeBodyDiscriminants::Project
                {
                    assert!(executor_ids.insert(executor_id));
                }
                assert!(
                    operator_to_executor
                        .entry(operator_id)
                        .or_insert_with(HashSet::new)
                        .insert(executor_id)
                );
            }
        }
        (executor_ids, operator_to_executor)
    }

    // Do a DFS-based rendering. Each node occupies its own row.
    // Schema:
    // | Identity | Actor IDs | Output Rows/s | Downstream Backpressure Ratio |
    // Each node's identity is indented based on its depth in the graph.
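    // Example shape of the `identity` column (hypothetical operators):
    //   Materialize
    //   └─ HashJoin
    //      ├─ Merge
    //      └─ Merge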
    pub(super) fn render_graph_with_metrics(
        adjacency_list: &HashMap<u64, StreamNode>,
        root_node: u64,
        stats: &OperatorStats,
        profiling_duration: &Duration,
    ) -> Vec<ExplainAnalyzeStreamJobOutput> {
        let profiling_duration_secs = profiling_duration.as_secs_f64();
        let mut rows = vec![];
        let mut stack = vec![(String::new(), true, root_node)];
        while let Some((prefix, last_child, node_id)) = stack.pop() {
            let Some(node) = adjacency_list.get(&node_id) else {
                continue;
            };
            let is_root = node_id == root_node;

            let identity_rendered = if is_root {
                node.identity.to_string()
            } else {
                let connector = if last_child { "└─ " } else { "├─ " };
                format!("{}{}{}", prefix, connector, node.identity)
            };

            let child_prefix = if is_root {
                ""
            } else if last_child {
                "   "
            } else {
                "│  "
            };
            let child_prefix = format!("{}{}", prefix, child_prefix);

            let stats = stats.get(&node_id);
            let (output_rows_per_second, downstream_backpressure_ratio) = match stats {
                Some(stats) => (
                    Some(
                        (stats.total_output_throughput as f64 / profiling_duration_secs)
                            .to_string(),
                    ),
                    Some(
                        (Duration::from_nanos(stats.total_output_pending_ns).as_secs_f64()
                            / usize::max(node.actor_ids.len(), 1) as f64
                            / profiling_duration_secs)
                            .to_string(),
                    ),
                ),
                None => (None, None),
            };
            let row = ExplainAnalyzeStreamJobOutput {
                identity: identity_rendered,
                actor_ids: node
                    .actor_ids
                    .iter()
                    .sorted()
                    .map(|id| id.to_string())
                    .collect::<Vec<_>>()
                    .join(","),
                output_rows_per_second,
                downstream_backpressure_ratio,
            };
            rows.push(row);
            // Dependencies are pushed in order and popped in reverse, so the first
            // dependency (position 0) is rendered last and marked as the last child.
            for (position, dependency) in node.dependencies.iter().enumerate() {
                stack.push((child_prefix.clone(), position == 0, *dependency));
            }
        }
        rows
    }
}

mod utils {
    use risingwave_common::operator::unique_operator_id;

    use crate::handler::explain_analyze_stream_job::graph::OperatorId;

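    /// Synthesizes an operator id for a fragment's dispatch node, using `u32::MAX` as a
    /// sentinel local operator index that real operators are not expected to use.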
    pub(super) fn operator_id_for_dispatch(fragment_id: u32) -> OperatorId {
        unique_operator_id(fragment_id, u32::MAX as u64)
    }
}