risingwave_frontend/handler/
explain_analyze_stream_job.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use pgwire::pg_response::StatementType;
18use risingwave_common::types::Fields;
19use risingwave_sqlparser::ast::AnalyzeTarget;
20use tokio::time::Duration;
21
22use crate::error::Result;
23use crate::handler::explain_analyze_stream_job::graph::{
24    extract_executor_infos, extract_stream_node_infos, render_graph_with_metrics,
25};
26use crate::handler::{HandlerArgs, RwPgResponse, RwPgResponseBuilder, RwPgResponseBuilderExt};
27
/// One output row of `EXPLAIN ANALYZE` for a stream job; each row describes a
/// single operator (stream node) of the rendered graph.
#[derive(Fields)]
struct ExplainAnalyzeStreamJobOutput {
    // Operator identity, prefixed with tree-drawing characters to show depth.
    identity: String,
    // Comma-separated ids of the actors running this operator.
    actor_ids: String,
    // Average rows emitted per second over the profiling window (stringified f64).
    output_rows_per_second: String,
    // Fraction of the profiling window spent blocked on downstream (stringified f64).
    downstream_backpressure_ratio: String,
}
35
36pub async fn handle_explain_analyze_stream_job(
37    handler_args: HandlerArgs,
38    target: AnalyzeTarget,
39    duration_secs: Option<u64>,
40) -> Result<RwPgResponse> {
41    let profiling_duration = Duration::from_secs(duration_secs.unwrap_or(10));
42    let job_id = bind::bind_relation(&target, &handler_args)?;
43
44    let meta_client = handler_args.session.env().meta_client();
45    let fragments = net::get_fragments(meta_client, job_id).await?;
46    let dispatcher_fragment_ids = fragments.iter().map(|f| f.id).collect::<Vec<_>>();
47    let fragment_parallelisms = fragments
48        .iter()
49        .map(|f| (f.id, f.actors.len()))
50        .collect::<HashMap<_, _>>();
51    let (root_node, adjacency_list) = extract_stream_node_infos(fragments);
52    let (executor_ids, operator_to_executor) = extract_executor_infos(&adjacency_list);
53
54    let worker_nodes = net::list_stream_worker_nodes(handler_args.session.env()).await?;
55
56    let executor_stats = net::get_executor_stats(
57        &handler_args,
58        &worker_nodes,
59        &executor_ids,
60        &dispatcher_fragment_ids,
61        profiling_duration,
62    )
63    .await?;
64    tracing::debug!(?executor_stats, "collected executor stats");
65    let aggregated_stats = metrics::OperatorStats::aggregate(
66        operator_to_executor,
67        &executor_stats,
68        &fragment_parallelisms,
69    );
70    tracing::debug!(?aggregated_stats, "collected aggregated stats");
71
72    // Render graph with metrics
73    let rows = render_graph_with_metrics(
74        &adjacency_list,
75        root_node,
76        &aggregated_stats,
77        &profiling_duration,
78    );
79    let builder = RwPgResponseBuilder::empty(StatementType::EXPLAIN);
80    let builder = builder.rows(rows);
81    Ok(builder.into())
82}
83
84/// Binding pass, since we don't go through the binder.
85/// TODO(noel): Should this be in binder? But it may make compilation slower and doesn't require any binder logic...
86mod bind {
87    use risingwave_sqlparser::ast::AnalyzeTarget;
88
89    use crate::Binder;
90    use crate::catalog::root_catalog::SchemaPath;
91    use crate::error::Result;
92    use crate::handler::HandlerArgs;
93
94    /// Bind the analyze target relation to its actual id.
95    pub(super) fn bind_relation(
96        target_relation: &AnalyzeTarget,
97        handler_args: &HandlerArgs,
98    ) -> Result<u32> {
99        let job_id = match &target_relation {
100            AnalyzeTarget::Id(id) => *id,
101            AnalyzeTarget::Index(name)
102            | AnalyzeTarget::Table(name)
103            | AnalyzeTarget::Sink(name)
104            | AnalyzeTarget::MaterializedView(name) => {
105                let session = &handler_args.session;
106                let db_name = session.database();
107                let (schema_name, name) =
108                    Binder::resolve_schema_qualified_name(&db_name, name.clone())?;
109                let search_path = session.config().search_path();
110                let user_name = &session.user_name();
111                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
112
113                let catalog_reader = handler_args.session.env().catalog_reader();
114                let catalog = catalog_reader.read_guard();
115
116                match target_relation {
117                    AnalyzeTarget::Index(_) => {
118                        let (catalog, _schema_name) =
119                            catalog.get_index_by_name(&db_name, schema_path, &name)?;
120                        catalog.id.index_id
121                    }
122                    AnalyzeTarget::Table(_) => {
123                        let (catalog, _schema_name) =
124                            catalog.get_any_table_by_name(&db_name, schema_path, &name)?;
125                        catalog.id.table_id
126                    }
127                    AnalyzeTarget::Sink(_) => {
128                        let (catalog, _schema_name) =
129                            catalog.get_sink_by_name(&db_name, schema_path, &name)?;
130                        catalog.id.sink_id
131                    }
132                    AnalyzeTarget::MaterializedView(_) => {
133                        let (catalog, _schema_name) =
134                            catalog.get_any_table_by_name(&db_name, schema_path, &name)?;
135                        catalog.id.table_id
136                    }
137                    AnalyzeTarget::Id(_) => unreachable!(),
138                }
139            }
140        };
141        Ok(job_id)
142    }
143}
144
145/// Utilities for fetching stats from CN
146mod net {
147    use risingwave_pb::common::WorkerNode;
148    use risingwave_pb::meta::list_table_fragments_response::FragmentInfo;
149    use risingwave_pb::monitor_service::GetProfileStatsRequest;
150    use tokio::time::{Duration, sleep};
151
152    use crate::error::Result;
153    use crate::handler::HandlerArgs;
154    use crate::handler::explain_analyze_stream_job::graph::ExecutorId;
155    use crate::handler::explain_analyze_stream_job::metrics::ExecutorStats;
156    use crate::meta_client::FrontendMetaClient;
157    use crate::session::FrontendEnv;
158
159    pub(super) async fn list_stream_worker_nodes(env: &FrontendEnv) -> Result<Vec<WorkerNode>> {
160        let worker_nodes = env.meta_client().list_all_nodes().await?;
161        let stream_worker_nodes = worker_nodes
162            .into_iter()
163            .filter(|node| {
164                node.property
165                    .as_ref()
166                    .map(|p| p.is_streaming)
167                    .unwrap_or_else(|| false)
168            })
169            .collect::<Vec<_>>();
170        Ok(stream_worker_nodes)
171    }
172
173    // TODO(kwannoel): Only fetch the names, actor_ids and graph of the fragments
174    pub(super) async fn get_fragments(
175        meta_client: &dyn FrontendMetaClient,
176        job_id: u32,
177    ) -> Result<Vec<FragmentInfo>> {
178        let mut fragment_map = meta_client.list_table_fragments(&[job_id]).await?;
179        assert_eq!(fragment_map.len(), 1, "expected only one fragment");
180        let (fragment_job_id, table_fragment_info) = fragment_map.drain().next().unwrap();
181        assert_eq!(fragment_job_id, job_id);
182        Ok(table_fragment_info.fragments)
183    }
184
185    pub(super) async fn get_executor_stats(
186        handler_args: &HandlerArgs,
187        worker_nodes: &[WorkerNode],
188        executor_ids: &[ExecutorId],
189        dispatcher_fragment_ids: &[u32],
190        profiling_duration: Duration,
191    ) -> Result<ExecutorStats> {
192        let mut aggregated_stats = ExecutorStats::new();
193        for node in worker_nodes {
194            let mut compute_client = handler_args.session.env().client_pool().get(node).await?;
195            let stats = compute_client
196                .monitor_client
197                .get_profile_stats(GetProfileStatsRequest {
198                    executor_ids: executor_ids.into(),
199                    dispatcher_fragment_ids: dispatcher_fragment_ids.into(),
200                })
201                .await
202                .expect("get profiling stats failed");
203            aggregated_stats.start_record(
204                executor_ids,
205                dispatcher_fragment_ids,
206                &stats.into_inner(),
207            );
208        }
209
210        sleep(profiling_duration).await;
211
212        for node in worker_nodes {
213            let mut compute_client = handler_args.session.env().client_pool().get(node).await?;
214            let stats = compute_client
215                .monitor_client
216                .get_profile_stats(GetProfileStatsRequest {
217                    executor_ids: executor_ids.into(),
218                    dispatcher_fragment_ids: dispatcher_fragment_ids.into(),
219                })
220                .await
221                .expect("get profiling stats failed");
222            aggregated_stats.finish_record(
223                executor_ids,
224                dispatcher_fragment_ids,
225                &stats.into_inner(),
226            );
227        }
228
229        Ok(aggregated_stats)
230    }
231}
232
233/// Profiling metrics data structure and utilities
234/// We have 2 stages of metric collection:
235/// 1. Collect the stream node metrics at the **Executor** level.
236/// 2. Merge the stream node metrics into **Operator** level, avg, max, min, etc...
237mod metrics {
238    use std::collections::HashMap;
239
240    use risingwave_pb::monitor_service::GetProfileStatsResponse;
241
242    use crate::catalog::FragmentId;
243    use crate::handler::explain_analyze_stream_job::graph::{ExecutorId, OperatorId};
244
245    #[derive(Default, Debug)]
246    pub(super) struct ExecutorMetrics {
247        pub executor_id: ExecutorId,
248        pub epoch: u32,
249        pub total_output_throughput: u64,
250        pub total_output_pending_ns: u64,
251    }
252
253    #[derive(Default, Debug)]
254    pub(super) struct DispatchMetrics {
255        pub fragment_id: FragmentId,
256        pub epoch: u32,
257        pub total_output_throughput: u64,
258        pub total_output_pending_ns: u64,
259    }
260
261    #[derive(Debug)]
262    pub(super) struct ExecutorStats {
263        executor_stats: HashMap<ExecutorId, ExecutorMetrics>,
264        dispatch_stats: HashMap<FragmentId, DispatchMetrics>,
265    }
266
267    impl ExecutorStats {
268        pub(super) fn new() -> Self {
269            ExecutorStats {
270                executor_stats: HashMap::new(),
271                dispatch_stats: HashMap::new(),
272            }
273        }
274
275        pub fn get(&self, executor_id: &ExecutorId) -> Option<&ExecutorMetrics> {
276            self.executor_stats.get(executor_id)
277        }
278
279        /// Establish metrics baseline for profiling
280        pub(super) fn start_record<'a>(
281            &mut self,
282            executor_ids: &'a [ExecutorId],
283            dispatch_fragment_ids: &'a [FragmentId],
284            metrics: &'a GetProfileStatsResponse,
285        ) {
286            for executor_id in executor_ids {
287                let stats = self.executor_stats.entry(*executor_id).or_default();
288                stats.executor_id = *executor_id;
289                stats.epoch = 0;
290                stats.total_output_throughput += metrics
291                    .stream_node_output_row_count
292                    .get(executor_id)
293                    .cloned()
294                    .unwrap_or(0);
295                stats.total_output_pending_ns += metrics
296                    .stream_node_output_blocking_duration_ns
297                    .get(executor_id)
298                    .cloned()
299                    .unwrap_or(0);
300            }
301
302            for fragment_id in dispatch_fragment_ids {
303                let stats = self.dispatch_stats.entry(*fragment_id).or_default();
304                stats.fragment_id = *fragment_id;
305                stats.epoch = 0;
306                stats.total_output_throughput += metrics
307                    .dispatch_fragment_output_row_count
308                    .get(fragment_id)
309                    .cloned()
310                    .unwrap_or(0);
311                stats.total_output_pending_ns += metrics
312                    .dispatch_fragment_output_blocking_duration_ns
313                    .get(fragment_id)
314                    .cloned()
315                    .unwrap_or(0);
316            }
317        }
318
319        /// Compute the deltas for reporting
320        pub(super) fn finish_record<'a>(
321            &mut self,
322            executor_ids: &'a [ExecutorId],
323            dispatch_fragment_ids: &'a [FragmentId],
324            metrics: &'a GetProfileStatsResponse,
325        ) {
326            for executor_id in executor_ids {
327                if let Some(stats) = self.executor_stats.get_mut(executor_id) {
328                    stats.total_output_throughput = metrics
329                        .stream_node_output_row_count
330                        .get(executor_id)
331                        .cloned()
332                        .unwrap_or(0)
333                        - stats.total_output_throughput;
334                    stats.total_output_pending_ns = metrics
335                        .stream_node_output_blocking_duration_ns
336                        .get(executor_id)
337                        .cloned()
338                        .unwrap_or(0)
339                        - stats.total_output_pending_ns;
340                } else {
341                    // TODO: warn missing metrics!
342                }
343            }
344
345            for fragment_id in dispatch_fragment_ids {
346                if let Some(stats) = self.dispatch_stats.get_mut(fragment_id) {
347                    stats.total_output_throughput = metrics
348                        .dispatch_fragment_output_row_count
349                        .get(fragment_id)
350                        .cloned()
351                        .unwrap_or(0)
352                        - stats.total_output_throughput;
353                    stats.total_output_pending_ns = metrics
354                        .dispatch_fragment_output_blocking_duration_ns
355                        .get(fragment_id)
356                        .cloned()
357                        .unwrap_or(0)
358                        - stats.total_output_pending_ns;
359                } else {
360                    // TODO: warn missing metrics!
361                }
362            }
363        }
364    }
365
366    #[expect(dead_code)]
367    #[derive(Debug)]
368    pub(super) struct OperatorMetrics {
369        pub operator_id: OperatorId,
370        pub epoch: u32,
371        pub total_output_throughput: u64,
372        pub total_output_pending_ns: u64,
373    }
374
375    #[derive(Debug)]
376    pub(super) struct OperatorStats {
377        inner: HashMap<OperatorId, OperatorMetrics>,
378    }
379
380    impl OperatorStats {
381        /// Aggregates executor-level stats into operator-level stats
382        pub(super) fn aggregate(
383            operator_map: HashMap<OperatorId, Vec<ExecutorId>>,
384            executor_stats: &ExecutorStats,
385            fragment_parallelisms: &HashMap<FragmentId, usize>,
386        ) -> Self {
387            let mut operator_stats = HashMap::new();
388            for (operator_id, executor_ids) in operator_map {
389                let num_executors = executor_ids.len() as u64;
390                let mut total_output_throughput = 0;
391                let mut total_output_pending_ns = 0;
392                for executor_id in executor_ids {
393                    if let Some(stats) = executor_stats.get(&executor_id) {
394                        total_output_throughput += stats.total_output_throughput;
395                        total_output_pending_ns += stats.total_output_pending_ns;
396                    }
397                }
398                let total_output_throughput = total_output_throughput;
399                let total_output_pending_ns = total_output_pending_ns / num_executors;
400
401                operator_stats.insert(
402                    operator_id,
403                    OperatorMetrics {
404                        operator_id,
405                        epoch: 0,
406                        total_output_throughput,
407                        total_output_pending_ns,
408                    },
409                );
410            }
411
412            for (fragment_id, dispatch_metrics) in &executor_stats.dispatch_stats {
413                let operator_id = *fragment_id as OperatorId;
414                let total_output_throughput = dispatch_metrics.total_output_throughput;
415                let fragment_parallelism = fragment_parallelisms
416                    .get(fragment_id)
417                    .copied()
418                    .expect("should have fragment parallelism");
419                let total_output_pending_ns =
420                    dispatch_metrics.total_output_pending_ns / fragment_parallelism as u64;
421
422                operator_stats.insert(
423                    operator_id,
424                    OperatorMetrics {
425                        operator_id,
426                        epoch: 0,
427                        total_output_throughput,
428                        total_output_pending_ns,
429                    },
430                );
431            }
432
433            OperatorStats {
434                inner: operator_stats,
435            }
436        }
437
438        pub fn get(&self, operator_id: &OperatorId) -> Option<&OperatorMetrics> {
439            self.inner.get(operator_id)
440        }
441    }
442}
443
/// Utilities for the stream node graph:
/// rendering, extracting, etc.
mod graph {
    use std::collections::{HashMap, HashSet};
    use std::time::Duration;

    use risingwave_common::operator::{
        unique_executor_id_from_unique_operator_id, unique_operator_id,
    };
    use risingwave_pb::meta::list_table_fragments_response::FragmentInfo;
    use risingwave_pb::stream_plan::stream_node::NodeBody;
    use risingwave_pb::stream_plan::{MergeNode, StreamNode as PbStreamNode};

    use crate::handler::explain_analyze_stream_job::ExplainAnalyzeStreamJobOutput;
    use crate::handler::explain_analyze_stream_job::metrics::OperatorStats;

    pub(super) type OperatorId = u64;
    pub(super) type ExecutorId = u64;

    /// This is an internal struct used ONLY for explain analyze stream job.
    /// One instance per operator; together they form the adjacency list of the
    /// job's stream graph (`dependencies` holds upstream operator ids).
    #[derive(Debug)]
    pub(super) struct StreamNode {
        operator_id: OperatorId,
        fragment_id: u32,
        // First whitespace-separated token of the plan node's identity string.
        identity: String,
        // All actors running an instance of this operator.
        actor_ids: Vec<u32>,
        dependencies: Vec<u64>,
    }

    impl StreamNode {
        /// Synthetic node standing in for a fragment's dispatcher.
        /// Dispatchers have no operator id of their own, so the fragment id is
        /// reused as the operator id — assumes this cannot collide with ids
        /// produced by `unique_operator_id` (TODO confirm).
        fn new_for_dispatcher(fragment_id: u32) -> Self {
            StreamNode {
                operator_id: fragment_id as u64,
                fragment_id,
                identity: "Dispatcher".to_owned(),
                actor_ids: vec![],
                dependencies: vec![],
            }
        }
    }

    /// Extracts the root node of the plan, as well as the adjacency list.
    /// Also inserts synthetic `Dispatcher` nodes that connect each non-root
    /// fragment to the `Merge` operator of its downstream fragment.
    pub(super) fn extract_stream_node_infos(
        fragments: Vec<FragmentInfo>,
    ) -> (OperatorId, HashMap<OperatorId, StreamNode>) {
        // Finds root nodes of the graph, i.e. operators no other node depends on.

        fn find_root_nodes(stream_nodes: &HashMap<u64, StreamNode>) -> HashSet<u64> {
            let mut all_nodes = stream_nodes.keys().copied().collect::<HashSet<_>>();
            for node in stream_nodes.values() {
                for dependency in &node.dependencies {
                    all_nodes.remove(dependency);
                }
            }
            all_nodes
        }

        // Recursively extracts stream node info, and builds an adjacency list between stream nodes
        // and their dependencies
        fn extract_stream_node_info(
            fragment_id: u32,
            fragment_id_to_merge_operator_id: &mut HashMap<u32, OperatorId>,
            operator_id_to_stream_node: &mut HashMap<OperatorId, StreamNode>,
            node: &PbStreamNode,
            actor_id: u32,
        ) {
            // Keep only the leading token of the identity,
            // e.g. "HashAgg { ... }" -> "HashAgg".
            let identity = node
                .identity
                .split_ascii_whitespace()
                .next()
                .unwrap()
                .to_owned();
            let operator_id = unique_operator_id(fragment_id, node.operator_id);
            // Remember which Merge operator consumes each upstream fragment so
            // dispatcher edges can be stitched in afterwards.
            if let Some(merge_node) = node.node_body.as_ref()
                && let NodeBody::Merge(box MergeNode {
                    upstream_fragment_id,
                    ..
                }) = merge_node
            {
                fragment_id_to_merge_operator_id.insert(*upstream_fragment_id, operator_id);
            }
            let dependencies = &node.input;
            // Insert the node on first visit; subsequent visits (one per actor
            // of the fragment) only append the actor id.
            let entry = operator_id_to_stream_node
                .entry(operator_id)
                .or_insert_with(|| {
                    let dependencies = dependencies
                        .iter()
                        .map(|input| unique_operator_id(fragment_id, input.operator_id))
                        .collect();
                    StreamNode {
                        operator_id,
                        fragment_id,
                        identity,
                        actor_ids: vec![],
                        dependencies,
                    }
                });
            entry.actor_ids.push(actor_id);
            for dependency in dependencies {
                extract_stream_node_info(
                    fragment_id,
                    fragment_id_to_merge_operator_id,
                    operator_id_to_stream_node,
                    dependency,
                    actor_id,
                );
            }
        }

        // build adjacency list and hanging merge edges.
        // hanging merge edges will be filled in the following section.
        let mut operator_id_to_stream_node = HashMap::new();
        let mut fragment_id_to_merge_operator_id = HashMap::new();
        for fragment in fragments {
            let actors = fragment.actors;
            for actor in actors {
                let actor_id = actor.id;
                let node = actor.node.unwrap();
                extract_stream_node_info(
                    fragment.id,
                    &mut fragment_id_to_merge_operator_id,
                    &mut operator_id_to_stream_node,
                    &node,
                    actor_id,
                );
            }
        }

        // find root node, and fill in dispatcher edges + nodes.
        // Each parentless operator is either the true root of the job, or the
        // top of a fragment whose output feeds a downstream Merge; the latter
        // gets a synthetic Dispatcher node spliced in between.
        let root_or_dispatch_nodes = find_root_nodes(&operator_id_to_stream_node);
        let mut root_node = None;
        for operator_id in root_or_dispatch_nodes {
            let node = operator_id_to_stream_node.get_mut(&operator_id).unwrap();
            let fragment_id = node.fragment_id;
            if let Some(merge_operator_id) = fragment_id_to_merge_operator_id.get(&fragment_id) {
                let mut dispatcher = StreamNode::new_for_dispatcher(fragment_id);
                dispatcher.dependencies.push(operator_id);
                assert!(
                    operator_id_to_stream_node
                        .insert(fragment_id as _, dispatcher)
                        .is_none()
                );
                operator_id_to_stream_node
                    .get_mut(merge_operator_id)
                    .unwrap()
                    .dependencies
                    .push(fragment_id as _);
            } else {
                root_node = Some(operator_id);
            }
        }

        // NOTE(review): assumes at least one fragment has no downstream Merge;
        // the unwrap panics otherwise.
        (root_node.unwrap(), operator_id_to_stream_node)
    }

    /// Collects all executor ids of the graph, plus a mapping from each
    /// operator to the executors (one per actor) that run it.
    pub(super) fn extract_executor_infos(
        adjacency_list: &HashMap<u64, StreamNode>,
    ) -> (Vec<u64>, HashMap<u64, Vec<u64>>) {
        let mut executor_ids: Vec<_> = Default::default();
        let mut operator_to_executor: HashMap<_, _> = Default::default();
        for node in adjacency_list.values() {
            let operator_id = node.operator_id;
            for actor_id in &node.actor_ids {
                // An executor is one (actor, operator) instantiation.
                let executor_id =
                    unique_executor_id_from_unique_operator_id(*actor_id, operator_id);
                executor_ids.push(executor_id);
                operator_to_executor
                    .entry(operator_id)
                    .or_insert_with(Vec::new)
                    .push(executor_id);
            }
        }
        (executor_ids, operator_to_executor)
    }

    // Do a DFS based rendering. Each node will occupy its own row.
    // Schema:
    // | Operator ID | Identity | Actor IDs | Metrics ... |
    // Each node will be indented based on its depth in the graph.
    pub(super) fn render_graph_with_metrics(
        adjacency_list: &HashMap<u64, StreamNode>,
        root_node: u64,
        stats: &OperatorStats,
        profiling_duration: &Duration,
    ) -> Vec<ExplainAnalyzeStreamJobOutput> {
        let profiling_duration_secs = profiling_duration.as_secs_f64();
        let mut rows = vec![];
        // Explicit DFS stack of (indent prefix, renders-last-among-siblings, node id).
        let mut stack = vec![(String::new(), true, root_node)];
        while let Some((prefix, last_child, node_id)) = stack.pop() {
            let Some(node) = adjacency_list.get(&node_id) else {
                continue;
            };
            let is_root = node_id == root_node;

            // Tree-drawing connector; the sibling rendered last gets "└─".
            // Children are pushed in input order and popped LIFO, so siblings
            // render in reverse input order and `position == 0` (below) marks
            // the one rendered last.
            let identity_rendered = if is_root {
                node.identity.clone()
            } else {
                let connector = if last_child { "└─ " } else { "├─ " };
                format!("{}{}{}", prefix, connector, node.identity)
            };

            let child_prefix = if is_root {
                ""
            } else if last_child {
                "   "
            } else {
                "│  "
            };
            let child_prefix = format!("{}{}", prefix, child_prefix);

            // Missing stats render as zeroes rather than omitting the row.
            let stats = stats.get(&node_id);
            let (output_throughput, output_latency) = stats
                .map(|stats| (stats.total_output_throughput, stats.total_output_pending_ns))
                .unwrap_or((0, 0));
            let row = ExplainAnalyzeStreamJobOutput {
                identity: identity_rendered,
                actor_ids: node
                    .actor_ids
                    .iter()
                    .map(|id| id.to_string())
                    .collect::<Vec<_>>()
                    .join(","),
                // Average rows/s over the profiling window.
                output_rows_per_second: (output_throughput as f64 / profiling_duration_secs)
                    .to_string(),
                // Blocked time averaged over this operator's actors, as a
                // fraction of the profiling window (max(…, 1) avoids /0 for
                // synthetic Dispatcher nodes, which carry no actor ids).
                downstream_backpressure_ratio: (Duration::from_nanos(output_latency).as_secs_f64()
                    / usize::max(node.actor_ids.len(), 1) as f64
                    / profiling_duration_secs)
                    .to_string(),
            };
            rows.push(row);
            for (position, dependency) in node.dependencies.iter().enumerate() {
                stack.push((child_prefix.clone(), position == 0, *dependency));
            }
        }
        rows
    }
}