1use std::collections::HashMap;
16
17use pgwire::pg_response::StatementType;
18use risingwave_common::types::Fields;
19use risingwave_sqlparser::ast::AnalyzeTarget;
20use tokio::time::Duration;
21
22use crate::error::Result;
23use crate::handler::explain_analyze_stream_job::graph::{
24 extract_executor_infos, extract_stream_node_infos, render_graph_with_metrics,
25};
26use crate::handler::{HandlerArgs, RwPgResponse, RwPgResponseBuilder, RwPgResponseBuilderExt};
27
28#[derive(Fields)]
29struct ExplainAnalyzeStreamJobOutput {
30 identity: String,
31 actor_ids: String,
32 output_rows_per_second: Option<String>,
33 downstream_backpressure_ratio: Option<String>,
34}
35
36pub async fn handle_explain_analyze_stream_job(
37 handler_args: HandlerArgs,
38 target: AnalyzeTarget,
39 duration_secs: Option<u64>,
40) -> Result<RwPgResponse> {
41 let profiling_duration = Duration::from_secs(duration_secs.unwrap_or(10));
42 let job_id = bind::bind_relation(&target, &handler_args)?;
43
44 let meta_client = handler_args.session.env().meta_client();
45 let fragments = net::get_fragments(meta_client, job_id).await?;
46 let dispatcher_fragment_ids = fragments.iter().map(|f| f.id).collect::<Vec<_>>();
47 let fragment_parallelisms = fragments
48 .iter()
49 .map(|f| (f.id, f.actors.len()))
50 .collect::<HashMap<_, _>>();
51 let (root_node, adjacency_list) = extract_stream_node_infos(fragments);
52 let (executor_ids, operator_to_executor) = extract_executor_infos(&adjacency_list);
53
54 let worker_nodes = net::list_stream_worker_nodes(handler_args.session.env()).await?;
55
56 let executor_stats = net::get_executor_stats(
57 &handler_args,
58 &worker_nodes,
59 &executor_ids,
60 &dispatcher_fragment_ids,
61 profiling_duration,
62 )
63 .await?;
64 tracing::debug!(?executor_stats, "collected executor stats");
65 let aggregated_stats = metrics::OperatorStats::aggregate(
66 operator_to_executor,
67 &executor_stats,
68 &fragment_parallelisms,
69 );
70 tracing::debug!(?aggregated_stats, "collected aggregated stats");
71
72 let rows = render_graph_with_metrics(
74 &adjacency_list,
75 root_node,
76 &aggregated_stats,
77 &profiling_duration,
78 );
79 let builder = RwPgResponseBuilder::empty(StatementType::EXPLAIN);
80 let builder = builder.rows(rows);
81 Ok(builder.into())
82}
83
84mod bind {
87 use risingwave_sqlparser::ast::AnalyzeTarget;
88
89 use crate::Binder;
90 use crate::catalog::root_catalog::SchemaPath;
91 use crate::error::Result;
92 use crate::handler::HandlerArgs;
93
94 pub(super) fn bind_relation(
96 target_relation: &AnalyzeTarget,
97 handler_args: &HandlerArgs,
98 ) -> Result<u32> {
99 let job_id = match &target_relation {
100 AnalyzeTarget::Id(id) => *id,
101 AnalyzeTarget::Index(name)
102 | AnalyzeTarget::Table(name)
103 | AnalyzeTarget::Sink(name)
104 | AnalyzeTarget::MaterializedView(name) => {
105 let session = &handler_args.session;
106 let db_name = session.database();
107 let (schema_name, name) =
108 Binder::resolve_schema_qualified_name(&db_name, name.clone())?;
109 let search_path = session.config().search_path();
110 let user_name = &session.user_name();
111 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
112
113 let catalog_reader = handler_args.session.env().catalog_reader();
114 let catalog = catalog_reader.read_guard();
115
116 match target_relation {
117 AnalyzeTarget::Index(_) => {
118 let (catalog, _schema_name) =
119 catalog.get_index_by_name(&db_name, schema_path, &name)?;
120 catalog.id.index_id
121 }
122 AnalyzeTarget::Table(_) => {
123 let (catalog, _schema_name) =
124 catalog.get_any_table_by_name(&db_name, schema_path, &name)?;
125 catalog.id.table_id
126 }
127 AnalyzeTarget::Sink(_) => {
128 let (catalog, _schema_name) =
129 catalog.get_sink_by_name(&db_name, schema_path, &name)?;
130 catalog.id.sink_id
131 }
132 AnalyzeTarget::MaterializedView(_) => {
133 let (catalog, _schema_name) =
134 catalog.get_any_table_by_name(&db_name, schema_path, &name)?;
135 catalog.id.table_id
136 }
137 AnalyzeTarget::Id(_) => unreachable!(),
138 }
139 }
140 };
141 Ok(job_id)
142 }
143}
144
145mod net {
147 use std::collections::HashSet;
148
149 use risingwave_pb::common::WorkerNode;
150 use risingwave_pb::meta::list_table_fragments_response::FragmentInfo;
151 use risingwave_pb::monitor_service::GetProfileStatsRequest;
152 use tokio::time::{Duration, sleep};
153
154 use crate::error::Result;
155 use crate::handler::HandlerArgs;
156 use crate::handler::explain_analyze_stream_job::graph::ExecutorId;
157 use crate::handler::explain_analyze_stream_job::metrics::ExecutorStats;
158 use crate::meta_client::FrontendMetaClient;
159 use crate::session::FrontendEnv;
160
161 pub(super) async fn list_stream_worker_nodes(env: &FrontendEnv) -> Result<Vec<WorkerNode>> {
162 let worker_nodes = env.meta_client().list_all_nodes().await?;
163 let stream_worker_nodes = worker_nodes
164 .into_iter()
165 .filter(|node| {
166 node.property
167 .as_ref()
168 .map(|p| p.is_streaming)
169 .unwrap_or_else(|| false)
170 })
171 .collect::<Vec<_>>();
172 Ok(stream_worker_nodes)
173 }
174
175 pub(super) async fn get_fragments(
177 meta_client: &dyn FrontendMetaClient,
178 job_id: u32,
179 ) -> Result<Vec<FragmentInfo>> {
180 let mut fragment_map = meta_client.list_table_fragments(&[job_id]).await?;
181 assert_eq!(fragment_map.len(), 1, "expected only one fragment");
182 let (fragment_job_id, table_fragment_info) = fragment_map.drain().next().unwrap();
183 assert_eq!(fragment_job_id, job_id);
184 Ok(table_fragment_info.fragments)
185 }
186
187 pub(super) async fn get_executor_stats(
188 handler_args: &HandlerArgs,
189 worker_nodes: &[WorkerNode],
190 executor_ids: &HashSet<ExecutorId>,
191 dispatcher_fragment_ids: &[u32],
192 profiling_duration: Duration,
193 ) -> Result<ExecutorStats> {
194 let mut aggregated_stats = ExecutorStats::new();
195 for node in worker_nodes {
196 let mut compute_client = handler_args.session.env().client_pool().get(node).await?;
197 let stats = compute_client
198 .monitor_client
199 .get_profile_stats(GetProfileStatsRequest {
200 executor_ids: executor_ids.iter().copied().collect(),
201 dispatcher_fragment_ids: dispatcher_fragment_ids.into(),
202 })
203 .await
204 .expect("get profiling stats failed");
205 aggregated_stats.start_record(
206 executor_ids,
207 dispatcher_fragment_ids,
208 &stats.into_inner(),
209 );
210 }
211
212 sleep(profiling_duration).await;
213
214 for node in worker_nodes {
215 let mut compute_client = handler_args.session.env().client_pool().get(node).await?;
216 let stats = compute_client
217 .monitor_client
218 .get_profile_stats(GetProfileStatsRequest {
219 executor_ids: executor_ids.iter().copied().collect(),
220 dispatcher_fragment_ids: dispatcher_fragment_ids.into(),
221 })
222 .await
223 .expect("get profiling stats failed");
224 aggregated_stats.finish_record(
225 executor_ids,
226 dispatcher_fragment_ids,
227 &stats.into_inner(),
228 );
229 }
230
231 Ok(aggregated_stats)
232 }
233}
234
235mod metrics {
240 use std::collections::{HashMap, HashSet};
241
242 use risingwave_pb::monitor_service::GetProfileStatsResponse;
243
244 use crate::catalog::FragmentId;
245 use crate::handler::explain_analyze_stream_job::graph::{ExecutorId, OperatorId};
246 use crate::handler::explain_analyze_stream_job::utils::operator_id_for_dispatch;
247
248 #[expect(dead_code)]
249 #[derive(Default, Debug)]
250 pub(super) struct ExecutorMetrics {
251 pub executor_id: ExecutorId,
252 pub epoch: u32,
253 pub total_output_throughput: u64,
254 pub total_output_pending_ns: u64,
255 }
256
257 #[derive(Default, Debug)]
258 pub(super) struct DispatchMetrics {
259 pub fragment_id: FragmentId,
260 pub epoch: u32,
261 pub total_output_throughput: u64,
262 pub total_output_pending_ns: u64,
263 }
264
265 #[derive(Debug)]
266 pub(super) struct ExecutorStats {
267 executor_stats: HashMap<ExecutorId, ExecutorMetrics>,
268 dispatch_stats: HashMap<FragmentId, DispatchMetrics>,
269 }
270
271 impl ExecutorStats {
272 pub(super) fn new() -> Self {
273 ExecutorStats {
274 executor_stats: HashMap::new(),
275 dispatch_stats: HashMap::new(),
276 }
277 }
278
279 pub fn get(&self, executor_id: &ExecutorId) -> Option<&ExecutorMetrics> {
280 self.executor_stats.get(executor_id)
281 }
282
283 pub(super) fn start_record<'a>(
285 &mut self,
286 executor_ids: &'a HashSet<ExecutorId>,
287 dispatch_fragment_ids: &'a [FragmentId],
288 metrics: &'a GetProfileStatsResponse,
289 ) {
290 for executor_id in executor_ids {
291 let Some(total_output_throughput) =
292 metrics.stream_node_output_row_count.get(executor_id)
293 else {
294 continue;
295 };
296 let Some(total_output_pending_ns) = metrics
297 .stream_node_output_blocking_duration_ns
298 .get(executor_id)
299 else {
300 continue;
301 };
302 let stats = ExecutorMetrics {
303 executor_id: *executor_id,
304 epoch: 0,
305 total_output_throughput: *total_output_throughput,
306 total_output_pending_ns: *total_output_pending_ns,
307 };
308 assert!(self.executor_stats.insert(*executor_id, stats).is_none());
311 }
312
313 for fragment_id in dispatch_fragment_ids {
314 let Some(total_output_throughput) =
315 metrics.dispatch_fragment_output_row_count.get(fragment_id)
316 else {
317 continue;
318 };
319 let Some(total_output_pending_ns) = metrics
320 .dispatch_fragment_output_blocking_duration_ns
321 .get(fragment_id)
322 else {
323 continue;
324 };
325 let stats = self.dispatch_stats.entry(*fragment_id).or_default();
326 stats.fragment_id = *fragment_id;
327 stats.epoch = 0;
328 stats.total_output_throughput += *total_output_throughput;
332 stats.total_output_pending_ns += *total_output_pending_ns;
333 }
334 }
335
336 pub(super) fn finish_record<'a>(
338 &mut self,
339 executor_ids: &'a HashSet<ExecutorId>,
340 dispatch_fragment_ids: &'a [FragmentId],
341 metrics: &'a GetProfileStatsResponse,
342 ) {
343 for executor_id in executor_ids {
344 let Some(stats) = self.executor_stats.get_mut(executor_id) else {
345 continue;
346 };
347 let Some(total_output_throughput) =
348 metrics.stream_node_output_row_count.get(executor_id)
349 else {
350 continue;
351 };
352 let Some(total_output_pending_ns) = metrics
353 .stream_node_output_blocking_duration_ns
354 .get(executor_id)
355 else {
356 continue;
357 };
358 let Some(throughput_delta) =
359 total_output_throughput.checked_sub(stats.total_output_throughput)
360 else {
361 continue;
362 };
363 let Some(output_ns_delta) =
364 total_output_pending_ns.checked_sub(stats.total_output_pending_ns)
365 else {
366 continue;
367 };
368 stats.total_output_throughput = throughput_delta;
369 stats.total_output_pending_ns = output_ns_delta;
370 }
371
372 for fragment_id in dispatch_fragment_ids {
373 let Some(stats) = self.dispatch_stats.get_mut(fragment_id) else {
374 continue;
375 };
376 let Some(total_output_throughput) =
377 metrics.dispatch_fragment_output_row_count.get(fragment_id)
378 else {
379 continue;
380 };
381 let Some(total_output_pending_ns) = metrics
382 .dispatch_fragment_output_blocking_duration_ns
383 .get(fragment_id)
384 else {
385 continue;
386 };
387 let Some(throughput_delta) =
388 total_output_throughput.checked_sub(stats.total_output_throughput)
389 else {
390 continue;
391 };
392 let Some(output_ns_delta) =
393 total_output_pending_ns.checked_sub(stats.total_output_pending_ns)
394 else {
395 continue;
396 };
397 stats.total_output_throughput = throughput_delta;
398 stats.total_output_pending_ns = output_ns_delta;
399 }
400 }
401 }
402
403 #[expect(dead_code)]
404 #[derive(Debug)]
405 pub(super) struct OperatorMetrics {
406 pub operator_id: OperatorId,
407 pub epoch: u32,
408 pub total_output_throughput: u64,
409 pub total_output_pending_ns: u64,
410 }
411
412 #[derive(Debug)]
413 pub(super) struct OperatorStats {
414 inner: HashMap<OperatorId, OperatorMetrics>,
415 }
416
417 impl OperatorStats {
418 pub(super) fn aggregate(
420 operator_map: HashMap<OperatorId, HashSet<ExecutorId>>,
421 executor_stats: &ExecutorStats,
422 fragment_parallelisms: &HashMap<FragmentId, usize>,
423 ) -> Self {
424 let mut operator_stats = HashMap::new();
425 'operator_loop: for (operator_id, executor_ids) in operator_map {
426 let num_executors = executor_ids.len() as u64;
427 let mut total_output_throughput = 0;
428 let mut total_output_pending_ns = 0;
429 for executor_id in executor_ids {
430 if let Some(stats) = executor_stats.get(&executor_id) {
431 total_output_throughput += stats.total_output_throughput;
432 total_output_pending_ns += stats.total_output_pending_ns;
433 } else {
434 continue 'operator_loop;
436 }
437 }
438 let total_output_throughput = total_output_throughput;
439 let total_output_pending_ns = total_output_pending_ns / num_executors;
440
441 operator_stats.insert(
442 operator_id,
443 OperatorMetrics {
444 operator_id,
445 epoch: 0,
446 total_output_throughput,
447 total_output_pending_ns,
448 },
449 );
450 }
451
452 for (fragment_id, dispatch_metrics) in &executor_stats.dispatch_stats {
453 let operator_id = operator_id_for_dispatch(*fragment_id);
454 let total_output_throughput = dispatch_metrics.total_output_throughput;
455 let fragment_parallelism = fragment_parallelisms
456 .get(fragment_id)
457 .copied()
458 .expect("should have fragment parallelism");
459 let total_output_pending_ns =
460 dispatch_metrics.total_output_pending_ns / fragment_parallelism as u64;
461
462 operator_stats.insert(
463 operator_id,
464 OperatorMetrics {
465 operator_id,
466 epoch: 0,
467 total_output_throughput,
468 total_output_pending_ns,
469 },
470 );
471 }
472
473 OperatorStats {
474 inner: operator_stats,
475 }
476 }
477
478 pub fn get(&self, operator_id: &OperatorId) -> Option<&OperatorMetrics> {
479 self.inner.get(operator_id)
480 }
481 }
482}
483
484mod graph {
487 use std::collections::{HashMap, HashSet};
488 use std::time::Duration;
489
490 use itertools::Itertools;
491 use risingwave_common::operator::{
492 unique_executor_id_from_unique_operator_id, unique_operator_id,
493 };
494 use risingwave_pb::meta::list_table_fragments_response::FragmentInfo;
495 use risingwave_pb::stream_plan::stream_node::{NodeBody, NodeBodyDiscriminants};
496 use risingwave_pb::stream_plan::{MergeNode, StreamNode as PbStreamNode};
497
498 use crate::handler::explain_analyze_stream_job::ExplainAnalyzeStreamJobOutput;
499 use crate::handler::explain_analyze_stream_job::metrics::OperatorStats;
500 use crate::handler::explain_analyze_stream_job::utils::operator_id_for_dispatch;
501 pub(super) type OperatorId = u64;
502 pub(super) type ExecutorId = u64;
503
504 #[derive(Debug)]
506 pub(super) struct StreamNode {
507 operator_id: OperatorId,
508 fragment_id: u32,
509 identity: NodeBodyDiscriminants,
510 actor_ids: HashSet<u32>,
511 dependencies: Vec<u64>,
512 }
513
514 impl StreamNode {
515 fn new_for_dispatcher(fragment_id: u32) -> Self {
516 StreamNode {
517 operator_id: operator_id_for_dispatch(fragment_id),
518 fragment_id,
519 identity: NodeBodyDiscriminants::Exchange,
520 actor_ids: Default::default(),
521 dependencies: Default::default(),
522 }
523 }
524 }
525
526 pub(super) fn extract_stream_node_infos(
528 fragments: Vec<FragmentInfo>,
529 ) -> (OperatorId, HashMap<OperatorId, StreamNode>) {
530 fn find_root_nodes(stream_nodes: &HashMap<u64, StreamNode>) -> HashSet<u64> {
533 let mut all_nodes = stream_nodes.keys().copied().collect::<HashSet<_>>();
534 for node in stream_nodes.values() {
535 for dependency in &node.dependencies {
536 all_nodes.remove(dependency);
537 }
538 }
539 all_nodes
540 }
541
542 fn extract_stream_node_info(
545 fragment_id: u32,
546 fragment_id_to_merge_operator_id: &mut HashMap<u32, OperatorId>,
547 operator_id_to_stream_node: &mut HashMap<OperatorId, StreamNode>,
548 node: &PbStreamNode,
549 actor_ids: &HashSet<u32>,
550 ) {
551 let identity = node
552 .node_body
553 .as_ref()
554 .expect("should have node body")
555 .into();
556 let operator_id = unique_operator_id(fragment_id, node.operator_id);
557 if let Some(merge_node) = node.node_body.as_ref()
558 && let NodeBody::Merge(box MergeNode {
559 upstream_fragment_id,
560 ..
561 }) = merge_node
562 {
563 fragment_id_to_merge_operator_id.insert(*upstream_fragment_id, operator_id);
564 }
565 let dependencies = &node.input;
566 let dependency_ids = dependencies
567 .iter()
568 .map(|input| unique_operator_id(fragment_id, input.operator_id))
569 .collect::<Vec<_>>();
570 operator_id_to_stream_node.insert(
571 operator_id,
572 StreamNode {
573 operator_id,
574 fragment_id,
575 identity,
576 actor_ids: actor_ids.clone(),
577 dependencies: dependency_ids,
578 },
579 );
580 for dependency in dependencies {
581 extract_stream_node_info(
582 fragment_id,
583 fragment_id_to_merge_operator_id,
584 operator_id_to_stream_node,
585 dependency,
586 actor_ids,
587 );
588 }
589 }
590
591 let mut operator_id_to_stream_node = HashMap::new();
594 let mut fragment_id_to_merge_operator_id = HashMap::new();
595 for fragment in fragments {
596 let actors = fragment.actors;
597 assert!(
598 !actors.is_empty(),
599 "fragment {} should have at least one actor",
600 fragment.id
601 );
602 let actor_ids = actors.iter().map(|actor| actor.id).collect::<HashSet<_>>();
603 let node = actors[0].node.as_ref().expect("should have stream node");
604 extract_stream_node_info(
605 fragment.id,
606 &mut fragment_id_to_merge_operator_id,
607 &mut operator_id_to_stream_node,
608 node,
609 &actor_ids,
610 );
611 }
612
613 let root_or_dispatch_nodes = find_root_nodes(&operator_id_to_stream_node);
615 let mut root_node = None;
616 for operator_id in root_or_dispatch_nodes {
617 let node = operator_id_to_stream_node.get_mut(&operator_id).unwrap();
618 let fragment_id = node.fragment_id;
619 if let Some(merge_operator_id) = fragment_id_to_merge_operator_id.get(&fragment_id) {
620 let mut dispatcher = StreamNode::new_for_dispatcher(fragment_id);
621 let operator_id_for_dispatch = dispatcher.operator_id;
622 dispatcher.dependencies.push(operator_id);
623 assert!(
624 operator_id_to_stream_node
625 .insert(operator_id_for_dispatch as _, dispatcher)
626 .is_none()
627 );
628 operator_id_to_stream_node
629 .get_mut(merge_operator_id)
630 .unwrap()
631 .dependencies
632 .push(operator_id_for_dispatch as _)
633 } else {
634 root_node = Some(operator_id);
635 }
636 }
637
638 (root_node.unwrap(), operator_id_to_stream_node)
639 }
640
641 pub(super) fn extract_executor_infos(
642 adjacency_list: &HashMap<OperatorId, StreamNode>,
643 ) -> (HashSet<u64>, HashMap<u64, HashSet<u64>>) {
644 let mut executor_ids = HashSet::new();
645 let mut operator_to_executor = HashMap::new();
646 for (operator_id, node) in adjacency_list {
647 assert_eq!(*operator_id, node.operator_id);
648 let operator_id = node.operator_id;
649 for actor_id in &node.actor_ids {
650 let executor_id =
651 unique_executor_id_from_unique_operator_id(*actor_id, operator_id);
652 assert!(executor_ids.insert(executor_id));
653 assert!(
654 operator_to_executor
655 .entry(operator_id)
656 .or_insert_with(HashSet::new)
657 .insert(executor_id)
658 );
659 }
660 }
661 (executor_ids, operator_to_executor)
662 }
663
664 pub(super) fn render_graph_with_metrics(
669 adjacency_list: &HashMap<u64, StreamNode>,
670 root_node: u64,
671 stats: &OperatorStats,
672 profiling_duration: &Duration,
673 ) -> Vec<ExplainAnalyzeStreamJobOutput> {
674 let profiling_duration_secs = profiling_duration.as_secs_f64();
675 let mut rows = vec![];
676 let mut stack = vec![(String::new(), true, root_node)];
677 while let Some((prefix, last_child, node_id)) = stack.pop() {
678 let Some(node) = adjacency_list.get(&node_id) else {
679 continue;
680 };
681 let is_root = node_id == root_node;
682
683 let identity_rendered = if is_root {
684 node.identity.to_string()
685 } else {
686 let connector = if last_child { "└─ " } else { "├─ " };
687 format!("{}{}{}", prefix, connector, node.identity)
688 };
689
690 let child_prefix = if is_root {
691 ""
692 } else if last_child {
693 " "
694 } else {
695 "│ "
696 };
697 let child_prefix = format!("{}{}", prefix, child_prefix);
698
699 let stats = stats.get(&node_id);
700 let (output_rows_per_second, downstream_backpressure_ratio) = match stats {
701 Some(stats) => (
702 Some(
703 (stats.total_output_throughput as f64 / profiling_duration_secs)
704 .to_string(),
705 ),
706 Some(
707 (Duration::from_nanos(stats.total_output_pending_ns).as_secs_f64()
708 / usize::max(node.actor_ids.len(), 1) as f64
709 / profiling_duration_secs)
710 .to_string(),
711 ),
712 ),
713 None => (None, None),
714 };
715 let row = ExplainAnalyzeStreamJobOutput {
716 identity: identity_rendered,
717 actor_ids: node
718 .actor_ids
719 .iter()
720 .sorted()
721 .map(|id| id.to_string())
722 .collect::<Vec<_>>()
723 .join(","),
724 output_rows_per_second,
725 downstream_backpressure_ratio,
726 };
727 rows.push(row);
728 for (position, dependency) in node.dependencies.iter().enumerate() {
729 stack.push((child_prefix.clone(), position == 0, *dependency));
730 }
731 }
732 rows
733 }
734}
735
736mod utils {
737 use risingwave_common::operator::unique_operator_id;
738
739 use crate::handler::explain_analyze_stream_job::graph::OperatorId;
740
741 pub(super) fn operator_id_for_dispatch(fragment_id: u32) -> OperatorId {
742 unique_operator_id(fragment_id, u32::MAX as u64)
743 }
744}