risingwave_frontend/handler/explain_analyze_stream_job.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use pgwire::pg_response::StatementType;
use risingwave_common::types::Fields;
use risingwave_sqlparser::ast::AnalyzeTarget;
use tokio::time::Duration;

use crate::error::Result;
use crate::handler::explain_analyze_stream_job::graph::{
    extract_executor_infos, extract_stream_node_infos, render_graph_with_metrics,
};
use crate::handler::{HandlerArgs, RwPgResponse, RwPgResponseBuilder, RwPgResponseBuilderExt};

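/// Panic in debug or madsim builds so inconsistencies surface during testing;
/// downgrade to a warning in release builds so profiling stays best-effort.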
#[macro_export]
macro_rules! debug_panic_or_warn {
    ($($arg:tt)*) => {
        if cfg!(debug_assertions) || cfg!(madsim) {
            panic!($($arg)*);
        } else {
            tracing::warn!($($arg)*);
        }
    };
}

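/// One output row per stream node: the rendered identity (indented to show the
/// tree), the comma-separated actor ids, and the profiled metrics, which are
/// `None` when no stats were collected for the node.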
#[derive(Fields)]
struct ExplainAnalyzeStreamJobOutput {
    identity: String,
    actor_ids: String,
    output_rows_per_second: Option<String>,
    downstream_backpressure_ratio: Option<String>,
}

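/// Entry point for `EXPLAIN ANALYZE` on a stream job:
/// 1. Bind the target relation to its job id.
/// 2. Fetch the job's fragments and build the stream node graph.
/// 3. Sample executor stats on all stream workers over `profiling_duration`.
/// 4. Aggregate the per-executor deltas into per-operator metrics.
/// 5. Render the graph with the metrics attached.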
pub async fn handle_explain_analyze_stream_job(
    handler_args: HandlerArgs,
    target: AnalyzeTarget,
    duration_secs: Option<u64>,
) -> Result<RwPgResponse> {
    let profiling_duration = Duration::from_secs(duration_secs.unwrap_or(10));
    let job_id = bind::bind_relation(&target, &handler_args)?;

    let meta_client = handler_args.session.env().meta_client();
    let fragments = net::get_fragments(meta_client, job_id).await?;
    let fragment_parallelisms = fragments
        .iter()
        .map(|f| (f.id, f.actors.len()))
        .collect::<HashMap<_, _>>();
    let (root_node, dispatcher_fragment_ids, adjacency_list) = extract_stream_node_infos(fragments);
    let (executor_ids, operator_to_executor) = extract_executor_infos(&adjacency_list);
    tracing::debug!(
        ?fragment_parallelisms,
        ?root_node,
        ?dispatcher_fragment_ids,
        ?adjacency_list,
        "explain analyze metadata"
    );

    let worker_nodes = net::list_stream_worker_nodes(handler_args.session.env()).await?;

    let executor_stats = net::get_executor_stats(
        &handler_args,
        &worker_nodes,
        &executor_ids,
        &dispatcher_fragment_ids,
        profiling_duration,
    )
    .await?;
    tracing::debug!(?executor_stats, "collected executor stats");
    let aggregated_stats = metrics::OperatorStats::aggregate(
        operator_to_executor,
        &executor_stats,
        &fragment_parallelisms,
    );
    tracing::debug!(?aggregated_stats, "collected aggregated stats");

    // Render graph with metrics
    let rows = render_graph_with_metrics(
        &adjacency_list,
        root_node,
        &aggregated_stats,
        &profiling_duration,
    );
    let builder = RwPgResponseBuilder::empty(StatementType::EXPLAIN);
    let builder = builder.rows(rows);
    Ok(builder.into())
}

/// Binding pass, since we don't go through the binder.
/// TODO(noel): Should this live in the binder? It doesn't need any binder logic,
/// and moving it there may slow compilation...
mod bind {
    use risingwave_common::id::JobId;
    use risingwave_sqlparser::ast::AnalyzeTarget;

    use crate::Binder;
    use crate::catalog::root_catalog::SchemaPath;
    use crate::error::Result;
    use crate::handler::HandlerArgs;

    /// Bind the analyze target relation to its actual id.
    pub(super) fn bind_relation(
        target_relation: &AnalyzeTarget,
        handler_args: &HandlerArgs,
    ) -> Result<JobId> {
        let job_id = match &target_relation {
            AnalyzeTarget::Id(id) => (*id).into(),
            AnalyzeTarget::Index(name)
            | AnalyzeTarget::Table(name)
            | AnalyzeTarget::Sink(name)
            | AnalyzeTarget::MaterializedView(name) => {
                let session = &handler_args.session;
                let db_name = session.database();
                let (schema_name, name) = Binder::resolve_schema_qualified_name(&db_name, name)?;
                let search_path = session.config().search_path();
                let user_name = &session.user_name();
                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

                let catalog_reader = handler_args.session.env().catalog_reader();
                let catalog = catalog_reader.read_guard();

                match target_relation {
                    AnalyzeTarget::Index(_) => {
                        let (catalog, _schema_name) =
                            catalog.get_any_index_by_name(&db_name, schema_path, &name)?;
                        catalog.id.as_job_id()
                    }
                    AnalyzeTarget::Table(_) => {
                        let (catalog, _schema_name) =
                            catalog.get_any_table_by_name(&db_name, schema_path, &name)?;
                        catalog.id.as_job_id()
                    }
                    AnalyzeTarget::Sink(_) => {
                        let (catalog, _schema_name) =
                            catalog.get_any_sink_by_name(&db_name, schema_path, &name)?;
                        catalog.id.as_job_id()
                    }
                    AnalyzeTarget::MaterializedView(_) => {
                        let (catalog, _schema_name) =
                            catalog.get_any_table_by_name(&db_name, schema_path, &name)?;
                        catalog.id.as_job_id()
                    }
                    AnalyzeTarget::Id(_) => unreachable!(),
                }
            }
        };
        Ok(job_id)
    }
}

/// Utilities for fetching stats from compute nodes (CN).
mod net {
    use std::collections::HashSet;

    use risingwave_common::bail;
    use risingwave_common::id::{FragmentId, JobId};
    use risingwave_pb::common::WorkerNode;
    use risingwave_pb::id::ExecutorId;
    use risingwave_pb::meta::list_table_fragments_response::FragmentInfo;
    use risingwave_pb::monitor_service::GetProfileStatsRequest;
    use tokio::time::{Duration, sleep};

    use crate::error::Result;
    use crate::handler::HandlerArgs;
    use crate::handler::explain_analyze_stream_job::metrics::ExecutorStats;
    use crate::meta_client::FrontendMetaClient;
    use crate::session::FrontendEnv;

    pub(super) async fn list_stream_worker_nodes(env: &FrontendEnv) -> Result<Vec<WorkerNode>> {
        let worker_nodes = env.meta_client().list_all_nodes().await?;
        let stream_worker_nodes = worker_nodes
            .into_iter()
            .filter(|node| {
                node.property
                    .as_ref()
                    .map(|p| p.is_streaming)
                    .unwrap_or(false)
            })
            .collect::<Vec<_>>();
        Ok(stream_worker_nodes)
    }

    // TODO(kwannoel): Only fetch the names, actor_ids and graph of the fragments
    pub(super) async fn get_fragments(
        meta_client: &dyn FrontendMetaClient,
        job_id: JobId,
    ) -> Result<Vec<FragmentInfo>> {
        let mut fragment_map = meta_client.list_table_fragments(&[job_id]).await?;
        let mut table_fragments = fragment_map.drain();
        let Some((fragment_job_id, table_fragment_info)) = table_fragments.next() else {
            bail!("table fragment of job {job_id} not found");
        };
        assert_eq!(
            table_fragments.next(),
            None,
            "expected at most one table fragment entry for the job"
        );
        assert_eq!(fragment_job_id, job_id);
        Ok(table_fragment_info.fragments)
    }

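    /// Profile by taking two snapshots of the cumulative counters on every
    /// stream worker, `profiling_duration` apart, and returning the deltas.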
    pub(super) async fn get_executor_stats(
        handler_args: &HandlerArgs,
        worker_nodes: &[WorkerNode],
        executor_ids: &HashSet<ExecutorId>,
        dispatcher_fragment_ids: &HashSet<FragmentId>,
        profiling_duration: Duration,
    ) -> Result<ExecutorStats> {
        let dispatcher_fragment_ids = dispatcher_fragment_ids.iter().copied().collect::<Vec<_>>();
        let mut initial_aggregated_stats = ExecutorStats::new();
        let monitor_client_pool = handler_args.session.env().monitor_client_pool();

        for node in worker_nodes {
            let client = monitor_client_pool.get(node).await?;
            let stats = client
                .get_profile_stats(GetProfileStatsRequest {
                    executor_ids: executor_ids.iter().copied().collect(),
                    dispatcher_fragment_ids: dispatcher_fragment_ids.clone(),
                })
                .await
                .expect("get profiling stats failed");
            initial_aggregated_stats.record(executor_ids, &dispatcher_fragment_ids, &stats);
        }
        tracing::debug!(?initial_aggregated_stats, "initial aggregated stats");

        sleep(profiling_duration).await;

        let mut final_aggregated_stats = ExecutorStats::new();
        for node in worker_nodes {
            let client = monitor_client_pool.get(node).await?;
            let stats = client
                .get_profile_stats(GetProfileStatsRequest {
                    executor_ids: executor_ids.iter().copied().collect(),
                    dispatcher_fragment_ids: dispatcher_fragment_ids.clone(),
                })
                .await
                .expect("get profiling stats failed");
            final_aggregated_stats.record(executor_ids, &dispatcher_fragment_ids, &stats);
        }
        tracing::debug!(?final_aggregated_stats, "final aggregated stats");

        let delta_aggregated_stats = ExecutorStats::get_delta(
            &initial_aggregated_stats,
            &final_aggregated_stats,
            executor_ids,
            &dispatcher_fragment_ids,
        );

        Ok(delta_aggregated_stats)
    }
}

/// Profiling metrics data structures and utilities.
/// Metric collection happens in two stages:
/// 1. Collect the stream node metrics at the **Executor** level.
/// 2. Merge the executor-level metrics into **Operator**-level aggregates
///    (throughput is summed; blocking time is averaged across executors).
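///
/// As an illustrative example (hypothetical numbers): if an operator has 4
/// executors whose row-count deltas over a 10s window are 1000, 2000, 3000,
/// and 4000, the operator-level throughput is their sum, 10000 rows, which
/// renders as 1000 rows/s.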
mod metrics {
    use std::collections::{HashMap, HashSet};

    use risingwave_common::operator::unique_executor_id_into_parts;
    use risingwave_pb::id::{ExecutorId, GlobalOperatorId};
    use risingwave_pb::monitor_service::GetProfileStatsResponse;

    use crate::catalog::FragmentId;
    use crate::handler::explain_analyze_stream_job::utils::operator_id_for_dispatch;

    type OperatorId = GlobalOperatorId;

    #[expect(dead_code)]
    #[derive(Default, Debug)]
    pub(super) struct ExecutorMetrics {
        pub executor_id: ExecutorId,
        pub epoch: u32,
        pub total_output_throughput: u64,
        pub total_output_pending_ns: u64,
    }

    #[derive(Default, Debug)]
    pub(super) struct DispatchMetrics {
        pub fragment_id: FragmentId,
        pub epoch: u32,
        pub total_output_throughput: u64,
        pub total_output_pending_ns: u64,
    }

    #[derive(Debug)]
    pub(super) struct ExecutorStats {
        executor_stats: HashMap<ExecutorId, ExecutorMetrics>,
        dispatch_stats: HashMap<FragmentId, DispatchMetrics>,
    }

    impl ExecutorStats {
        pub(super) fn new() -> Self {
            ExecutorStats {
                executor_stats: HashMap::new(),
                dispatch_stats: HashMap::new(),
            }
        }

        pub fn get(&self, executor_id: &ExecutorId) -> Option<&ExecutorMetrics> {
            self.executor_stats.get(executor_id)
        }

        /// Record metrics for profiling
        pub(super) fn record<'a>(
            &mut self,
            executor_ids: &'a HashSet<ExecutorId>,
            dispatch_fragment_ids: &'a [FragmentId],
            metrics: &'a GetProfileStatsResponse,
        ) {
            for executor_id in executor_ids {
                let Some(total_output_throughput) =
                    metrics.stream_node_output_row_count.get(executor_id)
                else {
                    continue;
                };
                let Some(total_output_pending_ns) = metrics
                    .stream_node_output_blocking_duration_ns
                    .get(executor_id)
                else {
                    continue;
                };
                let stats = ExecutorMetrics {
                    executor_id: *executor_id,
                    epoch: 0,
                    total_output_throughput: *total_output_throughput,
                    total_output_pending_ns: *total_output_pending_ns,
                };
                // An executor is scheduled on a single worker node, so it
                // should not be inserted multiple times.
                if cfg!(madsim) {
                    // Under madsim, all worker nodes share one process and the
                    // metrics are stored in a global object, so querying each
                    // worker node returns the same set of executor metrics.
                    // Hence we must not assert uniqueness here.
                    self.executor_stats.insert(*executor_id, stats);
                } else {
                    assert!(self.executor_stats.insert(*executor_id, stats).is_none());
                }
            }

            for fragment_id in dispatch_fragment_ids {
                let Some(total_output_throughput) =
                    metrics.dispatch_fragment_output_row_count.get(fragment_id)
                else {
                    continue;
                };
                let Some(total_output_pending_ns) = metrics
                    .dispatch_fragment_output_blocking_duration_ns
                    .get(fragment_id)
                else {
                    continue;
                };
                let stats = self.dispatch_stats.entry(*fragment_id).or_default();
                stats.fragment_id = *fragment_id;
                stats.epoch = 0;
                // Sum rather than insert, because a fragment's dispatchers are
                // distributed across worker nodes.
                stats.total_output_throughput += *total_output_throughput;
                stats.total_output_pending_ns += *total_output_pending_ns;
            }
        }

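        /// Compute per-executor and per-fragment deltas between two snapshots.
        /// Entries with missing or non-monotonic counters are skipped, with a
        /// warning (or a panic in debug builds).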
        pub(super) fn get_delta(
            initial: &Self,
            end: &Self,
            executor_ids: &HashSet<ExecutorId>,
            dispatch_fragment_ids: &[FragmentId],
        ) -> Self {
            let mut delta_aggregated_stats = Self::new();
            for executor_id in executor_ids {
                let (actor_id, operator_id) = unique_executor_id_into_parts(*executor_id);
                let Some(initial_stats) = initial.executor_stats.get(executor_id) else {
                    debug_panic_or_warn!(
                        "missing initial stats for executor {} (actor {} operator {})",
                        executor_id,
                        actor_id,
                        operator_id
                    );
                    continue;
                };
                let Some(end_stats) = end.executor_stats.get(executor_id) else {
                    debug_panic_or_warn!(
                        "missing final stats for executor {} (actor {} operator {})",
                        executor_id,
                        actor_id,
                        operator_id
                    );
                    continue;
                };

                let initial_throughput = initial_stats.total_output_throughput;
                let end_throughput = end_stats.total_output_throughput;
                let Some(delta_throughput) = end_throughput.checked_sub(initial_throughput) else {
                    debug_panic_or_warn!(
                        "delta throughput is negative for actor {} operator {} (initial: {}, end: {})",
                        actor_id,
                        operator_id,
                        initial_throughput,
                        end_throughput
                    );
                    continue;
                };

                let initial_pending_ns = initial_stats.total_output_pending_ns;
                let end_pending_ns = end_stats.total_output_pending_ns;
                let Some(delta_pending_ns) = end_pending_ns.checked_sub(initial_pending_ns) else {
                    debug_panic_or_warn!(
                        "delta pending ns is negative for actor {} operator {} (initial: {}, end: {})",
                        actor_id,
                        operator_id,
                        initial_pending_ns,
                        end_pending_ns
                    );
                    continue;
                };

                let delta_stats = ExecutorMetrics {
                    executor_id: *executor_id,
                    epoch: 0,
                    total_output_throughput: delta_throughput,
                    total_output_pending_ns: delta_pending_ns,
                };
                delta_aggregated_stats
                    .executor_stats
                    .insert(*executor_id, delta_stats);
            }

            for fragment_id in dispatch_fragment_ids {
                let Some(initial_stats) = initial.dispatch_stats.get(fragment_id) else {
                    debug_panic_or_warn!("missing initial stats for fragment {}", fragment_id);
                    continue;
                };
                let Some(end_stats) = end.dispatch_stats.get(fragment_id) else {
                    debug_panic_or_warn!("missing final stats for fragment {}", fragment_id);
                    continue;
                };

                let initial_throughput = initial_stats.total_output_throughput;
                let end_throughput = end_stats.total_output_throughput;
                let Some(delta_throughput) = end_throughput.checked_sub(initial_throughput) else {
                    debug_panic_or_warn!(
                        "delta throughput is negative for fragment {} (initial: {}, end: {})",
                        fragment_id,
                        initial_throughput,
                        end_throughput
                    );
                    continue;
                };

                let initial_pending_ns = initial_stats.total_output_pending_ns;
                let end_pending_ns = end_stats.total_output_pending_ns;
                let Some(delta_pending_ns) = end_pending_ns.checked_sub(initial_pending_ns) else {
                    debug_panic_or_warn!(
                        "delta pending ns is negative for fragment {} (initial: {}, end: {})",
                        fragment_id,
                        initial_pending_ns,
                        end_pending_ns
                    );
                    continue;
                };

                let delta_stats = DispatchMetrics {
                    fragment_id: *fragment_id,
                    epoch: 0,
                    total_output_throughput: delta_throughput,
                    total_output_pending_ns: delta_pending_ns,
                };
                delta_aggregated_stats
                    .dispatch_stats
                    .insert(*fragment_id, delta_stats);
            }

            delta_aggregated_stats
        }
    }

    #[expect(dead_code)]
    #[derive(Debug)]
    pub(super) struct OperatorMetrics {
        pub operator_id: GlobalOperatorId,
        pub epoch: u32,
        pub total_output_throughput: u64,
        pub total_output_pending_ns: u64,
    }

    #[derive(Debug)]
    pub(super) struct OperatorStats {
        inner: HashMap<GlobalOperatorId, OperatorMetrics>,
    }

    impl OperatorStats {
        /// Aggregates executor-level stats into operator-level stats
        pub(super) fn aggregate(
            operator_map: HashMap<OperatorId, HashSet<ExecutorId>>,
            executor_stats: &ExecutorStats,
            fragment_parallelisms: &HashMap<FragmentId, usize>,
        ) -> Self {
            let mut operator_stats = HashMap::new();
            'operator_loop: for (operator_id, executor_ids) in operator_map {
                let num_executors = executor_ids.len() as u64;
                let mut total_output_throughput = 0;
                let mut total_output_pending_ns = 0;
                for executor_id in executor_ids {
                    if let Some(stats) = executor_stats.get(&executor_id) {
                        total_output_throughput += stats.total_output_throughput;
                        total_output_pending_ns += stats.total_output_pending_ns;
                    } else {
                        // Skip this operator if stats are missing for any of its executors.
                        continue 'operator_loop;
                    }
                }
                // Throughput is summed across executors; blocking time is
                // averaged over the operator's parallelism.
                let total_output_pending_ns = total_output_pending_ns / num_executors;

                operator_stats.insert(
                    operator_id,
                    OperatorMetrics {
                        operator_id,
                        epoch: 0,
                        total_output_throughput,
                        total_output_pending_ns,
                    },
                );
            }

            for (fragment_id, dispatch_metrics) in &executor_stats.dispatch_stats {
                let operator_id = operator_id_for_dispatch(*fragment_id);
                let total_output_throughput = dispatch_metrics.total_output_throughput;
                let Some(fragment_parallelism) = fragment_parallelisms.get(fragment_id) else {
                    debug_panic_or_warn!(
                        "missing fragment parallelism for fragment {}",
                        fragment_id
                    );
                    continue;
                };
                let total_output_pending_ns =
                    dispatch_metrics.total_output_pending_ns / *fragment_parallelism as u64;

                operator_stats.insert(
                    operator_id,
                    OperatorMetrics {
                        operator_id,
                        epoch: 0,
                        total_output_throughput,
                        total_output_pending_ns,
                    },
                );
            }

            OperatorStats {
                inner: operator_stats,
            }
        }

        pub fn get(&self, operator_id: &OperatorId) -> Option<&OperatorMetrics> {
            self.inner.get(operator_id)
        }
    }
}

/// Utilities for the stream node graph:
/// rendering, extracting, etc.
mod graph {
    use std::collections::{HashMap, HashSet};
    use std::fmt::Debug;
    use std::time::Duration;

    use itertools::Itertools;
    use risingwave_common::operator::{
        unique_executor_id_from_unique_operator_id, unique_operator_id,
        unique_operator_id_into_parts,
    };
    use risingwave_pb::id::{ActorId, GlobalOperatorId};
    use risingwave_pb::meta::list_table_fragments_response::FragmentInfo;
    use risingwave_pb::stream_plan::stream_node::{NodeBody, NodeBodyDiscriminants};
    use risingwave_pb::stream_plan::{MergeNode, StreamNode as PbStreamNode};

    use crate::catalog::FragmentId;
    use crate::handler::explain_analyze_stream_job::ExplainAnalyzeStreamJobOutput;
    use crate::handler::explain_analyze_stream_job::metrics::OperatorStats;
    use crate::handler::explain_analyze_stream_job::utils::operator_id_for_dispatch;

    pub(super) type OperatorId = GlobalOperatorId;
    pub(super) use risingwave_pb::id::ExecutorId;

    /// This is an internal struct used ONLY for explain analyze stream job.
    pub(super) struct StreamNode {
        operator_id: OperatorId,
        fragment_id: FragmentId,
        identity: NodeBodyDiscriminants,
        actor_ids: HashSet<ActorId>,
        dependencies: Vec<OperatorId>,
    }

    impl Debug for StreamNode {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            // A unique operator id decomposes into (fragment_id, operator_id),
            // mirroring how `unique_operator_id` composes it.
            let (fragment_id, operator_id) = unique_operator_id_into_parts(self.operator_id);
            let operator_id_str = format!(
                "{}: (fragment_id: {}, operator_id: {})",
                self.operator_id, fragment_id, operator_id
            );
            write!(
                f,
                "StreamNode {{ operator_id: {}, fragment_id: {}, identity: {:?}, actor_ids: {:?}, dependencies: {:?} }}",
                operator_id_str, self.fragment_id, self.identity, self.actor_ids, self.dependencies
            )
        }
    }

    impl StreamNode {
        fn new_for_dispatcher(fragment_id: FragmentId) -> Self {
            StreamNode {
                operator_id: operator_id_for_dispatch(fragment_id),
                fragment_id,
                identity: NodeBodyDiscriminants::Exchange,
                actor_ids: Default::default(),
                dependencies: Default::default(),
            }
        }
    }

    /// Extracts the root node of the plan, the fragment ids of dispatchers,
    /// and the adjacency list of stream nodes.
    pub(super) fn extract_stream_node_infos(
        fragments: Vec<FragmentInfo>,
    ) -> (
        OperatorId,
        HashSet<FragmentId>,
        HashMap<OperatorId, StreamNode>,
    ) {
        let job_fragment_ids = fragments
            .iter()
            .map(|f| f.id)
            .collect::<HashSet<FragmentId>>();

        // Finds root nodes of the graph
        fn find_root_nodes(stream_nodes: &HashMap<OperatorId, StreamNode>) -> HashSet<OperatorId> {
            let mut all_nodes = stream_nodes.keys().copied().collect::<HashSet<_>>();
            for node in stream_nodes.values() {
                for dependency in &node.dependencies {
                    all_nodes.remove(dependency);
                }
            }
            all_nodes
        }

        // Recursively extracts stream node info, and builds an adjacency list between stream nodes
        // and their dependencies
        fn extract_stream_node_info(
            fragment_id: FragmentId,
            fragment_id_to_merge_operator_id: &mut HashMap<FragmentId, OperatorId>,
            operator_id_to_stream_node: &mut HashMap<OperatorId, StreamNode>,
            node: &PbStreamNode,
            actor_ids: &HashSet<ActorId>,
        ) {
            let identity = node
                .node_body
                .as_ref()
                .expect("should have node body")
                .into();
            let operator_id = unique_operator_id(fragment_id, node.operator_id);
            if let Some(merge_node) = node.node_body.as_ref()
                && let NodeBody::Merge(box MergeNode {
                    upstream_fragment_id,
                    ..
                }) = merge_node
            {
                fragment_id_to_merge_operator_id.insert(*upstream_fragment_id, operator_id);
            }
            let dependencies = &node.input;
            let dependency_ids = dependencies
                .iter()
                .map(|input| unique_operator_id(fragment_id, input.operator_id))
                .collect::<Vec<_>>();
            operator_id_to_stream_node.insert(
                operator_id,
                StreamNode {
                    operator_id,
                    fragment_id,
                    identity,
                    actor_ids: actor_ids.clone(),
                    dependencies: dependency_ids,
                },
            );
            for dependency in dependencies {
                extract_stream_node_info(
                    fragment_id,
                    fragment_id_to_merge_operator_id,
                    operator_id_to_stream_node,
                    dependency,
                    actor_ids,
                );
            }
        }

        // Build the adjacency list. Merge edges are left hanging here and
        // stitched to their upstream dispatchers in the section below.
        let mut operator_id_to_stream_node = HashMap::new();
        let mut fragment_id_to_merge_operator_id = HashMap::new();
        for fragment in fragments {
            let actors = fragment.actors;
            assert!(
                !actors.is_empty(),
                "fragment {} should have at least one actor",
                fragment.id
            );
            let actor_ids = actors.iter().map(|actor| actor.id).collect::<HashSet<_>>();
            let node = actors[0].node.as_ref().expect("should have stream node");
            extract_stream_node_info(
                fragment.id,
                &mut fragment_id_to_merge_operator_id,
                &mut operator_id_to_stream_node,
                node,
                &actor_ids,
            );
        }

        // find root node, and fill in dispatcher edges + nodes.
        let root_or_dispatch_nodes = find_root_nodes(&operator_id_to_stream_node);
        let mut root_node = None;
        for operator_id in root_or_dispatch_nodes {
            let node = operator_id_to_stream_node.get_mut(&operator_id).unwrap();
            let fragment_id = node.fragment_id;
            if let Some(merge_operator_id) = fragment_id_to_merge_operator_id.get(&fragment_id) {
                let mut dispatcher = StreamNode::new_for_dispatcher(fragment_id);
                let operator_id_for_dispatch = dispatcher.operator_id;
                dispatcher.dependencies.push(operator_id);
                assert!(
                    operator_id_to_stream_node
                        .insert(operator_id_for_dispatch as _, dispatcher)
                        .is_none()
                );
                operator_id_to_stream_node
                    .get_mut(merge_operator_id)
                    .unwrap()
                    .dependencies
                    .push(operator_id_for_dispatch as _)
            } else {
                root_node = Some(operator_id);
            }
        }

        let mut dispatcher_fragment_ids = HashSet::new();
        for dispatcher_fragment_id in fragment_id_to_merge_operator_id.keys() {
            if job_fragment_ids.contains(dispatcher_fragment_id) {
                dispatcher_fragment_ids.insert(*dispatcher_fragment_id);
            }
        }

        (
            root_node.unwrap(),
            dispatcher_fragment_ids,
            operator_id_to_stream_node,
        )
    }

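    /// Collects the set of executor ids to profile, plus a map from each
    /// operator to its executors (one executor per actor running it).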
    pub(super) fn extract_executor_infos(
        adjacency_list: &HashMap<OperatorId, StreamNode>,
    ) -> (
        HashSet<ExecutorId>,
        HashMap<OperatorId, HashSet<ExecutorId>>,
    ) {
        let mut executor_ids = HashSet::new();
        let mut operator_to_executor = HashMap::new();
        for (operator_id, node) in adjacency_list {
            assert_eq!(*operator_id, node.operator_id);
            let operator_id = node.operator_id;
            for actor_id in &node.actor_ids {
                let executor_id =
                    unique_executor_id_from_unique_operator_id(*actor_id, operator_id);
                if node.identity != NodeBodyDiscriminants::BatchPlan
                    // FIXME(kwannoel): Add back after https://github.com/risingwavelabs/risingwave/issues/22775 is resolved.
                    && node.identity != NodeBodyDiscriminants::Merge
                    && node.identity != NodeBodyDiscriminants::Project
                {
                    assert!(executor_ids.insert(executor_id));
                }
                assert!(
                    operator_to_executor
                        .entry(operator_id)
                        .or_insert_with(HashSet::new)
                        .insert(executor_id)
                );
            }
        }
        (executor_ids, operator_to_executor)
    }

    // DFS-based rendering; each node occupies its own row.
    // Schema:
    // | Identity | Actor IDs | Metrics ... |
    // Each node's identity is indented according to its depth in the graph.
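    //
    // An illustrative (hypothetical) rendering of the identity column:
    //
    //   StreamMaterialize
    //   └─ StreamProject
    //      └─ StreamHashJoin
    //         ├─ Exchange
    //         └─ Exchange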
    pub(super) fn render_graph_with_metrics(
        adjacency_list: &HashMap<OperatorId, StreamNode>,
        root_node: OperatorId,
        stats: &OperatorStats,
        profiling_duration: &Duration,
    ) -> Vec<ExplainAnalyzeStreamJobOutput> {
        let profiling_duration_secs = profiling_duration.as_secs_f64();
        let mut rows = vec![];
        let mut stack = vec![(String::new(), true, root_node)];
        while let Some((prefix, last_child, node_id)) = stack.pop() {
            let Some(node) = adjacency_list.get(&node_id) else {
                continue;
            };
            let is_root = node_id == root_node;

            let identity_rendered = if is_root {
                node.identity.to_string()
            } else {
                let connector = if last_child { "└─ " } else { "├─ " };
                format!("{}{}{}", prefix, connector, node.identity)
            };

            let child_prefix = if is_root {
                ""
            } else if last_child {
                "   "
            } else {
                "│  "
            };
            let child_prefix = format!("{}{}", prefix, child_prefix);

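            // Derived metrics: output rows/s is the row-count delta over the
            // profiling window; the backpressure ratio is the blocked time per
            // actor divided by the window, so a value near 1.0 means the node
            // was blocked on its downstream for almost the entire window.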
            let stats = stats.get(&node_id);
            let (output_rows_per_second, downstream_backpressure_ratio) = match stats {
                Some(stats) => (
                    Some(
                        (stats.total_output_throughput as f64 / profiling_duration_secs)
                            .to_string(),
                    ),
                    Some(
                        (Duration::from_nanos(stats.total_output_pending_ns).as_secs_f64()
                            / usize::max(node.actor_ids.len(), 1) as f64
                            / profiling_duration_secs)
                            .to_string(),
                    ),
                ),
                None => (None, None),
            };
            let row = ExplainAnalyzeStreamJobOutput {
                identity: identity_rendered,
                actor_ids: node
                    .actor_ids
                    .iter()
                    .sorted()
                    .map(|id| id.to_string())
                    .collect::<Vec<_>>()
                    .join(","),
                output_rows_per_second,
                downstream_backpressure_ratio,
            };
            rows.push(row);
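            // Children are rendered by popping the stack, i.e. in reverse push
            // order, so the first dependency (position == 0) is rendered last
            // and therefore drawn with the `└─` connector.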
            for (position, dependency) in node.dependencies.iter().enumerate() {
                stack.push((child_prefix.clone(), position == 0, *dependency));
            }
        }
        rows
    }
}

mod utils {
    use risingwave_common::operator::unique_operator_id;
    use risingwave_pb::id::GlobalOperatorId;

    use crate::catalog::FragmentId;

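    /// Synthesize an operator id for a fragment's dispatcher by pairing the
    /// fragment id with `u32::MAX`, a local operator id assumed not to be
    /// used by any real operator in the fragment.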
    pub(super) fn operator_id_for_dispatch(fragment_id: FragmentId) -> GlobalOperatorId {
        unique_operator_id(fragment_id, u32::MAX)
    }
}