risingwave_frontend/handler/
explain_analyze_stream_job.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use pgwire::pg_response::StatementType;
18use risingwave_common::types::Fields;
19use risingwave_sqlparser::ast::AnalyzeTarget;
20use tokio::time::Duration;
21
22use crate::error::Result;
23use crate::handler::explain_analyze_stream_job::graph::{
24    extract_executor_infos, extract_stream_node_infos, render_graph_with_metrics,
25};
26use crate::handler::{HandlerArgs, RwPgResponse, RwPgResponseBuilder, RwPgResponseBuilderExt};
27
/// Reports an unexpected condition using `format!`-style arguments.
///
/// When compiled with `debug_assertions` or with the `madsim` cfg, the
/// arguments are forwarded to `panic!`; in every other build they are
/// forwarded to `tracing::warn!` and execution continues.
#[macro_export]
macro_rules! debug_panic_or_warn {
    ($($fmt_args:tt)*) => {
        if cfg!(any(debug_assertions, madsim)) {
            panic!($($fmt_args)*);
        } else {
            tracing::warn!($($fmt_args)*);
        }
    };
}
38
/// Row type for the `EXPLAIN ANALYZE` output of a streaming job.
///
/// The type is imported by the `graph` submodule of this file.
/// NOTE(review): presumably the `Fields` derive provides the column schema
/// for the response rows — confirm against `risingwave_common::types::Fields`.
#[derive(Fields)]
struct ExplainAnalyzeStreamJobOutput {
    // Operator identity shown for the row.
    identity: String,
    // Actor ids of the operator, already rendered as text.
    actor_ids: String,
    // Output rate rendered as text; `None` presumably means no metric was
    // available for the operator — TODO confirm in the renderer.
    output_rows_per_second: Option<String>,
    // Backpressure ratio rendered as text; optional like the rate above.
    downstream_backpressure_ratio: Option<String>,
}
46
47pub async fn handle_explain_analyze_stream_job(
48    handler_args: HandlerArgs,
49    target: AnalyzeTarget,
50    duration_secs: Option<u64>,
51) -> Result<RwPgResponse> {
52    let profiling_duration = Duration::from_secs(duration_secs.unwrap_or(10));
53    let job_id = bind::bind_relation(&target, &handler_args)?;
54
55    let meta_client = handler_args.session.env().meta_client();
56    let fragments = net::get_fragments(meta_client, job_id.into()).await?;
57    let fragment_parallelisms = fragments
58        .iter()
59        .map(|f| (f.id, f.actors.len()))
60        .collect::<HashMap<_, _>>();
61    let (root_node, dispatcher_fragment_ids, adjacency_list) = extract_stream_node_infos(fragments);
62    let (executor_ids, operator_to_executor) = extract_executor_infos(&adjacency_list);
63    tracing::debug!(
64        ?fragment_parallelisms,
65        ?root_node,
66        ?dispatcher_fragment_ids,
67        ?adjacency_list,
68        "explain analyze metadata"
69    );
70
71    let worker_nodes = net::list_stream_worker_nodes(handler_args.session.env()).await?;
72
73    let executor_stats = net::get_executor_stats(
74        &handler_args,
75        &worker_nodes,
76        &executor_ids,
77        &dispatcher_fragment_ids,
78        profiling_duration,
79    )
80    .await?;
81    tracing::debug!(?executor_stats, "collected executor stats");
82    let aggregated_stats = metrics::OperatorStats::aggregate(
83        operator_to_executor,
84        &executor_stats,
85        &fragment_parallelisms,
86    );
87    tracing::debug!(?aggregated_stats, "collected aggregated stats");
88
89    // Render graph with metrics
90    let rows = render_graph_with_metrics(
91        &adjacency_list,
92        root_node,
93        &aggregated_stats,
94        &profiling_duration,
95    );
96    let builder = RwPgResponseBuilder::empty(StatementType::EXPLAIN);
97    let builder = builder.rows(rows);
98    Ok(builder.into())
99}
100
101/// Binding pass, since we don't go through the binder.
102/// TODO(noel): Should this be in binder? But it may make compilation slower and doesn't require any binder logic...
103mod bind {
104    use risingwave_sqlparser::ast::AnalyzeTarget;
105
106    use crate::Binder;
107    use crate::catalog::root_catalog::SchemaPath;
108    use crate::error::Result;
109    use crate::handler::HandlerArgs;
110
111    /// Bind the analyze target relation to its actual id.
112    pub(super) fn bind_relation(
113        target_relation: &AnalyzeTarget,
114        handler_args: &HandlerArgs,
115    ) -> Result<u32> {
116        let job_id = match &target_relation {
117            AnalyzeTarget::Id(id) => *id,
118            AnalyzeTarget::Index(name)
119            | AnalyzeTarget::Table(name)
120            | AnalyzeTarget::Sink(name)
121            | AnalyzeTarget::MaterializedView(name) => {
122                let session = &handler_args.session;
123                let db_name = session.database();
124                let (schema_name, name) = Binder::resolve_schema_qualified_name(&db_name, name)?;
125                let search_path = session.config().search_path();
126                let user_name = &session.user_name();
127                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
128
129                let catalog_reader = handler_args.session.env().catalog_reader();
130                let catalog = catalog_reader.read_guard();
131
132                match target_relation {
133                    AnalyzeTarget::Index(_) => {
134                        let (catalog, _schema_name) =
135                            catalog.get_any_index_by_name(&db_name, schema_path, &name)?;
136                        catalog.id.index_id
137                    }
138                    AnalyzeTarget::Table(_) => {
139                        let (catalog, _schema_name) =
140                            catalog.get_any_table_by_name(&db_name, schema_path, &name)?;
141                        catalog.id.as_raw_id()
142                    }
143                    AnalyzeTarget::Sink(_) => {
144                        let (catalog, _schema_name) =
145                            catalog.get_any_sink_by_name(&db_name, schema_path, &name)?;
146                        catalog.id.sink_id
147                    }
148                    AnalyzeTarget::MaterializedView(_) => {
149                        let (catalog, _schema_name) =
150                            catalog.get_any_table_by_name(&db_name, schema_path, &name)?;
151                        catalog.id.as_raw_id()
152                    }
153                    AnalyzeTarget::Id(_) => unreachable!(),
154                }
155            }
156        };
157        Ok(job_id)
158    }
159}
160
161/// Utilities for fetching stats from CN
162mod net {
163    use std::collections::HashSet;
164
165    use risingwave_pb::common::WorkerNode;
166    use risingwave_pb::id::JobId;
167    use risingwave_pb::meta::list_table_fragments_response::FragmentInfo;
168    use risingwave_pb::monitor_service::GetProfileStatsRequest;
169    use tokio::time::{Duration, sleep};
170
171    use crate::error::Result;
172    use crate::handler::HandlerArgs;
173    use crate::handler::explain_analyze_stream_job::graph::ExecutorId;
174    use crate::handler::explain_analyze_stream_job::metrics::ExecutorStats;
175    use crate::meta_client::FrontendMetaClient;
176    use crate::session::FrontendEnv;
177
178    pub(super) async fn list_stream_worker_nodes(env: &FrontendEnv) -> Result<Vec<WorkerNode>> {
179        let worker_nodes = env.meta_client().list_all_nodes().await?;
180        let stream_worker_nodes = worker_nodes
181            .into_iter()
182            .filter(|node| {
183                node.property
184                    .as_ref()
185                    .map(|p| p.is_streaming)
186                    .unwrap_or_else(|| false)
187            })
188            .collect::<Vec<_>>();
189        Ok(stream_worker_nodes)
190    }
191
192    // TODO(kwannoel): Only fetch the names, actor_ids and graph of the fragments
193    pub(super) async fn get_fragments(
194        meta_client: &dyn FrontendMetaClient,
195        job_id: JobId,
196    ) -> Result<Vec<FragmentInfo>> {
197        let mut fragment_map = meta_client.list_table_fragments(&[job_id]).await?;
198        assert_eq!(fragment_map.len(), 1, "expected only one fragment");
199        let (fragment_job_id, table_fragment_info) = fragment_map.drain().next().unwrap();
200        assert_eq!(fragment_job_id, job_id);
201        Ok(table_fragment_info.fragments)
202    }
203
204    pub(super) async fn get_executor_stats(
205        handler_args: &HandlerArgs,
206        worker_nodes: &[WorkerNode],
207        executor_ids: &HashSet<ExecutorId>,
208        dispatcher_fragment_ids: &HashSet<u32>,
209        profiling_duration: Duration,
210    ) -> Result<ExecutorStats> {
211        let dispatcher_fragment_ids = dispatcher_fragment_ids.iter().copied().collect::<Vec<_>>();
212        let mut initial_aggregated_stats = ExecutorStats::new();
213        for node in worker_nodes {
214            let mut compute_client = handler_args.session.env().client_pool().get(node).await?;
215            let stats = compute_client
216                .monitor_client
217                .get_profile_stats(GetProfileStatsRequest {
218                    executor_ids: executor_ids.iter().copied().collect(),
219                    dispatcher_fragment_ids: dispatcher_fragment_ids.clone(),
220                })
221                .await
222                .expect("get profiling stats failed");
223            initial_aggregated_stats.record(
224                executor_ids,
225                &dispatcher_fragment_ids,
226                &stats.into_inner(),
227            );
228        }
229        tracing::debug!(?initial_aggregated_stats, "initial aggregated stats");
230
231        sleep(profiling_duration).await;
232
233        let mut final_aggregated_stats = ExecutorStats::new();
234        for node in worker_nodes {
235            let mut compute_client = handler_args.session.env().client_pool().get(node).await?;
236            let stats = compute_client
237                .monitor_client
238                .get_profile_stats(GetProfileStatsRequest {
239                    executor_ids: executor_ids.iter().copied().collect(),
240                    dispatcher_fragment_ids: dispatcher_fragment_ids.clone(),
241                })
242                .await
243                .expect("get profiling stats failed");
244            final_aggregated_stats.record(
245                executor_ids,
246                &dispatcher_fragment_ids,
247                &stats.into_inner(),
248            );
249        }
250        tracing::debug!(?final_aggregated_stats, "final aggregated stats");
251
252        let delta_aggregated_stats = ExecutorStats::get_delta(
253            &initial_aggregated_stats,
254            &final_aggregated_stats,
255            executor_ids,
256            &dispatcher_fragment_ids,
257        );
258
259        Ok(delta_aggregated_stats)
260    }
261}
262
263/// Profiling metrics data structure and utilities
264/// We have 2 stages of metric collection:
265/// 1. Collect the stream node metrics at the **Executor** level.
266/// 2. Merge the stream node metrics into **Operator** level, avg, max, min, etc...
267mod metrics {
268    use std::collections::{HashMap, HashSet};
269
270    use risingwave_common::operator::unique_executor_id_into_parts;
271    use risingwave_pb::monitor_service::GetProfileStatsResponse;
272
273    use crate::catalog::FragmentId;
274    use crate::handler::explain_analyze_stream_job::graph::{ExecutorId, OperatorId};
275    use crate::handler::explain_analyze_stream_job::utils::operator_id_for_dispatch;
276
    /// Output counters for a single executor.
    ///
    /// Values are either the raw counters recorded by `ExecutorStats::record`
    /// or the difference between two recordings produced by
    /// `ExecutorStats::get_delta`.
    #[expect(dead_code)]
    #[derive(Default, Debug)]
    pub(super) struct ExecutorMetrics {
        /// Id of the executor these counters belong to.
        pub executor_id: ExecutorId,
        /// Always written as `0` by the code in this module.
        pub epoch: u32,
        /// Output row count, taken from `stream_node_output_row_count`.
        pub total_output_throughput: u64,
        /// Output blocking time in nanoseconds, taken from
        /// `stream_node_output_blocking_duration_ns`.
        pub total_output_pending_ns: u64,
    }
285
    /// Output counters for the dispatchers of one fragment.
    ///
    /// `ExecutorStats::record` sums these across the worker-node responses it
    /// is given.
    #[derive(Default, Debug)]
    pub(super) struct DispatchMetrics {
        /// Fragment whose dispatchers these counters describe.
        pub fragment_id: FragmentId,
        /// Always written as `0` by the code in this module.
        pub epoch: u32,
        /// Output row count, taken from `dispatch_fragment_output_row_count`.
        pub total_output_throughput: u64,
        /// Output blocking time in nanoseconds, taken from
        /// `dispatch_fragment_output_blocking_duration_ns`.
        pub total_output_pending_ns: u64,
    }
293
    /// Executor-level and dispatcher-level metrics, keyed by executor id and
    /// fragment id respectively.
    #[derive(Debug)]
    pub(super) struct ExecutorStats {
        // One entry per executor that reported both of its counters.
        executor_stats: HashMap<ExecutorId, ExecutorMetrics>,
        // One entry per dispatcher fragment, accumulated across responses.
        dispatch_stats: HashMap<FragmentId, DispatchMetrics>,
    }
299
300    impl ExecutorStats {
301        pub(super) fn new() -> Self {
302            ExecutorStats {
303                executor_stats: HashMap::new(),
304                dispatch_stats: HashMap::new(),
305            }
306        }
307
308        pub fn get(&self, executor_id: &ExecutorId) -> Option<&ExecutorMetrics> {
309            self.executor_stats.get(executor_id)
310        }
311
312        /// Record metrics for profiling
313        pub(super) fn record<'a>(
314            &mut self,
315            executor_ids: &'a HashSet<ExecutorId>,
316            dispatch_fragment_ids: &'a [FragmentId],
317            metrics: &'a GetProfileStatsResponse,
318        ) {
319            for executor_id in executor_ids {
320                let Some(total_output_throughput) =
321                    metrics.stream_node_output_row_count.get(executor_id)
322                else {
323                    continue;
324                };
325                let Some(total_output_pending_ns) = metrics
326                    .stream_node_output_blocking_duration_ns
327                    .get(executor_id)
328                else {
329                    continue;
330                };
331                let stats = ExecutorMetrics {
332                    executor_id: *executor_id,
333                    epoch: 0,
334                    total_output_throughput: *total_output_throughput,
335                    total_output_pending_ns: *total_output_pending_ns,
336                };
337                // An executor should be scheduled on a single worker node,
338                // it should not be inserted multiple times.
339                if cfg!(madsim) {
340                    // If madsim is enabled, worker nodes will share the same process.
341                    // The metrics is stored as a global object, so querying each worker node
342                    // will return the same set of executor metrics.
343                    // So we should not assert here.
344                    self.executor_stats.insert(*executor_id, stats);
345                } else {
346                    assert!(self.executor_stats.insert(*executor_id, stats).is_none());
347                }
348            }
349
350            for fragment_id in dispatch_fragment_ids {
351                let Some(total_output_throughput) =
352                    metrics.dispatch_fragment_output_row_count.get(fragment_id)
353                else {
354                    continue;
355                };
356                let Some(total_output_pending_ns) = metrics
357                    .dispatch_fragment_output_blocking_duration_ns
358                    .get(fragment_id)
359                else {
360                    continue;
361                };
362                let stats = self.dispatch_stats.entry(*fragment_id).or_default();
363                stats.fragment_id = *fragment_id;
364                stats.epoch = 0;
365                // do a sum rather than insert
366                // because dispatchers are
367                // distributed across worker nodes.
368                stats.total_output_throughput += *total_output_throughput;
369                stats.total_output_pending_ns += *total_output_pending_ns;
370            }
371        }
372
373        pub(super) fn get_delta(
374            initial: &Self,
375            end: &Self,
376            executor_ids: &HashSet<ExecutorId>,
377            dispatch_fragment_ids: &[FragmentId],
378        ) -> Self {
379            let mut delta_aggregated_stats = Self::new();
380            for executor_id in executor_ids {
381                let (actor_id, operator_id) = unique_executor_id_into_parts(*executor_id);
382                let Some(initial_stats) = initial.executor_stats.get(executor_id) else {
383                    debug_panic_or_warn!(
384                        "missing initial stats for executor {} (actor {} operator {})",
385                        executor_id,
386                        actor_id,
387                        operator_id
388                    );
389                    continue;
390                };
391                let Some(end_stats) = end.executor_stats.get(executor_id) else {
392                    debug_panic_or_warn!(
393                        "missing final stats for executor {} (actor {} operator {})",
394                        executor_id,
395                        actor_id,
396                        operator_id
397                    );
398                    continue;
399                };
400
401                let initial_throughput = initial_stats.total_output_throughput;
402                let end_throughput = end_stats.total_output_throughput;
403                let Some(delta_throughput) = end_throughput.checked_sub(initial_throughput) else {
404                    debug_panic_or_warn!(
405                        "delta throughput is negative for actor {} operator {} (initial: {}, end: {})",
406                        actor_id,
407                        operator_id,
408                        initial_throughput,
409                        end_throughput
410                    );
411                    continue;
412                };
413
414                let initial_pending_ns = initial_stats.total_output_pending_ns;
415                let end_pending_ns = end_stats.total_output_pending_ns;
416                let Some(delta_pending_ns) = end_pending_ns.checked_sub(initial_pending_ns) else {
417                    debug_panic_or_warn!(
418                        "delta pending ns is negative for actor {} operator {} (initial: {}, end: {})",
419                        actor_id,
420                        operator_id,
421                        initial_pending_ns,
422                        end_pending_ns
423                    );
424                    continue;
425                };
426
427                let delta_stats = ExecutorMetrics {
428                    executor_id: *executor_id,
429                    epoch: 0,
430                    total_output_throughput: delta_throughput,
431                    total_output_pending_ns: delta_pending_ns,
432                };
433                delta_aggregated_stats
434                    .executor_stats
435                    .insert(*executor_id, delta_stats);
436            }
437
438            for fragment_id in dispatch_fragment_ids {
439                let Some(initial_stats) = initial.dispatch_stats.get(fragment_id) else {
440                    debug_panic_or_warn!("missing initial stats for fragment {}", fragment_id);
441                    continue;
442                };
443                let Some(end_stats) = end.dispatch_stats.get(fragment_id) else {
444                    debug_panic_or_warn!("missing final stats for fragment {}", fragment_id);
445                    continue;
446                };
447
448                let initial_throughput = initial_stats.total_output_throughput;
449                let end_throughput = end_stats.total_output_throughput;
450                let Some(delta_throughput) = end_throughput.checked_sub(initial_throughput) else {
451                    debug_panic_or_warn!(
452                        "delta throughput is negative for fragment {} (initial: {}, end: {})",
453                        fragment_id,
454                        initial_throughput,
455                        end_throughput
456                    );
457                    continue;
458                };
459
460                let initial_pending_ns = initial_stats.total_output_pending_ns;
461                let end_pending_ns = end_stats.total_output_pending_ns;
462                let Some(delta_pending_ns) = end_pending_ns.checked_sub(initial_pending_ns) else {
463                    debug_panic_or_warn!(
464                        "delta pending ns is negative for fragment {} (initial: {}, end: {})",
465                        fragment_id,
466                        initial_pending_ns,
467                        end_pending_ns
468                    );
469                    continue;
470                };
471
472                let delta_stats = DispatchMetrics {
473                    fragment_id: *fragment_id,
474                    epoch: 0,
475                    total_output_throughput: delta_throughput,
476                    total_output_pending_ns: delta_pending_ns,
477                };
478                delta_aggregated_stats
479                    .dispatch_stats
480                    .insert(*fragment_id, delta_stats);
481            }
482
483            delta_aggregated_stats
484        }
485    }
486
    /// Metrics for one operator, produced by `OperatorStats::aggregate`.
    #[expect(dead_code)]
    #[derive(Debug)]
    pub(super) struct OperatorMetrics {
        /// Id of the operator these metrics describe.
        pub operator_id: OperatorId,
        /// Always written as `0` by the code in this module.
        pub epoch: u32,
        /// Output row count summed over the operator's executors (for
        /// dispatcher entries: the fragment's dispatch total).
        pub total_output_throughput: u64,
        /// Output blocking time in nanoseconds, averaged over the operator's
        /// executors (for dispatcher entries: over the fragment parallelism).
        pub total_output_pending_ns: u64,
    }
495
    /// Operator-level metrics keyed by operator id.
    #[derive(Debug)]
    pub(super) struct OperatorStats {
        // Holds both regular operators and the synthetic dispatcher operators.
        inner: HashMap<OperatorId, OperatorMetrics>,
    }
500
501    impl OperatorStats {
502        /// Aggregates executor-level stats into operator-level stats
503        pub(super) fn aggregate(
504            operator_map: HashMap<OperatorId, HashSet<ExecutorId>>,
505            executor_stats: &ExecutorStats,
506            fragment_parallelisms: &HashMap<FragmentId, usize>,
507        ) -> Self {
508            let mut operator_stats = HashMap::new();
509            'operator_loop: for (operator_id, executor_ids) in operator_map {
510                let num_executors = executor_ids.len() as u64;
511                let mut total_output_throughput = 0;
512                let mut total_output_pending_ns = 0;
513                for executor_id in executor_ids {
514                    if let Some(stats) = executor_stats.get(&executor_id) {
515                        total_output_throughput += stats.total_output_throughput;
516                        total_output_pending_ns += stats.total_output_pending_ns;
517                    } else {
518                        // skip this operator if it doesn't have executor stats for any of its executors
519                        continue 'operator_loop;
520                    }
521                }
522                let total_output_throughput = total_output_throughput;
523                let total_output_pending_ns = total_output_pending_ns / num_executors;
524
525                operator_stats.insert(
526                    operator_id,
527                    OperatorMetrics {
528                        operator_id,
529                        epoch: 0,
530                        total_output_throughput,
531                        total_output_pending_ns,
532                    },
533                );
534            }
535
536            for (fragment_id, dispatch_metrics) in &executor_stats.dispatch_stats {
537                let operator_id = operator_id_for_dispatch(*fragment_id);
538                let total_output_throughput = dispatch_metrics.total_output_throughput;
539                let Some(fragment_parallelism) = fragment_parallelisms.get(fragment_id) else {
540                    debug_panic_or_warn!(
541                        "missing fragment parallelism for fragment {}",
542                        fragment_id
543                    );
544                    continue;
545                };
546                let total_output_pending_ns =
547                    dispatch_metrics.total_output_pending_ns / *fragment_parallelism as u64;
548
549                operator_stats.insert(
550                    operator_id,
551                    OperatorMetrics {
552                        operator_id,
553                        epoch: 0,
554                        total_output_throughput,
555                        total_output_pending_ns,
556                    },
557                );
558            }
559
560            OperatorStats {
561                inner: operator_stats,
562            }
563        }
564
565        pub fn get(&self, operator_id: &OperatorId) -> Option<&OperatorMetrics> {
566            self.inner.get(operator_id)
567        }
568    }
569}
570
571/// Utilities for the stream node graph:
572/// rendering, extracting, etc.
573mod graph {
574    use std::collections::{HashMap, HashSet};
575    use std::fmt::Debug;
576    use std::time::Duration;
577
578    use itertools::Itertools;
579    use risingwave_common::operator::{
580        unique_executor_id_from_unique_operator_id, unique_operator_id,
581        unique_operator_id_into_parts,
582    };
583    use risingwave_pb::meta::list_table_fragments_response::FragmentInfo;
584    use risingwave_pb::stream_plan::stream_node::{NodeBody, NodeBodyDiscriminants};
585    use risingwave_pb::stream_plan::{MergeNode, StreamNode as PbStreamNode};
586
587    use crate::catalog::FragmentId;
588    use crate::handler::explain_analyze_stream_job::ExplainAnalyzeStreamJobOutput;
589    use crate::handler::explain_analyze_stream_job::metrics::OperatorStats;
590    use crate::handler::explain_analyze_stream_job::utils::operator_id_for_dispatch;
591    pub(super) type OperatorId = u64;
592    pub(super) type ExecutorId = u64;
593
    /// This is an internal struct used ONLY for explain analyze stream job.
    ///
    /// One node of the operator graph: either an operator taken from a
    /// fragment's stream plan, or a synthetic dispatcher node created by
    /// `StreamNode::new_for_dispatcher`.
    pub(super) struct StreamNode {
        // Graph-wide id: `unique_operator_id(fragment_id, ..)` for plan operators,
        // `operator_id_for_dispatch(fragment_id)` for dispatcher nodes.
        operator_id: OperatorId,
        // Fragment this node belongs to.
        fragment_id: u32,
        // Kind of the node body (`Exchange` for dispatcher nodes).
        identity: NodeBodyDiscriminants,
        // Ids of the fragment's actors; left empty for dispatcher nodes.
        actor_ids: HashSet<u32>,
        // Operator ids of the nodes this node takes input from.
        dependencies: Vec<u64>,
    }
602
603    impl Debug for StreamNode {
604        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
605            let (actor_id, operator_id) = unique_operator_id_into_parts(self.operator_id);
606            let operator_id_str = format!(
607                "{}: (actor_id: {}, operator_id: {})",
608                self.operator_id, actor_id, operator_id
609            );
610            write!(
611                f,
612                "StreamNode {{ operator_id: {}, fragment_id: {}, identity: {:?}, actor_ids: {:?}, dependencies: {:?} }}",
613                operator_id_str, self.fragment_id, self.identity, self.actor_ids, self.dependencies
614            )
615        }
616    }
617
618    impl StreamNode {
619        fn new_for_dispatcher(fragment_id: u32) -> Self {
620            StreamNode {
621                operator_id: operator_id_for_dispatch(fragment_id),
622                fragment_id,
623                identity: NodeBodyDiscriminants::Exchange,
624                actor_ids: Default::default(),
625                dependencies: Default::default(),
626            }
627        }
628    }
629
630    /// Extracts the root node of the plan, as well as the adjacency list
631    pub(super) fn extract_stream_node_infos(
632        fragments: Vec<FragmentInfo>,
633    ) -> (
634        OperatorId,
635        HashSet<FragmentId>,
636        HashMap<OperatorId, StreamNode>,
637    ) {
638        let job_fragment_ids = fragments.iter().map(|f| f.id).collect::<HashSet<_>>();
639
640        // Finds root nodes of the graph
641        fn find_root_nodes(stream_nodes: &HashMap<u64, StreamNode>) -> HashSet<u64> {
642            let mut all_nodes = stream_nodes.keys().copied().collect::<HashSet<_>>();
643            for node in stream_nodes.values() {
644                for dependency in &node.dependencies {
645                    all_nodes.remove(dependency);
646                }
647            }
648            all_nodes
649        }
650
651        // Recursively extracts stream node info, and builds an adjacency list between stream nodes
652        // and their dependencies
653        fn extract_stream_node_info(
654            fragment_id: u32,
655            fragment_id_to_merge_operator_id: &mut HashMap<u32, OperatorId>,
656            operator_id_to_stream_node: &mut HashMap<OperatorId, StreamNode>,
657            node: &PbStreamNode,
658            actor_ids: &HashSet<u32>,
659        ) {
660            let identity = node
661                .node_body
662                .as_ref()
663                .expect("should have node body")
664                .into();
665            let operator_id = unique_operator_id(fragment_id, node.operator_id);
666            if let Some(merge_node) = node.node_body.as_ref()
667                && let NodeBody::Merge(box MergeNode {
668                    upstream_fragment_id,
669                    ..
670                }) = merge_node
671            {
672                fragment_id_to_merge_operator_id.insert(*upstream_fragment_id, operator_id);
673            }
674            let dependencies = &node.input;
675            let dependency_ids = dependencies
676                .iter()
677                .map(|input| unique_operator_id(fragment_id, input.operator_id))
678                .collect::<Vec<_>>();
679            operator_id_to_stream_node.insert(
680                operator_id,
681                StreamNode {
682                    operator_id,
683                    fragment_id,
684                    identity,
685                    actor_ids: actor_ids.clone(),
686                    dependencies: dependency_ids,
687                },
688            );
689            for dependency in dependencies {
690                extract_stream_node_info(
691                    fragment_id,
692                    fragment_id_to_merge_operator_id,
693                    operator_id_to_stream_node,
694                    dependency,
695                    actor_ids,
696                );
697            }
698        }
699
700        // build adjacency list and hanging merge edges.
701        // hanging merge edges will be filled in the following section.
702        let mut operator_id_to_stream_node = HashMap::new();
703        let mut fragment_id_to_merge_operator_id = HashMap::new();
704        for fragment in fragments {
705            let actors = fragment.actors;
706            assert!(
707                !actors.is_empty(),
708                "fragment {} should have at least one actor",
709                fragment.id
710            );
711            let actor_ids = actors.iter().map(|actor| actor.id).collect::<HashSet<_>>();
712            let node = actors[0].node.as_ref().expect("should have stream node");
713            extract_stream_node_info(
714                fragment.id,
715                &mut fragment_id_to_merge_operator_id,
716                &mut operator_id_to_stream_node,
717                node,
718                &actor_ids,
719            );
720        }
721
722        // find root node, and fill in dispatcher edges + nodes.
723        let root_or_dispatch_nodes = find_root_nodes(&operator_id_to_stream_node);
724        let mut root_node = None;
725        for operator_id in root_or_dispatch_nodes {
726            let node = operator_id_to_stream_node.get_mut(&operator_id).unwrap();
727            let fragment_id = node.fragment_id;
728            if let Some(merge_operator_id) = fragment_id_to_merge_operator_id.get(&fragment_id) {
729                let mut dispatcher = StreamNode::new_for_dispatcher(fragment_id);
730                let operator_id_for_dispatch = dispatcher.operator_id;
731                dispatcher.dependencies.push(operator_id);
732                assert!(
733                    operator_id_to_stream_node
734                        .insert(operator_id_for_dispatch as _, dispatcher)
735                        .is_none()
736                );
737                operator_id_to_stream_node
738                    .get_mut(merge_operator_id)
739                    .unwrap()
740                    .dependencies
741                    .push(operator_id_for_dispatch as _)
742            } else {
743                root_node = Some(operator_id);
744            }
745        }
746
747        let mut dispatcher_fragment_ids = HashSet::new();
748        for dispatcher_fragment_id in fragment_id_to_merge_operator_id.keys() {
749            if job_fragment_ids.contains(dispatcher_fragment_id) {
750                dispatcher_fragment_ids.insert(*dispatcher_fragment_id);
751            }
752        }
753
754        (
755            root_node.unwrap(),
756            dispatcher_fragment_ids,
757            operator_id_to_stream_node,
758        )
759    }
760
761    pub(super) fn extract_executor_infos(
762        adjacency_list: &HashMap<OperatorId, StreamNode>,
763    ) -> (HashSet<u64>, HashMap<u64, HashSet<u64>>) {
764        let mut executor_ids = HashSet::new();
765        let mut operator_to_executor = HashMap::new();
766        for (operator_id, node) in adjacency_list {
767            assert_eq!(*operator_id, node.operator_id);
768            let operator_id = node.operator_id;
769            for actor_id in &node.actor_ids {
770                let executor_id =
771                    unique_executor_id_from_unique_operator_id(*actor_id, operator_id);
772                if node.identity != NodeBodyDiscriminants::BatchPlan
773                // FIXME(kwannoel): Add back after https://github.com/risingwavelabs/risingwave/issues/22775 is resolved.
774                && node.identity != NodeBodyDiscriminants::Merge
775                && node.identity != NodeBodyDiscriminants::Project
776                {
777                    assert!(executor_ids.insert(executor_id));
778                }
779                assert!(
780                    operator_to_executor
781                        .entry(operator_id)
782                        .or_insert_with(HashSet::new)
783                        .insert(executor_id)
784                );
785            }
786        }
787        (executor_ids, operator_to_executor)
788    }
789
790    // Do a DFS based rendering. Each node will occupy its own row.
791    // Schema:
792    // | Operator ID | Identity | Actor IDs | Metrics ... |
793    // Each node will be indented based on its depth in the graph.
794    pub(super) fn render_graph_with_metrics(
795        adjacency_list: &HashMap<u64, StreamNode>,
796        root_node: u64,
797        stats: &OperatorStats,
798        profiling_duration: &Duration,
799    ) -> Vec<ExplainAnalyzeStreamJobOutput> {
800        let profiling_duration_secs = profiling_duration.as_secs_f64();
801        let mut rows = vec![];
802        let mut stack = vec![(String::new(), true, root_node)];
803        while let Some((prefix, last_child, node_id)) = stack.pop() {
804            let Some(node) = adjacency_list.get(&node_id) else {
805                continue;
806            };
807            let is_root = node_id == root_node;
808
809            let identity_rendered = if is_root {
810                node.identity.to_string()
811            } else {
812                let connector = if last_child { "└─ " } else { "├─ " };
813                format!("{}{}{}", prefix, connector, node.identity)
814            };
815
816            let child_prefix = if is_root {
817                ""
818            } else if last_child {
819                "   "
820            } else {
821                "│  "
822            };
823            let child_prefix = format!("{}{}", prefix, child_prefix);
824
825            let stats = stats.get(&node_id);
826            let (output_rows_per_second, downstream_backpressure_ratio) = match stats {
827                Some(stats) => (
828                    Some(
829                        (stats.total_output_throughput as f64 / profiling_duration_secs)
830                            .to_string(),
831                    ),
832                    Some(
833                        (Duration::from_nanos(stats.total_output_pending_ns).as_secs_f64()
834                            / usize::max(node.actor_ids.len(), 1) as f64
835                            / profiling_duration_secs)
836                            .to_string(),
837                    ),
838                ),
839                None => (None, None),
840            };
841            let row = ExplainAnalyzeStreamJobOutput {
842                identity: identity_rendered,
843                actor_ids: node
844                    .actor_ids
845                    .iter()
846                    .sorted()
847                    .map(|id| id.to_string())
848                    .collect::<Vec<_>>()
849                    .join(","),
850                output_rows_per_second,
851                downstream_backpressure_ratio,
852            };
853            rows.push(row);
854            for (position, dependency) in node.dependencies.iter().enumerate() {
855                stack.push((child_prefix.clone(), position == 0, *dependency));
856            }
857        }
858        rows
859    }
860}
861
862mod utils {
863    use risingwave_common::operator::unique_operator_id;
864
865    use crate::handler::explain_analyze_stream_job::graph::OperatorId;
866
867    pub(super) fn operator_id_for_dispatch(fragment_id: u32) -> OperatorId {
868        unique_operator_id(fragment_id, u32::MAX as u64)
869    }
870}