risingwave_frontend/handler/
explain_analyze_stream_job.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use pgwire::pg_response::StatementType;
18use risingwave_common::types::Fields;
19use risingwave_sqlparser::ast::AnalyzeTarget;
20use tokio::time::Duration;
21
22use crate::error::Result;
23use crate::handler::explain_analyze_stream_job::graph::{
24    extract_executor_infos, extract_stream_node_infos, render_graph_with_metrics,
25};
26use crate::handler::{HandlerArgs, RwPgResponse, RwPgResponseBuilder, RwPgResponseBuilderExt};
27
/// Reports an invariant violation.
///
/// When built with `debug_assertions` or under the `madsim` simulator, this
/// `panic!`s with the given message; otherwise it logs the same message with
/// `tracing::warn!` and execution continues. Accepts the same arguments as
/// `panic!` (a format string followed by its arguments).
#[macro_export]
macro_rules! debug_panic_or_warn {
    ($($arg:tt)*) => {
        // `cfg!` is evaluated at compile time, so only one branch is effectively live.
        if cfg!(debug_assertions) || cfg!(madsim) {
            panic!($($arg)*);
        } else {
            tracing::warn!($($arg)*);
        }
    };
}
38
/// One result row of `EXPLAIN ANALYZE` for a stream job.
///
/// NOTE(review): rows are presumably built per rendered graph node by
/// `graph::render_graph_with_metrics`, with the metric columns left `None`
/// when no stats were collected for that node — confirm against the renderer.
#[derive(Fields)]
struct ExplainAnalyzeStreamJobOutput {
    // Display name of the operator (presumably rendered with tree indentation).
    identity: String,
    // Actor ids of the operator, pre-formatted as text.
    actor_ids: String,
    // Output rate over the profiling window, pre-formatted as text.
    output_rows_per_second: Option<String>,
    // Share of the profiling window spent blocked on output, pre-formatted as text.
    downstream_backpressure_ratio: Option<String>,
}
46
47pub async fn handle_explain_analyze_stream_job(
48    handler_args: HandlerArgs,
49    target: AnalyzeTarget,
50    duration_secs: Option<u64>,
51) -> Result<RwPgResponse> {
52    let profiling_duration = Duration::from_secs(duration_secs.unwrap_or(10));
53    let job_id = bind::bind_relation(&target, &handler_args)?;
54
55    let meta_client = handler_args.session.env().meta_client();
56    let fragments = net::get_fragments(meta_client, job_id).await?;
57    let fragment_parallelisms = fragments
58        .iter()
59        .map(|f| (f.id, f.actors.len()))
60        .collect::<HashMap<_, _>>();
61    let (root_node, dispatcher_fragment_ids, adjacency_list) = extract_stream_node_infos(fragments);
62    let (executor_ids, operator_to_executor) = extract_executor_infos(&adjacency_list);
63    tracing::debug!(
64        ?fragment_parallelisms,
65        ?root_node,
66        ?dispatcher_fragment_ids,
67        ?adjacency_list,
68        "explain analyze metadata"
69    );
70
71    let worker_nodes = net::list_stream_worker_nodes(handler_args.session.env()).await?;
72
73    let executor_stats = net::get_executor_stats(
74        &handler_args,
75        &worker_nodes,
76        &executor_ids,
77        &dispatcher_fragment_ids,
78        profiling_duration,
79    )
80    .await?;
81    tracing::debug!(?executor_stats, "collected executor stats");
82    let aggregated_stats = metrics::OperatorStats::aggregate(
83        operator_to_executor,
84        &executor_stats,
85        &fragment_parallelisms,
86    );
87    tracing::debug!(?aggregated_stats, "collected aggregated stats");
88
89    // Render graph with metrics
90    let rows = render_graph_with_metrics(
91        &adjacency_list,
92        root_node,
93        &aggregated_stats,
94        &profiling_duration,
95    );
96    let builder = RwPgResponseBuilder::empty(StatementType::EXPLAIN);
97    let builder = builder.rows(rows);
98    Ok(builder.into())
99}
100
101/// Binding pass, since we don't go through the binder.
102/// TODO(noel): Should this be in binder? But it may make compilation slower and doesn't require any binder logic...
103mod bind {
104    use risingwave_common::id::JobId;
105    use risingwave_sqlparser::ast::AnalyzeTarget;
106
107    use crate::Binder;
108    use crate::catalog::root_catalog::SchemaPath;
109    use crate::error::Result;
110    use crate::handler::HandlerArgs;
111
112    /// Bind the analyze target relation to its actual id.
113    pub(super) fn bind_relation(
114        target_relation: &AnalyzeTarget,
115        handler_args: &HandlerArgs,
116    ) -> Result<JobId> {
117        let job_id = match &target_relation {
118            AnalyzeTarget::Id(id) => (*id).into(),
119            AnalyzeTarget::Index(name)
120            | AnalyzeTarget::Table(name)
121            | AnalyzeTarget::Sink(name)
122            | AnalyzeTarget::MaterializedView(name) => {
123                let session = &handler_args.session;
124                let db_name = session.database();
125                let (schema_name, name) = Binder::resolve_schema_qualified_name(&db_name, name)?;
126                let search_path = session.config().search_path();
127                let user_name = &session.user_name();
128                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
129
130                let catalog_reader = handler_args.session.env().catalog_reader();
131                let catalog = catalog_reader.read_guard();
132
133                match target_relation {
134                    AnalyzeTarget::Index(_) => {
135                        let (catalog, _schema_name) =
136                            catalog.get_any_index_by_name(&db_name, schema_path, &name)?;
137                        catalog.id.as_job_id()
138                    }
139                    AnalyzeTarget::Table(_) => {
140                        let (catalog, _schema_name) =
141                            catalog.get_any_table_by_name(&db_name, schema_path, &name)?;
142                        catalog.id.as_job_id()
143                    }
144                    AnalyzeTarget::Sink(_) => {
145                        let (catalog, _schema_name) =
146                            catalog.get_any_sink_by_name(&db_name, schema_path, &name)?;
147                        catalog.id.as_job_id()
148                    }
149                    AnalyzeTarget::MaterializedView(_) => {
150                        let (catalog, _schema_name) =
151                            catalog.get_any_table_by_name(&db_name, schema_path, &name)?;
152                        catalog.id.as_job_id()
153                    }
154                    AnalyzeTarget::Id(_) => unreachable!(),
155                }
156            }
157        };
158        Ok(job_id)
159    }
160}
161
162/// Utilities for fetching stats from CN
163mod net {
164    use std::collections::HashSet;
165
166    use risingwave_common::bail;
167    use risingwave_common::id::{FragmentId, JobId};
168    use risingwave_pb::common::WorkerNode;
169    use risingwave_pb::id::ExecutorId;
170    use risingwave_pb::meta::list_table_fragments_response::FragmentInfo;
171    use risingwave_pb::monitor_service::GetProfileStatsRequest;
172    use tokio::time::{Duration, sleep};
173
174    use crate::error::Result;
175    use crate::handler::HandlerArgs;
176    use crate::handler::explain_analyze_stream_job::metrics::ExecutorStats;
177    use crate::meta_client::FrontendMetaClient;
178    use crate::session::FrontendEnv;
179
180    pub(super) async fn list_stream_worker_nodes(env: &FrontendEnv) -> Result<Vec<WorkerNode>> {
181        let worker_nodes = env.meta_client().list_all_nodes().await?;
182        let stream_worker_nodes = worker_nodes
183            .into_iter()
184            .filter(|node| {
185                node.property
186                    .as_ref()
187                    .map(|p| p.is_streaming)
188                    .unwrap_or_else(|| false)
189            })
190            .collect::<Vec<_>>();
191        Ok(stream_worker_nodes)
192    }
193
194    // TODO(kwannoel): Only fetch the names, actor_ids and graph of the fragments
195    pub(super) async fn get_fragments(
196        meta_client: &dyn FrontendMetaClient,
197        job_id: JobId,
198    ) -> Result<Vec<FragmentInfo>> {
199        let mut fragment_map = meta_client.list_table_fragments(&[job_id]).await?;
200        let mut table_fragments = fragment_map.drain();
201        let Some((fragment_job_id, table_fragment_info)) = table_fragments.next() else {
202            bail!("table fragment of job {job_id} not found");
203        };
204        assert_eq!(
205            table_fragments.next(),
206            None,
207            "expected only at most one fragment"
208        );
209        assert_eq!(fragment_job_id, job_id);
210        Ok(table_fragment_info.fragments)
211    }
212
213    pub(super) async fn get_executor_stats(
214        handler_args: &HandlerArgs,
215        worker_nodes: &[WorkerNode],
216        executor_ids: &HashSet<ExecutorId>,
217        dispatcher_fragment_ids: &HashSet<FragmentId>,
218        profiling_duration: Duration,
219    ) -> Result<ExecutorStats> {
220        let dispatcher_fragment_ids = dispatcher_fragment_ids.iter().copied().collect::<Vec<_>>();
221        let mut initial_aggregated_stats = ExecutorStats::new();
222        for node in worker_nodes {
223            let mut compute_client = handler_args.session.env().client_pool().get(node).await?;
224            let stats = compute_client
225                .monitor_client
226                .get_profile_stats(GetProfileStatsRequest {
227                    executor_ids: executor_ids.iter().copied().collect(),
228                    dispatcher_fragment_ids: dispatcher_fragment_ids.clone(),
229                })
230                .await
231                .expect("get profiling stats failed");
232            initial_aggregated_stats.record(
233                executor_ids,
234                &dispatcher_fragment_ids,
235                &stats.into_inner(),
236            );
237        }
238        tracing::debug!(?initial_aggregated_stats, "initial aggregated stats");
239
240        sleep(profiling_duration).await;
241
242        let mut final_aggregated_stats = ExecutorStats::new();
243        for node in worker_nodes {
244            let mut compute_client = handler_args.session.env().client_pool().get(node).await?;
245            let stats = compute_client
246                .monitor_client
247                .get_profile_stats(GetProfileStatsRequest {
248                    executor_ids: executor_ids.iter().copied().collect(),
249                    dispatcher_fragment_ids: dispatcher_fragment_ids.clone(),
250                })
251                .await
252                .expect("get profiling stats failed");
253            final_aggregated_stats.record(
254                executor_ids,
255                &dispatcher_fragment_ids,
256                &stats.into_inner(),
257            );
258        }
259        tracing::debug!(?final_aggregated_stats, "final aggregated stats");
260
261        let delta_aggregated_stats = ExecutorStats::get_delta(
262            &initial_aggregated_stats,
263            &final_aggregated_stats,
264            executor_ids,
265            &dispatcher_fragment_ids,
266        );
267
268        Ok(delta_aggregated_stats)
269    }
270}
271
272/// Profiling metrics data structure and utilities
273/// We have 2 stages of metric collection:
274/// 1. Collect the stream node metrics at the **Executor** level.
275/// 2. Merge the stream node metrics into **Operator** level, avg, max, min, etc...
276mod metrics {
277    use std::collections::{HashMap, HashSet};
278
279    use risingwave_common::operator::unique_executor_id_into_parts;
280    use risingwave_pb::id::{ExecutorId, GlobalOperatorId};
281    use risingwave_pb::monitor_service::GetProfileStatsResponse;
282
283    use crate::catalog::FragmentId;
284    use crate::handler::explain_analyze_stream_job::utils::operator_id_for_dispatch;
285
286    type OperatorId = GlobalOperatorId;
287
288    #[expect(dead_code)]
289    #[derive(Default, Debug)]
290    pub(super) struct ExecutorMetrics {
291        pub executor_id: ExecutorId,
292        pub epoch: u32,
293        pub total_output_throughput: u64,
294        pub total_output_pending_ns: u64,
295    }
296
297    #[derive(Default, Debug)]
298    pub(super) struct DispatchMetrics {
299        pub fragment_id: FragmentId,
300        pub epoch: u32,
301        pub total_output_throughput: u64,
302        pub total_output_pending_ns: u64,
303    }
304
305    #[derive(Debug)]
306    pub(super) struct ExecutorStats {
307        executor_stats: HashMap<ExecutorId, ExecutorMetrics>,
308        dispatch_stats: HashMap<FragmentId, DispatchMetrics>,
309    }
310
311    impl ExecutorStats {
312        pub(super) fn new() -> Self {
313            ExecutorStats {
314                executor_stats: HashMap::new(),
315                dispatch_stats: HashMap::new(),
316            }
317        }
318
319        pub fn get(&self, executor_id: &ExecutorId) -> Option<&ExecutorMetrics> {
320            self.executor_stats.get(executor_id)
321        }
322
323        /// Record metrics for profiling
324        pub(super) fn record<'a>(
325            &mut self,
326            executor_ids: &'a HashSet<ExecutorId>,
327            dispatch_fragment_ids: &'a [FragmentId],
328            metrics: &'a GetProfileStatsResponse,
329        ) {
330            for executor_id in executor_ids {
331                let Some(total_output_throughput) =
332                    metrics.stream_node_output_row_count.get(executor_id)
333                else {
334                    continue;
335                };
336                let Some(total_output_pending_ns) = metrics
337                    .stream_node_output_blocking_duration_ns
338                    .get(executor_id)
339                else {
340                    continue;
341                };
342                let stats = ExecutorMetrics {
343                    executor_id: *executor_id,
344                    epoch: 0,
345                    total_output_throughput: *total_output_throughput,
346                    total_output_pending_ns: *total_output_pending_ns,
347                };
348                // An executor should be scheduled on a single worker node,
349                // it should not be inserted multiple times.
350                if cfg!(madsim) {
351                    // If madsim is enabled, worker nodes will share the same process.
352                    // The metrics is stored as a global object, so querying each worker node
353                    // will return the same set of executor metrics.
354                    // So we should not assert here.
355                    self.executor_stats.insert(*executor_id, stats);
356                } else {
357                    assert!(self.executor_stats.insert(*executor_id, stats).is_none());
358                }
359            }
360
361            for fragment_id in dispatch_fragment_ids {
362                let Some(total_output_throughput) =
363                    metrics.dispatch_fragment_output_row_count.get(fragment_id)
364                else {
365                    continue;
366                };
367                let Some(total_output_pending_ns) = metrics
368                    .dispatch_fragment_output_blocking_duration_ns
369                    .get(fragment_id)
370                else {
371                    continue;
372                };
373                let stats = self.dispatch_stats.entry(*fragment_id).or_default();
374                stats.fragment_id = *fragment_id;
375                stats.epoch = 0;
376                // do a sum rather than insert
377                // because dispatchers are
378                // distributed across worker nodes.
379                stats.total_output_throughput += *total_output_throughput;
380                stats.total_output_pending_ns += *total_output_pending_ns;
381            }
382        }
383
384        pub(super) fn get_delta(
385            initial: &Self,
386            end: &Self,
387            executor_ids: &HashSet<ExecutorId>,
388            dispatch_fragment_ids: &[FragmentId],
389        ) -> Self {
390            let mut delta_aggregated_stats = Self::new();
391            for executor_id in executor_ids {
392                let (actor_id, operator_id) = unique_executor_id_into_parts(*executor_id);
393                let Some(initial_stats) = initial.executor_stats.get(executor_id) else {
394                    debug_panic_or_warn!(
395                        "missing initial stats for executor {} (actor {} operator {})",
396                        executor_id,
397                        actor_id,
398                        operator_id
399                    );
400                    continue;
401                };
402                let Some(end_stats) = end.executor_stats.get(executor_id) else {
403                    debug_panic_or_warn!(
404                        "missing final stats for executor {} (actor {} operator {})",
405                        executor_id,
406                        actor_id,
407                        operator_id
408                    );
409                    continue;
410                };
411
412                let initial_throughput = initial_stats.total_output_throughput;
413                let end_throughput = end_stats.total_output_throughput;
414                let Some(delta_throughput) = end_throughput.checked_sub(initial_throughput) else {
415                    debug_panic_or_warn!(
416                        "delta throughput is negative for actor {} operator {} (initial: {}, end: {})",
417                        actor_id,
418                        operator_id,
419                        initial_throughput,
420                        end_throughput
421                    );
422                    continue;
423                };
424
425                let initial_pending_ns = initial_stats.total_output_pending_ns;
426                let end_pending_ns = end_stats.total_output_pending_ns;
427                let Some(delta_pending_ns) = end_pending_ns.checked_sub(initial_pending_ns) else {
428                    debug_panic_or_warn!(
429                        "delta pending ns is negative for actor {} operator {} (initial: {}, end: {})",
430                        actor_id,
431                        operator_id,
432                        initial_pending_ns,
433                        end_pending_ns
434                    );
435                    continue;
436                };
437
438                let delta_stats = ExecutorMetrics {
439                    executor_id: *executor_id,
440                    epoch: 0,
441                    total_output_throughput: delta_throughput,
442                    total_output_pending_ns: delta_pending_ns,
443                };
444                delta_aggregated_stats
445                    .executor_stats
446                    .insert(*executor_id, delta_stats);
447            }
448
449            for fragment_id in dispatch_fragment_ids {
450                let Some(initial_stats) = initial.dispatch_stats.get(fragment_id) else {
451                    debug_panic_or_warn!("missing initial stats for fragment {}", fragment_id);
452                    continue;
453                };
454                let Some(end_stats) = end.dispatch_stats.get(fragment_id) else {
455                    debug_panic_or_warn!("missing final stats for fragment {}", fragment_id);
456                    continue;
457                };
458
459                let initial_throughput = initial_stats.total_output_throughput;
460                let end_throughput = end_stats.total_output_throughput;
461                let Some(delta_throughput) = end_throughput.checked_sub(initial_throughput) else {
462                    debug_panic_or_warn!(
463                        "delta throughput is negative for fragment {} (initial: {}, end: {})",
464                        fragment_id,
465                        initial_throughput,
466                        end_throughput
467                    );
468                    continue;
469                };
470
471                let initial_pending_ns = initial_stats.total_output_pending_ns;
472                let end_pending_ns = end_stats.total_output_pending_ns;
473                let Some(delta_pending_ns) = end_pending_ns.checked_sub(initial_pending_ns) else {
474                    debug_panic_or_warn!(
475                        "delta pending ns is negative for fragment {} (initial: {}, end: {})",
476                        fragment_id,
477                        initial_pending_ns,
478                        end_pending_ns
479                    );
480                    continue;
481                };
482
483                let delta_stats = DispatchMetrics {
484                    fragment_id: *fragment_id,
485                    epoch: 0,
486                    total_output_throughput: delta_throughput,
487                    total_output_pending_ns: delta_pending_ns,
488                };
489                delta_aggregated_stats
490                    .dispatch_stats
491                    .insert(*fragment_id, delta_stats);
492            }
493
494            delta_aggregated_stats
495        }
496    }
497
498    #[expect(dead_code)]
499    #[derive(Debug)]
500    pub(super) struct OperatorMetrics {
501        pub operator_id: GlobalOperatorId,
502        pub epoch: u32,
503        pub total_output_throughput: u64,
504        pub total_output_pending_ns: u64,
505    }
506
507    #[derive(Debug)]
508    pub(super) struct OperatorStats {
509        inner: HashMap<GlobalOperatorId, OperatorMetrics>,
510    }
511
512    impl OperatorStats {
513        /// Aggregates executor-level stats into operator-level stats
514        pub(super) fn aggregate(
515            operator_map: HashMap<OperatorId, HashSet<ExecutorId>>,
516            executor_stats: &ExecutorStats,
517            fragment_parallelisms: &HashMap<FragmentId, usize>,
518        ) -> Self {
519            let mut operator_stats = HashMap::new();
520            'operator_loop: for (operator_id, executor_ids) in operator_map {
521                let num_executors = executor_ids.len() as u64;
522                let mut total_output_throughput = 0;
523                let mut total_output_pending_ns = 0;
524                for executor_id in executor_ids {
525                    if let Some(stats) = executor_stats.get(&executor_id) {
526                        total_output_throughput += stats.total_output_throughput;
527                        total_output_pending_ns += stats.total_output_pending_ns;
528                    } else {
529                        // skip this operator if it doesn't have executor stats for any of its executors
530                        continue 'operator_loop;
531                    }
532                }
533                let total_output_throughput = total_output_throughput;
534                let total_output_pending_ns = total_output_pending_ns / num_executors;
535
536                operator_stats.insert(
537                    operator_id,
538                    OperatorMetrics {
539                        operator_id,
540                        epoch: 0,
541                        total_output_throughput,
542                        total_output_pending_ns,
543                    },
544                );
545            }
546
547            for (fragment_id, dispatch_metrics) in &executor_stats.dispatch_stats {
548                let operator_id = operator_id_for_dispatch(*fragment_id);
549                let total_output_throughput = dispatch_metrics.total_output_throughput;
550                let Some(fragment_parallelism) = fragment_parallelisms.get(fragment_id) else {
551                    debug_panic_or_warn!(
552                        "missing fragment parallelism for fragment {}",
553                        fragment_id
554                    );
555                    continue;
556                };
557                let total_output_pending_ns =
558                    dispatch_metrics.total_output_pending_ns / *fragment_parallelism as u64;
559
560                operator_stats.insert(
561                    operator_id,
562                    OperatorMetrics {
563                        operator_id,
564                        epoch: 0,
565                        total_output_throughput,
566                        total_output_pending_ns,
567                    },
568                );
569            }
570
571            OperatorStats {
572                inner: operator_stats,
573            }
574        }
575
576        pub fn get(&self, operator_id: &OperatorId) -> Option<&OperatorMetrics> {
577            self.inner.get(operator_id)
578        }
579    }
580}
581
582/// Utilities for the stream node graph:
583/// rendering, extracting, etc.
584mod graph {
585    use std::collections::{HashMap, HashSet};
586    use std::fmt::Debug;
587    use std::time::Duration;
588
589    use itertools::Itertools;
590    use risingwave_common::operator::{
591        unique_executor_id_from_unique_operator_id, unique_operator_id,
592        unique_operator_id_into_parts,
593    };
594    use risingwave_pb::id::{ActorId, GlobalOperatorId};
595    use risingwave_pb::meta::list_table_fragments_response::FragmentInfo;
596    use risingwave_pb::stream_plan::stream_node::{NodeBody, NodeBodyDiscriminants};
597    use risingwave_pb::stream_plan::{MergeNode, StreamNode as PbStreamNode};
598
599    use crate::catalog::FragmentId;
600    use crate::handler::explain_analyze_stream_job::ExplainAnalyzeStreamJobOutput;
601    use crate::handler::explain_analyze_stream_job::metrics::OperatorStats;
602    use crate::handler::explain_analyze_stream_job::utils::operator_id_for_dispatch;
603
604    pub(super) type OperatorId = GlobalOperatorId;
605    pub(super) use risingwave_pb::id::ExecutorId;
606
    /// This is an internal struct used ONLY for explain analyze stream job.
    ///
    /// One node of the operator graph built by `extract_stream_node_infos`. It is
    /// either a stream-plan operator of a fragment, or a synthetic dispatcher node
    /// created by `StreamNode::new_for_dispatcher`.
    pub(super) struct StreamNode {
        // Global operator id: `unique_operator_id(fragment_id, node.operator_id)` for
        // plan operators, `operator_id_for_dispatch(fragment_id)` for dispatchers.
        operator_id: OperatorId,
        // Fragment this node belongs to.
        fragment_id: FragmentId,
        // Kind of the node body; `Exchange` for synthetic dispatcher nodes.
        identity: NodeBodyDiscriminants,
        // Actors of the owning fragment; empty for dispatcher nodes.
        actor_ids: HashSet<ActorId>,
        // Global operator ids of this node's inputs. For a merge node this also
        // receives the upstream dispatcher node once it has been attached.
        dependencies: Vec<OperatorId>,
    }
615
616    impl Debug for StreamNode {
617        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
618            let (actor_id, operator_id) = unique_operator_id_into_parts(self.operator_id);
619            let operator_id_str = format!(
620                "{}: (actor_id: {}, operator_id: {})",
621                self.operator_id, actor_id, operator_id
622            );
623            write!(
624                f,
625                "StreamNode {{ operator_id: {}, fragment_id: {}, identity: {:?}, actor_ids: {:?}, dependencies: {:?} }}",
626                operator_id_str, self.fragment_id, self.identity, self.actor_ids, self.dependencies
627            )
628        }
629    }
630
631    impl StreamNode {
632        fn new_for_dispatcher(fragment_id: FragmentId) -> Self {
633            StreamNode {
634                operator_id: operator_id_for_dispatch(fragment_id),
635                fragment_id,
636                identity: NodeBodyDiscriminants::Exchange,
637                actor_ids: Default::default(),
638                dependencies: Default::default(),
639            }
640        }
641    }
642
    /// Extracts the root node of the plan, as well as the adjacency list
    ///
    /// Returns `(root_node, dispatcher_fragment_ids, adjacency_list)`:
    /// - `root_node`: operator id of a graph root whose fragment is not read by
    ///   any merge node of this job;
    /// - `dispatcher_fragment_ids`: fragments of this job that are the upstream of
    ///   some merge node in this job;
    /// - `adjacency_list`: every operator of the job keyed by global operator id,
    ///   plus one synthetic dispatcher node per dispatching fragment. Each
    ///   dispatcher depends on its fragment's root, and the merge node depends on
    ///   the dispatcher.
    ///
    /// # Panics
    /// If a fragment has no actors, a stream node lacks a body, no root is found,
    /// or a dispatcher operator id collides with an existing node.
    pub(super) fn extract_stream_node_infos(
        fragments: Vec<FragmentInfo>,
    ) -> (
        OperatorId,
        HashSet<FragmentId>,
        HashMap<OperatorId, StreamNode>,
    ) {
        let job_fragment_ids = fragments
            .iter()
            .map(|f| f.id)
            .collect::<HashSet<FragmentId>>();

        // Finds root nodes of the graph
        // (nodes that are not a dependency of any other node).
        fn find_root_nodes(stream_nodes: &HashMap<OperatorId, StreamNode>) -> HashSet<OperatorId> {
            let mut all_nodes = stream_nodes.keys().copied().collect::<HashSet<_>>();
            for node in stream_nodes.values() {
                for dependency in &node.dependencies {
                    all_nodes.remove(dependency);
                }
            }
            all_nodes
        }

        // Recursively extracts stream node info, and builds an adjacency list between stream nodes
        // and their dependencies
        fn extract_stream_node_info(
            fragment_id: FragmentId,
            fragment_id_to_merge_operator_id: &mut HashMap<FragmentId, OperatorId>,
            operator_id_to_stream_node: &mut HashMap<OperatorId, StreamNode>,
            node: &PbStreamNode,
            actor_ids: &HashSet<ActorId>,
        ) {
            let identity = node
                .node_body
                .as_ref()
                .expect("should have node body")
                .into();
            let operator_id = unique_operator_id(fragment_id, node.operator_id);
            // Remember which merge operator reads from which upstream fragment, so
            // the cross-fragment edge can be added once dispatcher nodes exist.
            // NOTE(review): `insert` overwrites, so if one fragment feeds several
            // merge nodes in this job only the last visited merge gets the edge.
            if let Some(merge_node) = node.node_body.as_ref()
                && let NodeBody::Merge(box MergeNode {
                    upstream_fragment_id,
                    ..
                }) = merge_node
            {
                fragment_id_to_merge_operator_id.insert(*upstream_fragment_id, operator_id);
            }
            let dependencies = &node.input;
            let dependency_ids = dependencies
                .iter()
                .map(|input| unique_operator_id(fragment_id, input.operator_id))
                .collect::<Vec<_>>();
            operator_id_to_stream_node.insert(
                operator_id,
                StreamNode {
                    operator_id,
                    fragment_id,
                    identity,
                    actor_ids: actor_ids.clone(),
                    dependencies: dependency_ids,
                },
            );
            for dependency in dependencies {
                extract_stream_node_info(
                    fragment_id,
                    fragment_id_to_merge_operator_id,
                    operator_id_to_stream_node,
                    dependency,
                    actor_ids,
                );
            }
        }

        // build adjacency list and hanging merge edges.
        // hanging merge edges will be filled in the following section.
        let mut operator_id_to_stream_node = HashMap::new();
        let mut fragment_id_to_merge_operator_id = HashMap::new();
        for fragment in fragments {
            let actors = fragment.actors;
            assert!(
                !actors.is_empty(),
                "fragment {} should have at least one actor",
                fragment.id
            );
            let actor_ids = actors.iter().map(|actor| actor.id).collect::<HashSet<_>>();
            // Only the first actor's plan is walked; this assumes all actors of a
            // fragment share the same stream node tree — TODO confirm.
            let node = actors[0].node.as_ref().expect("should have stream node");
            extract_stream_node_info(
                fragment.id,
                &mut fragment_id_to_merge_operator_id,
                &mut operator_id_to_stream_node,
                node,
                &actor_ids,
            );
        }

        // find root node, and fill in dispatcher edges + nodes.
        // Each fragment root is either read by a merge in this job (it gets a
        // dispatcher node between it and that merge) or is the job's root.
        let root_or_dispatch_nodes = find_root_nodes(&operator_id_to_stream_node);
        let mut root_node = None;
        for operator_id in root_or_dispatch_nodes {
            let node = operator_id_to_stream_node.get_mut(&operator_id).unwrap();
            let fragment_id = node.fragment_id;
            if let Some(merge_operator_id) = fragment_id_to_merge_operator_id.get(&fragment_id) {
                let mut dispatcher = StreamNode::new_for_dispatcher(fragment_id);
                let operator_id_for_dispatch = dispatcher.operator_id;
                dispatcher.dependencies.push(operator_id);
                assert!(
                    operator_id_to_stream_node
                        .insert(operator_id_for_dispatch as _, dispatcher)
                        .is_none()
                );
                operator_id_to_stream_node
                    .get_mut(merge_operator_id)
                    .unwrap()
                    .dependencies
                    .push(operator_id_for_dispatch as _)
            } else {
                // If several such roots exist, the last one in HashSet order wins.
                root_node = Some(operator_id);
            }
        }

        // Only fragments belonging to this job have dispatchers we can profile;
        // merges may also reference upstream fragments outside the job.
        let mut dispatcher_fragment_ids = HashSet::new();
        for dispatcher_fragment_id in fragment_id_to_merge_operator_id.keys() {
            if job_fragment_ids.contains(dispatcher_fragment_id) {
                dispatcher_fragment_ids.insert(*dispatcher_fragment_id);
            }
        }

        (
            root_node.unwrap(),
            dispatcher_fragment_ids,
            operator_id_to_stream_node,
        )
    }
776
777    pub(super) fn extract_executor_infos(
778        adjacency_list: &HashMap<OperatorId, StreamNode>,
779    ) -> (
780        HashSet<ExecutorId>,
781        HashMap<OperatorId, HashSet<ExecutorId>>,
782    ) {
783        let mut executor_ids = HashSet::new();
784        let mut operator_to_executor = HashMap::new();
785        for (operator_id, node) in adjacency_list {
786            assert_eq!(*operator_id, node.operator_id);
787            let operator_id = node.operator_id;
788            for actor_id in &node.actor_ids {
789                let executor_id =
790                    unique_executor_id_from_unique_operator_id(*actor_id, operator_id);
791                if node.identity != NodeBodyDiscriminants::BatchPlan
792                // FIXME(kwannoel): Add back after https://github.com/risingwavelabs/risingwave/issues/22775 is resolved.
793                && node.identity != NodeBodyDiscriminants::Merge
794                && node.identity != NodeBodyDiscriminants::Project
795                {
796                    assert!(executor_ids.insert(executor_id));
797                }
798                assert!(
799                    operator_to_executor
800                        .entry(operator_id)
801                        .or_insert_with(HashSet::new)
802                        .insert(executor_id)
803                );
804            }
805        }
806        (executor_ids, operator_to_executor)
807    }
808
809    // Do a DFS based rendering. Each node will occupy its own row.
810    // Schema:
811    // | Operator ID | Identity | Actor IDs | Metrics ... |
812    // Each node will be indented based on its depth in the graph.
813    pub(super) fn render_graph_with_metrics(
814        adjacency_list: &HashMap<OperatorId, StreamNode>,
815        root_node: OperatorId,
816        stats: &OperatorStats,
817        profiling_duration: &Duration,
818    ) -> Vec<ExplainAnalyzeStreamJobOutput> {
819        let profiling_duration_secs = profiling_duration.as_secs_f64();
820        let mut rows = vec![];
821        let mut stack = vec![(String::new(), true, root_node)];
822        while let Some((prefix, last_child, node_id)) = stack.pop() {
823            let Some(node) = adjacency_list.get(&node_id) else {
824                continue;
825            };
826            let is_root = node_id == root_node;
827
828            let identity_rendered = if is_root {
829                node.identity.to_string()
830            } else {
831                let connector = if last_child { "└─ " } else { "├─ " };
832                format!("{}{}{}", prefix, connector, node.identity)
833            };
834
835            let child_prefix = if is_root {
836                ""
837            } else if last_child {
838                "   "
839            } else {
840                "│  "
841            };
842            let child_prefix = format!("{}{}", prefix, child_prefix);
843
844            let stats = stats.get(&node_id);
845            let (output_rows_per_second, downstream_backpressure_ratio) = match stats {
846                Some(stats) => (
847                    Some(
848                        (stats.total_output_throughput as f64 / profiling_duration_secs)
849                            .to_string(),
850                    ),
851                    Some(
852                        (Duration::from_nanos(stats.total_output_pending_ns).as_secs_f64()
853                            / usize::max(node.actor_ids.len(), 1) as f64
854                            / profiling_duration_secs)
855                            .to_string(),
856                    ),
857                ),
858                None => (None, None),
859            };
860            let row = ExplainAnalyzeStreamJobOutput {
861                identity: identity_rendered,
862                actor_ids: node
863                    .actor_ids
864                    .iter()
865                    .sorted()
866                    .map(|id| id.to_string())
867                    .collect::<Vec<_>>()
868                    .join(","),
869                output_rows_per_second,
870                downstream_backpressure_ratio,
871            };
872            rows.push(row);
873            for (position, dependency) in node.dependencies.iter().enumerate() {
874                stack.push((child_prefix.clone(), position == 0, *dependency));
875            }
876        }
877        rows
878    }
879}
880
881mod utils {
882    use risingwave_common::operator::unique_operator_id;
883    use risingwave_pb::id::GlobalOperatorId;
884
885    use crate::catalog::FragmentId;
886
887    pub(super) fn operator_id_for_dispatch(fragment_id: FragmentId) -> GlobalOperatorId {
888        unique_operator_id(fragment_id, u32::MAX)
889    }
890}