1use std::collections::HashMap;
16
17use pgwire::pg_response::StatementType;
18use risingwave_common::types::Fields;
19use risingwave_sqlparser::ast::AnalyzeTarget;
20use tokio::time::Duration;
21
22use crate::error::Result;
23use crate::handler::explain_analyze_stream_job::graph::{
24 extract_executor_infos, extract_stream_node_infos, render_graph_with_metrics,
25};
26use crate::handler::{HandlerArgs, RwPgResponse, RwPgResponseBuilder, RwPgResponseBuilderExt};
27
/// Reports a condition that should never happen but that release builds can survive.
///
/// In debug builds, and when compiled for the `madsim` simulator, the condition is treated as a
/// bug and the macro panics with the given message. Otherwise it only logs the same message
/// through `tracing::warn!`, so the caller can carry on (typically by skipping the offending
/// item).
///
/// The arguments are forwarded verbatim to both `panic!` and `tracing::warn!`.
#[macro_export]
macro_rules! debug_panic_or_warn {
    ($($arg:tt)*) => {
        if cfg!(not(any(debug_assertions, madsim))) {
            tracing::warn!($($arg)*);
        } else {
            panic!($($arg)*);
        }
    };
}
38
/// One row of the `EXPLAIN ANALYZE` result: one rendered operator of the stream job.
///
/// `#[derive(Fields)]` derives the result-set schema from these fields (presumably in
/// declaration order — confirm), so renaming or reordering them changes the columns returned
/// to the client.
#[derive(Fields)]
struct ExplainAnalyzeStreamJobOutput {
    // Operator name, prefixed with tree-drawing connectors that show its position in the plan.
    identity: String,
    // Comma-separated, sorted ids of the actors running this operator.
    actor_ids: String,
    // Rows output per second over the profiling window; `None` when no stats were collected.
    output_rows_per_second: Option<String>,
    // Share of the profiling window spent blocked on output; `None` when no stats were collected.
    downstream_backpressure_ratio: Option<String>,
}
46
47pub async fn handle_explain_analyze_stream_job(
48 handler_args: HandlerArgs,
49 target: AnalyzeTarget,
50 duration_secs: Option<u64>,
51) -> Result<RwPgResponse> {
52 let profiling_duration = Duration::from_secs(duration_secs.unwrap_or(10));
53 let job_id = bind::bind_relation(&target, &handler_args)?;
54
55 let meta_client = handler_args.session.env().meta_client();
56 let fragments = net::get_fragments(meta_client, job_id).await?;
57 let fragment_parallelisms = fragments
58 .iter()
59 .map(|f| (f.id, f.actors.len()))
60 .collect::<HashMap<_, _>>();
61 let (root_node, dispatcher_fragment_ids, adjacency_list) = extract_stream_node_infos(fragments);
62 let (executor_ids, operator_to_executor) = extract_executor_infos(&adjacency_list);
63 tracing::debug!(
64 ?fragment_parallelisms,
65 ?root_node,
66 ?dispatcher_fragment_ids,
67 ?adjacency_list,
68 "explain analyze metadata"
69 );
70
71 let worker_nodes = net::list_stream_worker_nodes(handler_args.session.env()).await?;
72
73 let executor_stats = net::get_executor_stats(
74 &handler_args,
75 &worker_nodes,
76 &executor_ids,
77 &dispatcher_fragment_ids,
78 profiling_duration,
79 )
80 .await?;
81 tracing::debug!(?executor_stats, "collected executor stats");
82 let aggregated_stats = metrics::OperatorStats::aggregate(
83 operator_to_executor,
84 &executor_stats,
85 &fragment_parallelisms,
86 );
87 tracing::debug!(?aggregated_stats, "collected aggregated stats");
88
89 let rows = render_graph_with_metrics(
91 &adjacency_list,
92 root_node,
93 &aggregated_stats,
94 &profiling_duration,
95 );
96 let builder = RwPgResponseBuilder::empty(StatementType::EXPLAIN);
97 let builder = builder.rows(rows);
98 Ok(builder.into())
99}
100
101mod bind {
104 use risingwave_sqlparser::ast::AnalyzeTarget;
105
106 use crate::Binder;
107 use crate::catalog::root_catalog::SchemaPath;
108 use crate::error::Result;
109 use crate::handler::HandlerArgs;
110
111 pub(super) fn bind_relation(
113 target_relation: &AnalyzeTarget,
114 handler_args: &HandlerArgs,
115 ) -> Result<u32> {
116 let job_id = match &target_relation {
117 AnalyzeTarget::Id(id) => *id,
118 AnalyzeTarget::Index(name)
119 | AnalyzeTarget::Table(name)
120 | AnalyzeTarget::Sink(name)
121 | AnalyzeTarget::MaterializedView(name) => {
122 let session = &handler_args.session;
123 let db_name = session.database();
124 let (schema_name, name) = Binder::resolve_schema_qualified_name(&db_name, name)?;
125 let search_path = session.config().search_path();
126 let user_name = &session.user_name();
127 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
128
129 let catalog_reader = handler_args.session.env().catalog_reader();
130 let catalog = catalog_reader.read_guard();
131
132 match target_relation {
133 AnalyzeTarget::Index(_) => {
134 let (catalog, _schema_name) =
135 catalog.get_index_by_name(&db_name, schema_path, &name)?;
136 catalog.id.index_id
137 }
138 AnalyzeTarget::Table(_) => {
139 let (catalog, _schema_name) =
140 catalog.get_any_table_by_name(&db_name, schema_path, &name)?;
141 catalog.id.table_id
142 }
143 AnalyzeTarget::Sink(_) => {
144 let (catalog, _schema_name) =
145 catalog.get_any_sink_by_name(&db_name, schema_path, &name)?;
146 catalog.id.sink_id
147 }
148 AnalyzeTarget::MaterializedView(_) => {
149 let (catalog, _schema_name) =
150 catalog.get_any_table_by_name(&db_name, schema_path, &name)?;
151 catalog.id.table_id
152 }
153 AnalyzeTarget::Id(_) => unreachable!(),
154 }
155 }
156 };
157 Ok(job_id)
158 }
159}
160
161mod net {
163 use std::collections::HashSet;
164
165 use risingwave_pb::common::WorkerNode;
166 use risingwave_pb::meta::list_table_fragments_response::FragmentInfo;
167 use risingwave_pb::monitor_service::GetProfileStatsRequest;
168 use tokio::time::{Duration, sleep};
169
170 use crate::error::Result;
171 use crate::handler::HandlerArgs;
172 use crate::handler::explain_analyze_stream_job::graph::ExecutorId;
173 use crate::handler::explain_analyze_stream_job::metrics::ExecutorStats;
174 use crate::meta_client::FrontendMetaClient;
175 use crate::session::FrontendEnv;
176
177 pub(super) async fn list_stream_worker_nodes(env: &FrontendEnv) -> Result<Vec<WorkerNode>> {
178 let worker_nodes = env.meta_client().list_all_nodes().await?;
179 let stream_worker_nodes = worker_nodes
180 .into_iter()
181 .filter(|node| {
182 node.property
183 .as_ref()
184 .map(|p| p.is_streaming)
185 .unwrap_or_else(|| false)
186 })
187 .collect::<Vec<_>>();
188 Ok(stream_worker_nodes)
189 }
190
191 pub(super) async fn get_fragments(
193 meta_client: &dyn FrontendMetaClient,
194 job_id: u32,
195 ) -> Result<Vec<FragmentInfo>> {
196 let mut fragment_map = meta_client.list_table_fragments(&[job_id]).await?;
197 assert_eq!(fragment_map.len(), 1, "expected only one fragment");
198 let (fragment_job_id, table_fragment_info) = fragment_map.drain().next().unwrap();
199 assert_eq!(fragment_job_id, job_id);
200 Ok(table_fragment_info.fragments)
201 }
202
203 pub(super) async fn get_executor_stats(
204 handler_args: &HandlerArgs,
205 worker_nodes: &[WorkerNode],
206 executor_ids: &HashSet<ExecutorId>,
207 dispatcher_fragment_ids: &HashSet<u32>,
208 profiling_duration: Duration,
209 ) -> Result<ExecutorStats> {
210 let dispatcher_fragment_ids = dispatcher_fragment_ids.iter().copied().collect::<Vec<_>>();
211 let mut initial_aggregated_stats = ExecutorStats::new();
212 for node in worker_nodes {
213 let mut compute_client = handler_args.session.env().client_pool().get(node).await?;
214 let stats = compute_client
215 .monitor_client
216 .get_profile_stats(GetProfileStatsRequest {
217 executor_ids: executor_ids.iter().copied().collect(),
218 dispatcher_fragment_ids: dispatcher_fragment_ids.clone(),
219 })
220 .await
221 .expect("get profiling stats failed");
222 initial_aggregated_stats.record(
223 executor_ids,
224 &dispatcher_fragment_ids,
225 &stats.into_inner(),
226 );
227 }
228 tracing::debug!(?initial_aggregated_stats, "initial aggregated stats");
229
230 sleep(profiling_duration).await;
231
232 let mut final_aggregated_stats = ExecutorStats::new();
233 for node in worker_nodes {
234 let mut compute_client = handler_args.session.env().client_pool().get(node).await?;
235 let stats = compute_client
236 .monitor_client
237 .get_profile_stats(GetProfileStatsRequest {
238 executor_ids: executor_ids.iter().copied().collect(),
239 dispatcher_fragment_ids: dispatcher_fragment_ids.clone(),
240 })
241 .await
242 .expect("get profiling stats failed");
243 final_aggregated_stats.record(
244 executor_ids,
245 &dispatcher_fragment_ids,
246 &stats.into_inner(),
247 );
248 }
249 tracing::debug!(?final_aggregated_stats, "final aggregated stats");
250
251 let delta_aggregated_stats = ExecutorStats::get_delta(
252 &initial_aggregated_stats,
253 &final_aggregated_stats,
254 executor_ids,
255 &dispatcher_fragment_ids,
256 );
257
258 Ok(delta_aggregated_stats)
259 }
260}
261
262mod metrics {
267 use std::collections::{HashMap, HashSet};
268
269 use risingwave_common::operator::unique_executor_id_into_parts;
270 use risingwave_pb::monitor_service::GetProfileStatsResponse;
271
272 use crate::catalog::FragmentId;
273 use crate::handler::explain_analyze_stream_job::graph::{ExecutorId, OperatorId};
274 use crate::handler::explain_analyze_stream_job::utils::operator_id_for_dispatch;
275
276 #[expect(dead_code)]
277 #[derive(Default, Debug)]
278 pub(super) struct ExecutorMetrics {
279 pub executor_id: ExecutorId,
280 pub epoch: u32,
281 pub total_output_throughput: u64,
282 pub total_output_pending_ns: u64,
283 }
284
285 #[derive(Default, Debug)]
286 pub(super) struct DispatchMetrics {
287 pub fragment_id: FragmentId,
288 pub epoch: u32,
289 pub total_output_throughput: u64,
290 pub total_output_pending_ns: u64,
291 }
292
293 #[derive(Debug)]
294 pub(super) struct ExecutorStats {
295 executor_stats: HashMap<ExecutorId, ExecutorMetrics>,
296 dispatch_stats: HashMap<FragmentId, DispatchMetrics>,
297 }
298
299 impl ExecutorStats {
300 pub(super) fn new() -> Self {
301 ExecutorStats {
302 executor_stats: HashMap::new(),
303 dispatch_stats: HashMap::new(),
304 }
305 }
306
307 pub fn get(&self, executor_id: &ExecutorId) -> Option<&ExecutorMetrics> {
308 self.executor_stats.get(executor_id)
309 }
310
311 pub(super) fn record<'a>(
313 &mut self,
314 executor_ids: &'a HashSet<ExecutorId>,
315 dispatch_fragment_ids: &'a [FragmentId],
316 metrics: &'a GetProfileStatsResponse,
317 ) {
318 for executor_id in executor_ids {
319 let Some(total_output_throughput) =
320 metrics.stream_node_output_row_count.get(executor_id)
321 else {
322 continue;
323 };
324 let Some(total_output_pending_ns) = metrics
325 .stream_node_output_blocking_duration_ns
326 .get(executor_id)
327 else {
328 continue;
329 };
330 let stats = ExecutorMetrics {
331 executor_id: *executor_id,
332 epoch: 0,
333 total_output_throughput: *total_output_throughput,
334 total_output_pending_ns: *total_output_pending_ns,
335 };
336 if cfg!(madsim) {
339 self.executor_stats.insert(*executor_id, stats);
344 } else {
345 assert!(self.executor_stats.insert(*executor_id, stats).is_none());
346 }
347 }
348
349 for fragment_id in dispatch_fragment_ids {
350 let Some(total_output_throughput) =
351 metrics.dispatch_fragment_output_row_count.get(fragment_id)
352 else {
353 continue;
354 };
355 let Some(total_output_pending_ns) = metrics
356 .dispatch_fragment_output_blocking_duration_ns
357 .get(fragment_id)
358 else {
359 continue;
360 };
361 let stats = self.dispatch_stats.entry(*fragment_id).or_default();
362 stats.fragment_id = *fragment_id;
363 stats.epoch = 0;
364 stats.total_output_throughput += *total_output_throughput;
368 stats.total_output_pending_ns += *total_output_pending_ns;
369 }
370 }
371
372 pub(super) fn get_delta(
373 initial: &Self,
374 end: &Self,
375 executor_ids: &HashSet<ExecutorId>,
376 dispatch_fragment_ids: &[FragmentId],
377 ) -> Self {
378 let mut delta_aggregated_stats = Self::new();
379 for executor_id in executor_ids {
380 let (actor_id, operator_id) = unique_executor_id_into_parts(*executor_id);
381 let Some(initial_stats) = initial.executor_stats.get(executor_id) else {
382 debug_panic_or_warn!(
383 "missing initial stats for executor {} (actor {} operator {})",
384 executor_id,
385 actor_id,
386 operator_id
387 );
388 continue;
389 };
390 let Some(end_stats) = end.executor_stats.get(executor_id) else {
391 debug_panic_or_warn!(
392 "missing final stats for executor {} (actor {} operator {})",
393 executor_id,
394 actor_id,
395 operator_id
396 );
397 continue;
398 };
399
400 let initial_throughput = initial_stats.total_output_throughput;
401 let end_throughput = end_stats.total_output_throughput;
402 let Some(delta_throughput) = end_throughput.checked_sub(initial_throughput) else {
403 debug_panic_or_warn!(
404 "delta throughput is negative for actor {} operator {} (initial: {}, end: {})",
405 actor_id,
406 operator_id,
407 initial_throughput,
408 end_throughput
409 );
410 continue;
411 };
412
413 let initial_pending_ns = initial_stats.total_output_pending_ns;
414 let end_pending_ns = end_stats.total_output_pending_ns;
415 let Some(delta_pending_ns) = end_pending_ns.checked_sub(initial_pending_ns) else {
416 debug_panic_or_warn!(
417 "delta pending ns is negative for actor {} operator {} (initial: {}, end: {})",
418 actor_id,
419 operator_id,
420 initial_pending_ns,
421 end_pending_ns
422 );
423 continue;
424 };
425
426 let delta_stats = ExecutorMetrics {
427 executor_id: *executor_id,
428 epoch: 0,
429 total_output_throughput: delta_throughput,
430 total_output_pending_ns: delta_pending_ns,
431 };
432 delta_aggregated_stats
433 .executor_stats
434 .insert(*executor_id, delta_stats);
435 }
436
437 for fragment_id in dispatch_fragment_ids {
438 let Some(initial_stats) = initial.dispatch_stats.get(fragment_id) else {
439 debug_panic_or_warn!("missing initial stats for fragment {}", fragment_id);
440 continue;
441 };
442 let Some(end_stats) = end.dispatch_stats.get(fragment_id) else {
443 debug_panic_or_warn!("missing final stats for fragment {}", fragment_id);
444 continue;
445 };
446
447 let initial_throughput = initial_stats.total_output_throughput;
448 let end_throughput = end_stats.total_output_throughput;
449 let Some(delta_throughput) = end_throughput.checked_sub(initial_throughput) else {
450 debug_panic_or_warn!(
451 "delta throughput is negative for fragment {} (initial: {}, end: {})",
452 fragment_id,
453 initial_throughput,
454 end_throughput
455 );
456 continue;
457 };
458
459 let initial_pending_ns = initial_stats.total_output_pending_ns;
460 let end_pending_ns = end_stats.total_output_pending_ns;
461 let Some(delta_pending_ns) = end_pending_ns.checked_sub(initial_pending_ns) else {
462 debug_panic_or_warn!(
463 "delta pending ns is negative for fragment {} (initial: {}, end: {})",
464 fragment_id,
465 initial_pending_ns,
466 end_pending_ns
467 );
468 continue;
469 };
470
471 let delta_stats = DispatchMetrics {
472 fragment_id: *fragment_id,
473 epoch: 0,
474 total_output_throughput: delta_throughput,
475 total_output_pending_ns: delta_pending_ns,
476 };
477 delta_aggregated_stats
478 .dispatch_stats
479 .insert(*fragment_id, delta_stats);
480 }
481
482 delta_aggregated_stats
483 }
484 }
485
486 #[expect(dead_code)]
487 #[derive(Debug)]
488 pub(super) struct OperatorMetrics {
489 pub operator_id: OperatorId,
490 pub epoch: u32,
491 pub total_output_throughput: u64,
492 pub total_output_pending_ns: u64,
493 }
494
495 #[derive(Debug)]
496 pub(super) struct OperatorStats {
497 inner: HashMap<OperatorId, OperatorMetrics>,
498 }
499
500 impl OperatorStats {
501 pub(super) fn aggregate(
503 operator_map: HashMap<OperatorId, HashSet<ExecutorId>>,
504 executor_stats: &ExecutorStats,
505 fragment_parallelisms: &HashMap<FragmentId, usize>,
506 ) -> Self {
507 let mut operator_stats = HashMap::new();
508 'operator_loop: for (operator_id, executor_ids) in operator_map {
509 let num_executors = executor_ids.len() as u64;
510 let mut total_output_throughput = 0;
511 let mut total_output_pending_ns = 0;
512 for executor_id in executor_ids {
513 if let Some(stats) = executor_stats.get(&executor_id) {
514 total_output_throughput += stats.total_output_throughput;
515 total_output_pending_ns += stats.total_output_pending_ns;
516 } else {
517 continue 'operator_loop;
519 }
520 }
521 let total_output_throughput = total_output_throughput;
522 let total_output_pending_ns = total_output_pending_ns / num_executors;
523
524 operator_stats.insert(
525 operator_id,
526 OperatorMetrics {
527 operator_id,
528 epoch: 0,
529 total_output_throughput,
530 total_output_pending_ns,
531 },
532 );
533 }
534
535 for (fragment_id, dispatch_metrics) in &executor_stats.dispatch_stats {
536 let operator_id = operator_id_for_dispatch(*fragment_id);
537 let total_output_throughput = dispatch_metrics.total_output_throughput;
538 let Some(fragment_parallelism) = fragment_parallelisms.get(fragment_id) else {
539 debug_panic_or_warn!(
540 "missing fragment parallelism for fragment {}",
541 fragment_id
542 );
543 continue;
544 };
545 let total_output_pending_ns =
546 dispatch_metrics.total_output_pending_ns / *fragment_parallelism as u64;
547
548 operator_stats.insert(
549 operator_id,
550 OperatorMetrics {
551 operator_id,
552 epoch: 0,
553 total_output_throughput,
554 total_output_pending_ns,
555 },
556 );
557 }
558
559 OperatorStats {
560 inner: operator_stats,
561 }
562 }
563
564 pub fn get(&self, operator_id: &OperatorId) -> Option<&OperatorMetrics> {
565 self.inner.get(operator_id)
566 }
567 }
568}
569
570mod graph {
573 use std::collections::{HashMap, HashSet};
574 use std::fmt::Debug;
575 use std::time::Duration;
576
577 use itertools::Itertools;
578 use risingwave_common::operator::{
579 unique_executor_id_from_unique_operator_id, unique_operator_id,
580 unique_operator_id_into_parts,
581 };
582 use risingwave_pb::meta::list_table_fragments_response::FragmentInfo;
583 use risingwave_pb::stream_plan::stream_node::{NodeBody, NodeBodyDiscriminants};
584 use risingwave_pb::stream_plan::{MergeNode, StreamNode as PbStreamNode};
585
586 use crate::catalog::FragmentId;
587 use crate::handler::explain_analyze_stream_job::ExplainAnalyzeStreamJobOutput;
588 use crate::handler::explain_analyze_stream_job::metrics::OperatorStats;
589 use crate::handler::explain_analyze_stream_job::utils::operator_id_for_dispatch;
590 pub(super) type OperatorId = u64;
591 pub(super) type ExecutorId = u64;
592
593 pub(super) struct StreamNode {
595 operator_id: OperatorId,
596 fragment_id: u32,
597 identity: NodeBodyDiscriminants,
598 actor_ids: HashSet<u32>,
599 dependencies: Vec<u64>,
600 }
601
602 impl Debug for StreamNode {
603 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
604 let (actor_id, operator_id) = unique_operator_id_into_parts(self.operator_id);
605 let operator_id_str = format!(
606 "{}: (actor_id: {}, operator_id: {})",
607 self.operator_id, actor_id, operator_id
608 );
609 write!(
610 f,
611 "StreamNode {{ operator_id: {}, fragment_id: {}, identity: {:?}, actor_ids: {:?}, dependencies: {:?} }}",
612 operator_id_str, self.fragment_id, self.identity, self.actor_ids, self.dependencies
613 )
614 }
615 }
616
617 impl StreamNode {
618 fn new_for_dispatcher(fragment_id: u32) -> Self {
619 StreamNode {
620 operator_id: operator_id_for_dispatch(fragment_id),
621 fragment_id,
622 identity: NodeBodyDiscriminants::Exchange,
623 actor_ids: Default::default(),
624 dependencies: Default::default(),
625 }
626 }
627 }
628
629 pub(super) fn extract_stream_node_infos(
631 fragments: Vec<FragmentInfo>,
632 ) -> (
633 OperatorId,
634 HashSet<FragmentId>,
635 HashMap<OperatorId, StreamNode>,
636 ) {
637 let job_fragment_ids = fragments.iter().map(|f| f.id).collect::<HashSet<_>>();
638
639 fn find_root_nodes(stream_nodes: &HashMap<u64, StreamNode>) -> HashSet<u64> {
641 let mut all_nodes = stream_nodes.keys().copied().collect::<HashSet<_>>();
642 for node in stream_nodes.values() {
643 for dependency in &node.dependencies {
644 all_nodes.remove(dependency);
645 }
646 }
647 all_nodes
648 }
649
650 fn extract_stream_node_info(
653 fragment_id: u32,
654 fragment_id_to_merge_operator_id: &mut HashMap<u32, OperatorId>,
655 operator_id_to_stream_node: &mut HashMap<OperatorId, StreamNode>,
656 node: &PbStreamNode,
657 actor_ids: &HashSet<u32>,
658 ) {
659 let identity = node
660 .node_body
661 .as_ref()
662 .expect("should have node body")
663 .into();
664 let operator_id = unique_operator_id(fragment_id, node.operator_id);
665 if let Some(merge_node) = node.node_body.as_ref()
666 && let NodeBody::Merge(box MergeNode {
667 upstream_fragment_id,
668 ..
669 }) = merge_node
670 {
671 fragment_id_to_merge_operator_id.insert(*upstream_fragment_id, operator_id);
672 }
673 let dependencies = &node.input;
674 let dependency_ids = dependencies
675 .iter()
676 .map(|input| unique_operator_id(fragment_id, input.operator_id))
677 .collect::<Vec<_>>();
678 operator_id_to_stream_node.insert(
679 operator_id,
680 StreamNode {
681 operator_id,
682 fragment_id,
683 identity,
684 actor_ids: actor_ids.clone(),
685 dependencies: dependency_ids,
686 },
687 );
688 for dependency in dependencies {
689 extract_stream_node_info(
690 fragment_id,
691 fragment_id_to_merge_operator_id,
692 operator_id_to_stream_node,
693 dependency,
694 actor_ids,
695 );
696 }
697 }
698
699 let mut operator_id_to_stream_node = HashMap::new();
702 let mut fragment_id_to_merge_operator_id = HashMap::new();
703 for fragment in fragments {
704 let actors = fragment.actors;
705 assert!(
706 !actors.is_empty(),
707 "fragment {} should have at least one actor",
708 fragment.id
709 );
710 let actor_ids = actors.iter().map(|actor| actor.id).collect::<HashSet<_>>();
711 let node = actors[0].node.as_ref().expect("should have stream node");
712 extract_stream_node_info(
713 fragment.id,
714 &mut fragment_id_to_merge_operator_id,
715 &mut operator_id_to_stream_node,
716 node,
717 &actor_ids,
718 );
719 }
720
721 let root_or_dispatch_nodes = find_root_nodes(&operator_id_to_stream_node);
723 let mut root_node = None;
724 for operator_id in root_or_dispatch_nodes {
725 let node = operator_id_to_stream_node.get_mut(&operator_id).unwrap();
726 let fragment_id = node.fragment_id;
727 if let Some(merge_operator_id) = fragment_id_to_merge_operator_id.get(&fragment_id) {
728 let mut dispatcher = StreamNode::new_for_dispatcher(fragment_id);
729 let operator_id_for_dispatch = dispatcher.operator_id;
730 dispatcher.dependencies.push(operator_id);
731 assert!(
732 operator_id_to_stream_node
733 .insert(operator_id_for_dispatch as _, dispatcher)
734 .is_none()
735 );
736 operator_id_to_stream_node
737 .get_mut(merge_operator_id)
738 .unwrap()
739 .dependencies
740 .push(operator_id_for_dispatch as _)
741 } else {
742 root_node = Some(operator_id);
743 }
744 }
745
746 let mut dispatcher_fragment_ids = HashSet::new();
747 for dispatcher_fragment_id in fragment_id_to_merge_operator_id.keys() {
748 if job_fragment_ids.contains(dispatcher_fragment_id) {
749 dispatcher_fragment_ids.insert(*dispatcher_fragment_id);
750 }
751 }
752
753 (
754 root_node.unwrap(),
755 dispatcher_fragment_ids,
756 operator_id_to_stream_node,
757 )
758 }
759
760 pub(super) fn extract_executor_infos(
761 adjacency_list: &HashMap<OperatorId, StreamNode>,
762 ) -> (HashSet<u64>, HashMap<u64, HashSet<u64>>) {
763 let mut executor_ids = HashSet::new();
764 let mut operator_to_executor = HashMap::new();
765 for (operator_id, node) in adjacency_list {
766 assert_eq!(*operator_id, node.operator_id);
767 let operator_id = node.operator_id;
768 for actor_id in &node.actor_ids {
769 let executor_id =
770 unique_executor_id_from_unique_operator_id(*actor_id, operator_id);
771 if node.identity != NodeBodyDiscriminants::BatchPlan
772 && node.identity != NodeBodyDiscriminants::Merge
774 && node.identity != NodeBodyDiscriminants::Project
775 {
776 assert!(executor_ids.insert(executor_id));
777 }
778 assert!(
779 operator_to_executor
780 .entry(operator_id)
781 .or_insert_with(HashSet::new)
782 .insert(executor_id)
783 );
784 }
785 }
786 (executor_ids, operator_to_executor)
787 }
788
789 pub(super) fn render_graph_with_metrics(
794 adjacency_list: &HashMap<u64, StreamNode>,
795 root_node: u64,
796 stats: &OperatorStats,
797 profiling_duration: &Duration,
798 ) -> Vec<ExplainAnalyzeStreamJobOutput> {
799 let profiling_duration_secs = profiling_duration.as_secs_f64();
800 let mut rows = vec![];
801 let mut stack = vec![(String::new(), true, root_node)];
802 while let Some((prefix, last_child, node_id)) = stack.pop() {
803 let Some(node) = adjacency_list.get(&node_id) else {
804 continue;
805 };
806 let is_root = node_id == root_node;
807
808 let identity_rendered = if is_root {
809 node.identity.to_string()
810 } else {
811 let connector = if last_child { "└─ " } else { "├─ " };
812 format!("{}{}{}", prefix, connector, node.identity)
813 };
814
815 let child_prefix = if is_root {
816 ""
817 } else if last_child {
818 " "
819 } else {
820 "│ "
821 };
822 let child_prefix = format!("{}{}", prefix, child_prefix);
823
824 let stats = stats.get(&node_id);
825 let (output_rows_per_second, downstream_backpressure_ratio) = match stats {
826 Some(stats) => (
827 Some(
828 (stats.total_output_throughput as f64 / profiling_duration_secs)
829 .to_string(),
830 ),
831 Some(
832 (Duration::from_nanos(stats.total_output_pending_ns).as_secs_f64()
833 / usize::max(node.actor_ids.len(), 1) as f64
834 / profiling_duration_secs)
835 .to_string(),
836 ),
837 ),
838 None => (None, None),
839 };
840 let row = ExplainAnalyzeStreamJobOutput {
841 identity: identity_rendered,
842 actor_ids: node
843 .actor_ids
844 .iter()
845 .sorted()
846 .map(|id| id.to_string())
847 .collect::<Vec<_>>()
848 .join(","),
849 output_rows_per_second,
850 downstream_backpressure_ratio,
851 };
852 rows.push(row);
853 for (position, dependency) in node.dependencies.iter().enumerate() {
854 stack.push((child_prefix.clone(), position == 0, *dependency));
855 }
856 }
857 rows
858 }
859}
860
861mod utils {
862 use risingwave_common::operator::unique_operator_id;
863
864 use crate::handler::explain_analyze_stream_job::graph::OperatorId;
865
866 pub(super) fn operator_id_for_dispatch(fragment_id: u32) -> OperatorId {
867 unique_operator_id(fragment_id, u32::MAX as u64)
868 }
869}