risingwave_frontend/handler/
explain_analyze_stream_job.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use pgwire::pg_response::StatementType;
18use risingwave_common::types::Fields;
19use risingwave_sqlparser::ast::AnalyzeTarget;
20use tokio::time::Duration;
21
22use crate::error::Result;
23use crate::handler::explain_analyze_stream_job::graph::{
24    extract_executor_infos, extract_stream_node_infos, render_graph_with_metrics,
25};
26use crate::handler::{HandlerArgs, RwPgResponse, RwPgResponseBuilder, RwPgResponseBuilderExt};
27
/// Reports an unexpected condition: panics when compiled with `debug_assertions` or the
/// `madsim` cfg, and emits a `tracing` warning otherwise.
///
/// The arguments are forwarded unchanged to `panic!` / `tracing::warn!`.
#[macro_export]
macro_rules! debug_panic_or_warn {
    ($($arg:tt)*) => {
        if cfg!(any(debug_assertions, madsim)) {
            panic!($($arg)*);
        } else {
            tracing::warn!($($arg)*);
        }
    };
}
38
// Row type (derives `Fields`) declared for the `EXPLAIN ANALYZE` stream-job handler and
// imported by the `graph` module of this file.
// NOTE(review): presumably one value is produced per rendered stream node — confirm against
// the rendering code.
#[derive(Fields)]
struct ExplainAnalyzeStreamJobOutput {
    // Presumably the rendered name of the stream node — TODO confirm.
    identity: String,
    // Presumably the actors running the node, formatted as text — TODO confirm.
    actor_ids: String,
    // Optional; presumably absent when no metrics were collected for the node — TODO confirm.
    output_rows_per_second: Option<String>,
    // Optional; presumably absent when no metrics were collected for the node — TODO confirm.
    downstream_backpressure_ratio: Option<String>,
}
46
47pub async fn handle_explain_analyze_stream_job(
48    handler_args: HandlerArgs,
49    target: AnalyzeTarget,
50    duration_secs: Option<u64>,
51) -> Result<RwPgResponse> {
52    let profiling_duration = Duration::from_secs(duration_secs.unwrap_or(10));
53    let job_id = bind::bind_relation(&target, &handler_args)?;
54
55    let meta_client = handler_args.session.env().meta_client();
56    let fragments = net::get_fragments(meta_client, job_id).await?;
57    let fragment_parallelisms = fragments
58        .iter()
59        .map(|f| (f.id, f.actors.len()))
60        .collect::<HashMap<_, _>>();
61    let (root_node, dispatcher_fragment_ids, adjacency_list) = extract_stream_node_infos(fragments);
62    let (executor_ids, operator_to_executor) = extract_executor_infos(&adjacency_list);
63    tracing::debug!(
64        ?fragment_parallelisms,
65        ?root_node,
66        ?dispatcher_fragment_ids,
67        ?adjacency_list,
68        "explain analyze metadata"
69    );
70
71    let worker_nodes = net::list_stream_worker_nodes(handler_args.session.env()).await?;
72
73    let executor_stats = net::get_executor_stats(
74        &handler_args,
75        &worker_nodes,
76        &executor_ids,
77        &dispatcher_fragment_ids,
78        profiling_duration,
79    )
80    .await?;
81    tracing::debug!(?executor_stats, "collected executor stats");
82    let aggregated_stats = metrics::OperatorStats::aggregate(
83        operator_to_executor,
84        &executor_stats,
85        &fragment_parallelisms,
86    );
87    tracing::debug!(?aggregated_stats, "collected aggregated stats");
88
89    // Render graph with metrics
90    let rows = render_graph_with_metrics(
91        &adjacency_list,
92        root_node,
93        &aggregated_stats,
94        &profiling_duration,
95    );
96    let builder = RwPgResponseBuilder::empty(StatementType::EXPLAIN);
97    let builder = builder.rows(rows);
98    Ok(builder.into())
99}
100
101/// Binding pass, since we don't go through the binder.
102/// TODO(noel): Should this be in binder? But it may make compilation slower and doesn't require any binder logic...
103mod bind {
104    use risingwave_common::id::JobId;
105    use risingwave_sqlparser::ast::AnalyzeTarget;
106
107    use crate::Binder;
108    use crate::catalog::root_catalog::SchemaPath;
109    use crate::error::Result;
110    use crate::handler::HandlerArgs;
111
112    /// Bind the analyze target relation to its actual id.
113    pub(super) fn bind_relation(
114        target_relation: &AnalyzeTarget,
115        handler_args: &HandlerArgs,
116    ) -> Result<JobId> {
117        let job_id = match &target_relation {
118            AnalyzeTarget::Id(id) => (*id).into(),
119            AnalyzeTarget::Index(name)
120            | AnalyzeTarget::Table(name)
121            | AnalyzeTarget::Sink(name)
122            | AnalyzeTarget::MaterializedView(name) => {
123                let session = &handler_args.session;
124                let db_name = session.database();
125                let (schema_name, name) = Binder::resolve_schema_qualified_name(&db_name, name)?;
126                let search_path = session.config().search_path();
127                let user_name = &session.user_name();
128                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
129
130                let catalog_reader = handler_args.session.env().catalog_reader();
131                let catalog = catalog_reader.read_guard();
132
133                match target_relation {
134                    AnalyzeTarget::Index(_) => {
135                        let (catalog, _schema_name) =
136                            catalog.get_any_index_by_name(&db_name, schema_path, &name)?;
137                        catalog.id.as_job_id()
138                    }
139                    AnalyzeTarget::Table(_) => {
140                        let (catalog, _schema_name) =
141                            catalog.get_any_table_by_name(&db_name, schema_path, &name)?;
142                        catalog.id.as_job_id()
143                    }
144                    AnalyzeTarget::Sink(_) => {
145                        let (catalog, _schema_name) =
146                            catalog.get_any_sink_by_name(&db_name, schema_path, &name)?;
147                        catalog.id.as_job_id()
148                    }
149                    AnalyzeTarget::MaterializedView(_) => {
150                        let (catalog, _schema_name) =
151                            catalog.get_any_table_by_name(&db_name, schema_path, &name)?;
152                        catalog.id.as_job_id()
153                    }
154                    AnalyzeTarget::Id(_) => unreachable!(),
155                }
156            }
157        };
158        Ok(job_id)
159    }
160}
161
162/// Utilities for fetching stats from CN
163mod net {
164    use std::collections::HashSet;
165
166    use risingwave_common::id::{FragmentId, JobId};
167    use risingwave_pb::common::WorkerNode;
168    use risingwave_pb::meta::list_table_fragments_response::FragmentInfo;
169    use risingwave_pb::monitor_service::GetProfileStatsRequest;
170    use tokio::time::{Duration, sleep};
171
172    use crate::error::Result;
173    use crate::handler::HandlerArgs;
174    use crate::handler::explain_analyze_stream_job::graph::ExecutorId;
175    use crate::handler::explain_analyze_stream_job::metrics::ExecutorStats;
176    use crate::meta_client::FrontendMetaClient;
177    use crate::session::FrontendEnv;
178
179    pub(super) async fn list_stream_worker_nodes(env: &FrontendEnv) -> Result<Vec<WorkerNode>> {
180        let worker_nodes = env.meta_client().list_all_nodes().await?;
181        let stream_worker_nodes = worker_nodes
182            .into_iter()
183            .filter(|node| {
184                node.property
185                    .as_ref()
186                    .map(|p| p.is_streaming)
187                    .unwrap_or_else(|| false)
188            })
189            .collect::<Vec<_>>();
190        Ok(stream_worker_nodes)
191    }
192
193    // TODO(kwannoel): Only fetch the names, actor_ids and graph of the fragments
194    pub(super) async fn get_fragments(
195        meta_client: &dyn FrontendMetaClient,
196        job_id: JobId,
197    ) -> Result<Vec<FragmentInfo>> {
198        let mut fragment_map = meta_client.list_table_fragments(&[job_id]).await?;
199        assert_eq!(fragment_map.len(), 1, "expected only one fragment");
200        let (fragment_job_id, table_fragment_info) = fragment_map.drain().next().unwrap();
201        assert_eq!(fragment_job_id, job_id);
202        Ok(table_fragment_info.fragments)
203    }
204
205    pub(super) async fn get_executor_stats(
206        handler_args: &HandlerArgs,
207        worker_nodes: &[WorkerNode],
208        executor_ids: &HashSet<ExecutorId>,
209        dispatcher_fragment_ids: &HashSet<FragmentId>,
210        profiling_duration: Duration,
211    ) -> Result<ExecutorStats> {
212        let dispatcher_fragment_ids = dispatcher_fragment_ids.iter().copied().collect::<Vec<_>>();
213        let mut initial_aggregated_stats = ExecutorStats::new();
214        for node in worker_nodes {
215            let mut compute_client = handler_args.session.env().client_pool().get(node).await?;
216            let stats = compute_client
217                .monitor_client
218                .get_profile_stats(GetProfileStatsRequest {
219                    executor_ids: executor_ids.iter().copied().collect(),
220                    dispatcher_fragment_ids: dispatcher_fragment_ids.clone(),
221                })
222                .await
223                .expect("get profiling stats failed");
224            initial_aggregated_stats.record(
225                executor_ids,
226                &dispatcher_fragment_ids,
227                &stats.into_inner(),
228            );
229        }
230        tracing::debug!(?initial_aggregated_stats, "initial aggregated stats");
231
232        sleep(profiling_duration).await;
233
234        let mut final_aggregated_stats = ExecutorStats::new();
235        for node in worker_nodes {
236            let mut compute_client = handler_args.session.env().client_pool().get(node).await?;
237            let stats = compute_client
238                .monitor_client
239                .get_profile_stats(GetProfileStatsRequest {
240                    executor_ids: executor_ids.iter().copied().collect(),
241                    dispatcher_fragment_ids: dispatcher_fragment_ids.clone(),
242                })
243                .await
244                .expect("get profiling stats failed");
245            final_aggregated_stats.record(
246                executor_ids,
247                &dispatcher_fragment_ids,
248                &stats.into_inner(),
249            );
250        }
251        tracing::debug!(?final_aggregated_stats, "final aggregated stats");
252
253        let delta_aggregated_stats = ExecutorStats::get_delta(
254            &initial_aggregated_stats,
255            &final_aggregated_stats,
256            executor_ids,
257            &dispatcher_fragment_ids,
258        );
259
260        Ok(delta_aggregated_stats)
261    }
262}
263
264/// Profiling metrics data structure and utilities
265/// We have 2 stages of metric collection:
266/// 1. Collect the stream node metrics at the **Executor** level.
267/// 2. Merge the stream node metrics into **Operator** level, avg, max, min, etc...
268mod metrics {
269    use std::collections::{HashMap, HashSet};
270
271    use risingwave_common::operator::unique_executor_id_into_parts;
272    use risingwave_pb::monitor_service::GetProfileStatsResponse;
273
274    use crate::catalog::FragmentId;
275    use crate::handler::explain_analyze_stream_job::graph::{ExecutorId, OperatorId};
276    use crate::handler::explain_analyze_stream_job::utils::operator_id_for_dispatch;
277
    /// Output counters of a single executor.
    ///
    /// Holds either the values recorded from one profiling response (see
    /// `ExecutorStats::record`) or the difference between two such recordings (see
    /// `ExecutorStats::get_delta`).
    #[expect(dead_code)]
    #[derive(Default, Debug)]
    pub(super) struct ExecutorMetrics {
        /// Id of the executor these counters belong to.
        pub executor_id: ExecutorId,
        /// Always set to 0 by the code in this module.
        pub epoch: u32,
        /// Taken from the response's `stream_node_output_row_count` map.
        pub total_output_throughput: u64,
        /// Taken from the response's `stream_node_output_blocking_duration_ns` map.
        pub total_output_pending_ns: u64,
    }
286
    /// Output counters of the dispatchers of one fragment, summed over all worker-node
    /// responses recorded into the same `ExecutorStats`, or the difference between two such
    /// sums.
    #[derive(Default, Debug)]
    pub(super) struct DispatchMetrics {
        /// Id of the fragment whose dispatchers these counters belong to.
        pub fragment_id: FragmentId,
        /// Always set to 0 by the code in this module.
        pub epoch: u32,
        /// Accumulated from the response's `dispatch_fragment_output_row_count` map.
        pub total_output_throughput: u64,
        /// Accumulated from the response's `dispatch_fragment_output_blocking_duration_ns` map.
        pub total_output_pending_ns: u64,
    }
294
    /// Executor-level metrics, keyed by executor id, together with dispatcher metrics, keyed
    /// by fragment id.
    #[derive(Debug)]
    pub(super) struct ExecutorStats {
        /// Metrics of individual executors.
        executor_stats: HashMap<ExecutorId, ExecutorMetrics>,
        /// Metrics of dispatchers, one entry per fragment.
        dispatch_stats: HashMap<FragmentId, DispatchMetrics>,
    }
300
301    impl ExecutorStats {
302        pub(super) fn new() -> Self {
303            ExecutorStats {
304                executor_stats: HashMap::new(),
305                dispatch_stats: HashMap::new(),
306            }
307        }
308
309        pub fn get(&self, executor_id: &ExecutorId) -> Option<&ExecutorMetrics> {
310            self.executor_stats.get(executor_id)
311        }
312
313        /// Record metrics for profiling
314        pub(super) fn record<'a>(
315            &mut self,
316            executor_ids: &'a HashSet<ExecutorId>,
317            dispatch_fragment_ids: &'a [FragmentId],
318            metrics: &'a GetProfileStatsResponse,
319        ) {
320            for executor_id in executor_ids {
321                let Some(total_output_throughput) =
322                    metrics.stream_node_output_row_count.get(executor_id)
323                else {
324                    continue;
325                };
326                let Some(total_output_pending_ns) = metrics
327                    .stream_node_output_blocking_duration_ns
328                    .get(executor_id)
329                else {
330                    continue;
331                };
332                let stats = ExecutorMetrics {
333                    executor_id: *executor_id,
334                    epoch: 0,
335                    total_output_throughput: *total_output_throughput,
336                    total_output_pending_ns: *total_output_pending_ns,
337                };
338                // An executor should be scheduled on a single worker node,
339                // it should not be inserted multiple times.
340                if cfg!(madsim) {
341                    // If madsim is enabled, worker nodes will share the same process.
342                    // The metrics is stored as a global object, so querying each worker node
343                    // will return the same set of executor metrics.
344                    // So we should not assert here.
345                    self.executor_stats.insert(*executor_id, stats);
346                } else {
347                    assert!(self.executor_stats.insert(*executor_id, stats).is_none());
348                }
349            }
350
351            for fragment_id in dispatch_fragment_ids {
352                let Some(total_output_throughput) =
353                    metrics.dispatch_fragment_output_row_count.get(fragment_id)
354                else {
355                    continue;
356                };
357                let Some(total_output_pending_ns) = metrics
358                    .dispatch_fragment_output_blocking_duration_ns
359                    .get(fragment_id)
360                else {
361                    continue;
362                };
363                let stats = self.dispatch_stats.entry(*fragment_id).or_default();
364                stats.fragment_id = *fragment_id;
365                stats.epoch = 0;
366                // do a sum rather than insert
367                // because dispatchers are
368                // distributed across worker nodes.
369                stats.total_output_throughput += *total_output_throughput;
370                stats.total_output_pending_ns += *total_output_pending_ns;
371            }
372        }
373
374        pub(super) fn get_delta(
375            initial: &Self,
376            end: &Self,
377            executor_ids: &HashSet<ExecutorId>,
378            dispatch_fragment_ids: &[FragmentId],
379        ) -> Self {
380            let mut delta_aggregated_stats = Self::new();
381            for executor_id in executor_ids {
382                let (actor_id, operator_id) = unique_executor_id_into_parts(*executor_id);
383                let Some(initial_stats) = initial.executor_stats.get(executor_id) else {
384                    debug_panic_or_warn!(
385                        "missing initial stats for executor {} (actor {} operator {})",
386                        executor_id,
387                        actor_id,
388                        operator_id
389                    );
390                    continue;
391                };
392                let Some(end_stats) = end.executor_stats.get(executor_id) else {
393                    debug_panic_or_warn!(
394                        "missing final stats for executor {} (actor {} operator {})",
395                        executor_id,
396                        actor_id,
397                        operator_id
398                    );
399                    continue;
400                };
401
402                let initial_throughput = initial_stats.total_output_throughput;
403                let end_throughput = end_stats.total_output_throughput;
404                let Some(delta_throughput) = end_throughput.checked_sub(initial_throughput) else {
405                    debug_panic_or_warn!(
406                        "delta throughput is negative for actor {} operator {} (initial: {}, end: {})",
407                        actor_id,
408                        operator_id,
409                        initial_throughput,
410                        end_throughput
411                    );
412                    continue;
413                };
414
415                let initial_pending_ns = initial_stats.total_output_pending_ns;
416                let end_pending_ns = end_stats.total_output_pending_ns;
417                let Some(delta_pending_ns) = end_pending_ns.checked_sub(initial_pending_ns) else {
418                    debug_panic_or_warn!(
419                        "delta pending ns is negative for actor {} operator {} (initial: {}, end: {})",
420                        actor_id,
421                        operator_id,
422                        initial_pending_ns,
423                        end_pending_ns
424                    );
425                    continue;
426                };
427
428                let delta_stats = ExecutorMetrics {
429                    executor_id: *executor_id,
430                    epoch: 0,
431                    total_output_throughput: delta_throughput,
432                    total_output_pending_ns: delta_pending_ns,
433                };
434                delta_aggregated_stats
435                    .executor_stats
436                    .insert(*executor_id, delta_stats);
437            }
438
439            for fragment_id in dispatch_fragment_ids {
440                let Some(initial_stats) = initial.dispatch_stats.get(fragment_id) else {
441                    debug_panic_or_warn!("missing initial stats for fragment {}", fragment_id);
442                    continue;
443                };
444                let Some(end_stats) = end.dispatch_stats.get(fragment_id) else {
445                    debug_panic_or_warn!("missing final stats for fragment {}", fragment_id);
446                    continue;
447                };
448
449                let initial_throughput = initial_stats.total_output_throughput;
450                let end_throughput = end_stats.total_output_throughput;
451                let Some(delta_throughput) = end_throughput.checked_sub(initial_throughput) else {
452                    debug_panic_or_warn!(
453                        "delta throughput is negative for fragment {} (initial: {}, end: {})",
454                        fragment_id,
455                        initial_throughput,
456                        end_throughput
457                    );
458                    continue;
459                };
460
461                let initial_pending_ns = initial_stats.total_output_pending_ns;
462                let end_pending_ns = end_stats.total_output_pending_ns;
463                let Some(delta_pending_ns) = end_pending_ns.checked_sub(initial_pending_ns) else {
464                    debug_panic_or_warn!(
465                        "delta pending ns is negative for fragment {} (initial: {}, end: {})",
466                        fragment_id,
467                        initial_pending_ns,
468                        end_pending_ns
469                    );
470                    continue;
471                };
472
473                let delta_stats = DispatchMetrics {
474                    fragment_id: *fragment_id,
475                    epoch: 0,
476                    total_output_throughput: delta_throughput,
477                    total_output_pending_ns: delta_pending_ns,
478                };
479                delta_aggregated_stats
480                    .dispatch_stats
481                    .insert(*fragment_id, delta_stats);
482            }
483
484            delta_aggregated_stats
485        }
486    }
487
    /// Metrics of one operator, combined from the metrics of its executors (or taken from
    /// the dispatcher metrics of a fragment) by `OperatorStats::aggregate`.
    #[expect(dead_code)]
    #[derive(Debug)]
    pub(super) struct OperatorMetrics {
        /// Id of the operator these metrics belong to.
        pub operator_id: OperatorId,
        /// Always set to 0 by the code in this module.
        pub epoch: u32,
        /// Sum of the throughput of all executors of the operator.
        pub total_output_throughput: u64,
        /// Pending time divided by the number of executors (or by the fragment parallelism
        /// for dispatchers), i.e. an average.
        pub total_output_pending_ns: u64,
    }
496
    /// Operator-level metrics, keyed by operator id.
    #[derive(Debug)]
    pub(super) struct OperatorStats {
        /// Metrics per operator; filled by `aggregate`.
        inner: HashMap<OperatorId, OperatorMetrics>,
    }
501
502    impl OperatorStats {
503        /// Aggregates executor-level stats into operator-level stats
504        pub(super) fn aggregate(
505            operator_map: HashMap<OperatorId, HashSet<ExecutorId>>,
506            executor_stats: &ExecutorStats,
507            fragment_parallelisms: &HashMap<FragmentId, usize>,
508        ) -> Self {
509            let mut operator_stats = HashMap::new();
510            'operator_loop: for (operator_id, executor_ids) in operator_map {
511                let num_executors = executor_ids.len() as u64;
512                let mut total_output_throughput = 0;
513                let mut total_output_pending_ns = 0;
514                for executor_id in executor_ids {
515                    if let Some(stats) = executor_stats.get(&executor_id) {
516                        total_output_throughput += stats.total_output_throughput;
517                        total_output_pending_ns += stats.total_output_pending_ns;
518                    } else {
519                        // skip this operator if it doesn't have executor stats for any of its executors
520                        continue 'operator_loop;
521                    }
522                }
523                let total_output_throughput = total_output_throughput;
524                let total_output_pending_ns = total_output_pending_ns / num_executors;
525
526                operator_stats.insert(
527                    operator_id,
528                    OperatorMetrics {
529                        operator_id,
530                        epoch: 0,
531                        total_output_throughput,
532                        total_output_pending_ns,
533                    },
534                );
535            }
536
537            for (fragment_id, dispatch_metrics) in &executor_stats.dispatch_stats {
538                let operator_id = operator_id_for_dispatch(*fragment_id);
539                let total_output_throughput = dispatch_metrics.total_output_throughput;
540                let Some(fragment_parallelism) = fragment_parallelisms.get(fragment_id) else {
541                    debug_panic_or_warn!(
542                        "missing fragment parallelism for fragment {}",
543                        fragment_id
544                    );
545                    continue;
546                };
547                let total_output_pending_ns =
548                    dispatch_metrics.total_output_pending_ns / *fragment_parallelism as u64;
549
550                operator_stats.insert(
551                    operator_id,
552                    OperatorMetrics {
553                        operator_id,
554                        epoch: 0,
555                        total_output_throughput,
556                        total_output_pending_ns,
557                    },
558                );
559            }
560
561            OperatorStats {
562                inner: operator_stats,
563            }
564        }
565
566        pub fn get(&self, operator_id: &OperatorId) -> Option<&OperatorMetrics> {
567            self.inner.get(operator_id)
568        }
569    }
570}
571
572/// Utilities for the stream node graph:
573/// rendering, extracting, etc.
574mod graph {
575    use std::collections::{HashMap, HashSet};
576    use std::fmt::Debug;
577    use std::time::Duration;
578
579    use itertools::Itertools;
580    use risingwave_common::operator::{
581        unique_executor_id_from_unique_operator_id, unique_operator_id,
582        unique_operator_id_into_parts,
583    };
584    use risingwave_pb::id::ActorId;
585    use risingwave_pb::meta::list_table_fragments_response::FragmentInfo;
586    use risingwave_pb::stream_plan::stream_node::{NodeBody, NodeBodyDiscriminants};
587    use risingwave_pb::stream_plan::{MergeNode, StreamNode as PbStreamNode};
588
589    use crate::catalog::FragmentId;
590    use crate::handler::explain_analyze_stream_job::ExplainAnalyzeStreamJobOutput;
591    use crate::handler::explain_analyze_stream_job::metrics::OperatorStats;
592    use crate::handler::explain_analyze_stream_job::utils::operator_id_for_dispatch;
    /// Id of an operator; in this module obtained from `unique_operator_id` or
    /// `operator_id_for_dispatch`.
    pub(super) type OperatorId = u64;
    /// Id of an executor; in this module obtained from
    /// `unique_executor_id_from_unique_operator_id`.
    pub(super) type ExecutorId = u64;
595
    /// This is an internal struct used ONLY for explain analyze stream job.
    ///
    /// It is one node of the operator graph built by `extract_stream_node_infos`.
    pub(super) struct StreamNode {
        /// Id of this node; also its key in the adjacency list.
        operator_id: OperatorId,
        /// Fragment the node belongs to.
        fragment_id: FragmentId,
        /// Kind of the node body (`Exchange` for synthetic dispatcher nodes).
        identity: NodeBodyDiscriminants,
        /// Ids of the fragment's actors; left empty for synthetic dispatcher nodes.
        actor_ids: HashSet<ActorId>,
        /// Operator ids of the nodes this node reads from.
        dependencies: Vec<u64>,
    }
604
605    impl Debug for StreamNode {
606        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
607            let (actor_id, operator_id) = unique_operator_id_into_parts(self.operator_id);
608            let operator_id_str = format!(
609                "{}: (actor_id: {}, operator_id: {})",
610                self.operator_id, actor_id, operator_id
611            );
612            write!(
613                f,
614                "StreamNode {{ operator_id: {}, fragment_id: {}, identity: {:?}, actor_ids: {:?}, dependencies: {:?} }}",
615                operator_id_str, self.fragment_id, self.identity, self.actor_ids, self.dependencies
616            )
617        }
618    }
619
620    impl StreamNode {
621        fn new_for_dispatcher(fragment_id: FragmentId) -> Self {
622            StreamNode {
623                operator_id: operator_id_for_dispatch(fragment_id),
624                fragment_id,
625                identity: NodeBodyDiscriminants::Exchange,
626                actor_ids: Default::default(),
627                dependencies: Default::default(),
628            }
629        }
630    }
631
    /// Extracts the root node of the plan, as well as the adjacency list
    ///
    /// Returns a tuple of:
    /// - the operator id of the root node,
    /// - the ids of those fragments of this job that are the upstream of some merge node
    ///   (the fragments whose dispatchers should be profiled),
    /// - the adjacency list, mapping each operator id to its node and dependencies,
    ///   including one synthetic dispatcher node per such upstream fragment.
    ///
    /// The stream node tree of the first actor of each fragment is used as the
    /// representative of the whole fragment.
    ///
    /// # Panics
    ///
    /// Panics if a fragment has no actors, if an actor or node lacks its stream node / node
    /// body, or if no root node is found.
    pub(super) fn extract_stream_node_infos(
        fragments: Vec<FragmentInfo>,
    ) -> (
        OperatorId,
        HashSet<FragmentId>,
        HashMap<OperatorId, StreamNode>,
    ) {
        // Ids of all fragments of this job, used at the end to filter dispatcher fragments.
        let job_fragment_ids = fragments
            .iter()
            .map(|f| f.id)
            .collect::<HashSet<FragmentId>>();

        // Finds root nodes of the graph
        // (i.e. the nodes that no other node lists as a dependency).
        fn find_root_nodes(stream_nodes: &HashMap<u64, StreamNode>) -> HashSet<u64> {
            let mut all_nodes = stream_nodes.keys().copied().collect::<HashSet<_>>();
            for node in stream_nodes.values() {
                for dependency in &node.dependencies {
                    all_nodes.remove(dependency);
                }
            }
            all_nodes
        }

        // Recursively extracts stream node info, and builds an adjacency list between stream nodes
        // and their dependencies
        fn extract_stream_node_info(
            fragment_id: FragmentId,
            fragment_id_to_merge_operator_id: &mut HashMap<FragmentId, OperatorId>,
            operator_id_to_stream_node: &mut HashMap<OperatorId, StreamNode>,
            node: &PbStreamNode,
            actor_ids: &HashSet<ActorId>,
        ) {
            let identity = node
                .node_body
                .as_ref()
                .expect("should have node body")
                .into();
            let operator_id = unique_operator_id(fragment_id, node.operator_id);
            // Remember which merge node consumes which upstream fragment, so that the
            // dispatcher edge can be added once all fragments are processed.
            // NOTE(review): a later merge node on the same upstream fragment overwrites the
            // earlier entry.
            if let Some(merge_node) = node.node_body.as_ref()
                && let NodeBody::Merge(box MergeNode {
                    upstream_fragment_id,
                    ..
                }) = merge_node
            {
                fragment_id_to_merge_operator_id.insert(*upstream_fragment_id, operator_id);
            }
            let dependencies = &node.input;
            let dependency_ids = dependencies
                .iter()
                .map(|input| unique_operator_id(fragment_id, input.operator_id))
                .collect::<Vec<_>>();
            operator_id_to_stream_node.insert(
                operator_id,
                StreamNode {
                    operator_id,
                    fragment_id,
                    identity,
                    actor_ids: actor_ids.clone(),
                    dependencies: dependency_ids,
                },
            );
            // Descend into the inputs; they belong to the same fragment and actors.
            for dependency in dependencies {
                extract_stream_node_info(
                    fragment_id,
                    fragment_id_to_merge_operator_id,
                    operator_id_to_stream_node,
                    dependency,
                    actor_ids,
                );
            }
        }

        // build adjacency list and hanging merge edges.
        // hanging merge edges will be filled in the following section.
        let mut operator_id_to_stream_node = HashMap::new();
        let mut fragment_id_to_merge_operator_id = HashMap::new();
        for fragment in fragments {
            let actors = fragment.actors;
            assert!(
                !actors.is_empty(),
                "fragment {} should have at least one actor",
                fragment.id
            );
            let actor_ids = actors.iter().map(|actor| actor.id).collect::<HashSet<_>>();
            // The first actor's node tree stands in for the whole fragment.
            let node = actors[0].node.as_ref().expect("should have stream node");
            extract_stream_node_info(
                fragment.id,
                &mut fragment_id_to_merge_operator_id,
                &mut operator_id_to_stream_node,
                node,
                &actor_ids,
            );
        }

        // find root node, and fill in dispatcher edges + nodes.
        // At this point every fragment's top node has no dependents, so each of them is
        // either the top of an upstream fragment (gets a dispatcher) or the job's root.
        let root_or_dispatch_nodes = find_root_nodes(&operator_id_to_stream_node);
        let mut root_node = None;
        for operator_id in root_or_dispatch_nodes {
            let node = operator_id_to_stream_node.get_mut(&operator_id).unwrap();
            let fragment_id = node.fragment_id;
            if let Some(merge_operator_id) = fragment_id_to_merge_operator_id.get(&fragment_id) {
                // Insert a dispatcher node between the fragment's top node and the merge
                // node that reads from the fragment: merge -> dispatcher -> top node.
                let mut dispatcher = StreamNode::new_for_dispatcher(fragment_id);
                let operator_id_for_dispatch = dispatcher.operator_id;
                dispatcher.dependencies.push(operator_id);
                assert!(
                    operator_id_to_stream_node
                        .insert(operator_id_for_dispatch as _, dispatcher)
                        .is_none()
                );
                operator_id_to_stream_node
                    .get_mut(merge_operator_id)
                    .unwrap()
                    .dependencies
                    .push(operator_id_for_dispatch as _)
            } else {
                // No merge node reads from this fragment, so its top node is the root.
                // NOTE(review): if several nodes qualify, the last one iterated wins.
                root_node = Some(operator_id);
            }
        }

        // Only keep upstream fragments that belong to this job; merge nodes may also
        // reference fragment ids that are not among this job's fragments.
        let mut dispatcher_fragment_ids = HashSet::new();
        for dispatcher_fragment_id in fragment_id_to_merge_operator_id.keys() {
            if job_fragment_ids.contains(dispatcher_fragment_id) {
                dispatcher_fragment_ids.insert(*dispatcher_fragment_id);
            }
        }

        (
            root_node.unwrap(),
            dispatcher_fragment_ids,
            operator_id_to_stream_node,
        )
    }
765
766    pub(super) fn extract_executor_infos(
767        adjacency_list: &HashMap<OperatorId, StreamNode>,
768    ) -> (HashSet<u64>, HashMap<u64, HashSet<u64>>) {
769        let mut executor_ids = HashSet::new();
770        let mut operator_to_executor = HashMap::new();
771        for (operator_id, node) in adjacency_list {
772            assert_eq!(*operator_id, node.operator_id);
773            let operator_id = node.operator_id;
774            for actor_id in &node.actor_ids {
775                let executor_id =
776                    unique_executor_id_from_unique_operator_id(*actor_id, operator_id);
777                if node.identity != NodeBodyDiscriminants::BatchPlan
778                // FIXME(kwannoel): Add back after https://github.com/risingwavelabs/risingwave/issues/22775 is resolved.
779                && node.identity != NodeBodyDiscriminants::Merge
780                && node.identity != NodeBodyDiscriminants::Project
781                {
782                    assert!(executor_ids.insert(executor_id));
783                }
784                assert!(
785                    operator_to_executor
786                        .entry(operator_id)
787                        .or_insert_with(HashSet::new)
788                        .insert(executor_id)
789                );
790            }
791        }
792        (executor_ids, operator_to_executor)
793    }
794
795    // Do a DFS based rendering. Each node will occupy its own row.
796    // Schema:
797    // | Operator ID | Identity | Actor IDs | Metrics ... |
798    // Each node will be indented based on its depth in the graph.
799    pub(super) fn render_graph_with_metrics(
800        adjacency_list: &HashMap<u64, StreamNode>,
801        root_node: u64,
802        stats: &OperatorStats,
803        profiling_duration: &Duration,
804    ) -> Vec<ExplainAnalyzeStreamJobOutput> {
805        let profiling_duration_secs = profiling_duration.as_secs_f64();
806        let mut rows = vec![];
807        let mut stack = vec![(String::new(), true, root_node)];
808        while let Some((prefix, last_child, node_id)) = stack.pop() {
809            let Some(node) = adjacency_list.get(&node_id) else {
810                continue;
811            };
812            let is_root = node_id == root_node;
813
814            let identity_rendered = if is_root {
815                node.identity.to_string()
816            } else {
817                let connector = if last_child { "└─ " } else { "├─ " };
818                format!("{}{}{}", prefix, connector, node.identity)
819            };
820
821            let child_prefix = if is_root {
822                ""
823            } else if last_child {
824                "   "
825            } else {
826                "│  "
827            };
828            let child_prefix = format!("{}{}", prefix, child_prefix);
829
830            let stats = stats.get(&node_id);
831            let (output_rows_per_second, downstream_backpressure_ratio) = match stats {
832                Some(stats) => (
833                    Some(
834                        (stats.total_output_throughput as f64 / profiling_duration_secs)
835                            .to_string(),
836                    ),
837                    Some(
838                        (Duration::from_nanos(stats.total_output_pending_ns).as_secs_f64()
839                            / usize::max(node.actor_ids.len(), 1) as f64
840                            / profiling_duration_secs)
841                            .to_string(),
842                    ),
843                ),
844                None => (None, None),
845            };
846            let row = ExplainAnalyzeStreamJobOutput {
847                identity: identity_rendered,
848                actor_ids: node
849                    .actor_ids
850                    .iter()
851                    .sorted()
852                    .map(|id| id.to_string())
853                    .collect::<Vec<_>>()
854                    .join(","),
855                output_rows_per_second,
856                downstream_backpressure_ratio,
857            };
858            rows.push(row);
859            for (position, dependency) in node.dependencies.iter().enumerate() {
860                stack.push((child_prefix.clone(), position == 0, *dependency));
861            }
862        }
863        rows
864    }
865}
866
867mod utils {
868    use risingwave_common::operator::unique_operator_id;
869
870    use crate::catalog::FragmentId;
871    use crate::handler::explain_analyze_stream_job::graph::OperatorId;
872
873    pub(super) fn operator_id_for_dispatch(fragment_id: FragmentId) -> OperatorId {
874        unique_operator_id(fragment_id, u32::MAX as u64)
875    }
876}