use std::collections::HashMap;

use pgwire::pg_response::StatementType;
use risingwave_common::types::Fields;
use risingwave_sqlparser::ast::AnalyzeTarget;
use tokio::time::Duration;

use crate::error::Result;
use crate::handler::explain_analyze_stream_job::graph::{
    extract_executor_infos, extract_stream_node_infos, render_graph_with_metrics,
};
use crate::handler::{HandlerArgs, RwPgResponse, RwPgResponseBuilder, RwPgResponseBuilderExt};

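/// Panics in debug and simulation builds so that metric inconsistencies surface loudly in
/// tests, but only logs a warning in release builds, where a profiling hiccup should not
/// crash the frontend. Used like `debug_panic_or_warn!("missing stats for fragment {}", id)`.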
#[macro_export]
macro_rules! debug_panic_or_warn {
    ($($arg:tt)*) => {
        if cfg!(debug_assertions) || cfg!(madsim) {
            panic!($($arg)*);
        } else {
            tracing::warn!($($arg)*);
        }
    };
}

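/// One output row per rendered stream node: the node's identity (prefixed with tree
/// connectors), the actors running it, and rates derived over the profiling window.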
#[derive(Fields)]
struct ExplainAnalyzeStreamJobOutput {
    identity: String,
    actor_ids: String,
    output_rows_per_second: Option<String>,
    downstream_backpressure_ratio: Option<String>,
}

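/// Handles `EXPLAIN ANALYZE` for a streaming job:
/// 1. Bind the target relation (table, index, sink, or materialized view) to its job id.
/// 2. Fetch the job's fragments from meta and rebuild the stream-node graph.
/// 3. Sample executor metrics on all stream workers, wait out the profiling window
///    (`duration_secs`, defaulting to 10s), sample again, and take the delta.
/// 4. Aggregate the per-executor deltas into per-operator stats and render the graph.
///
/// Illustrative usage only (the exact SQL surface is defined by the parser's
/// `AnalyzeTarget`): something along the lines of
/// `EXPLAIN ANALYZE MATERIALIZED VIEW mv_name`.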
pub async fn handle_explain_analyze_stream_job(
    handler_args: HandlerArgs,
    target: AnalyzeTarget,
    duration_secs: Option<u64>,
) -> Result<RwPgResponse> {
    let profiling_duration = Duration::from_secs(duration_secs.unwrap_or(10));
    let job_id = bind::bind_relation(&target, &handler_args)?;

    let meta_client = handler_args.session.env().meta_client();
    let fragments = net::get_fragments(meta_client, job_id.into()).await?;
    let fragment_parallelisms = fragments
        .iter()
        .map(|f| (f.id, f.actors.len()))
        .collect::<HashMap<_, _>>();
    let (root_node, dispatcher_fragment_ids, adjacency_list) = extract_stream_node_infos(fragments);
    let (executor_ids, operator_to_executor) = extract_executor_infos(&adjacency_list);
    tracing::debug!(
        ?fragment_parallelisms,
        ?root_node,
        ?dispatcher_fragment_ids,
        ?adjacency_list,
        "explain analyze metadata"
    );

    let worker_nodes = net::list_stream_worker_nodes(handler_args.session.env()).await?;

    let executor_stats = net::get_executor_stats(
        &handler_args,
        &worker_nodes,
        &executor_ids,
        &dispatcher_fragment_ids,
        profiling_duration,
    )
    .await?;
    tracing::debug!(?executor_stats, "collected executor stats");
    let aggregated_stats = metrics::OperatorStats::aggregate(
        operator_to_executor,
        &executor_stats,
        &fragment_parallelisms,
    );
    tracing::debug!(?aggregated_stats, "collected aggregated stats");

    let rows = render_graph_with_metrics(
        &adjacency_list,
        root_node,
        &aggregated_stats,
        &profiling_duration,
    );
    let builder = RwPgResponseBuilder::empty(StatementType::EXPLAIN);
    let builder = builder.rows(rows);
    Ok(builder.into())
}

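/// Binds the `EXPLAIN ANALYZE` target to the id of its backing streaming job.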
mod bind {
    use risingwave_sqlparser::ast::AnalyzeTarget;

    use crate::Binder;
    use crate::catalog::root_catalog::SchemaPath;
    use crate::error::Result;
    use crate::handler::HandlerArgs;

    pub(super) fn bind_relation(
        target_relation: &AnalyzeTarget,
        handler_args: &HandlerArgs,
    ) -> Result<u32> {
        let job_id = match &target_relation {
            AnalyzeTarget::Id(id) => *id,
            AnalyzeTarget::Index(name)
            | AnalyzeTarget::Table(name)
            | AnalyzeTarget::Sink(name)
            | AnalyzeTarget::MaterializedView(name) => {
                let session = &handler_args.session;
                let db_name = session.database();
                let (schema_name, name) = Binder::resolve_schema_qualified_name(&db_name, name)?;
                let search_path = session.config().search_path();
                let user_name = &session.user_name();
                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

                let catalog_reader = handler_args.session.env().catalog_reader();
                let catalog = catalog_reader.read_guard();

                match target_relation {
                    AnalyzeTarget::Index(_) => {
                        let (catalog, _schema_name) =
                            catalog.get_any_index_by_name(&db_name, schema_path, &name)?;
                        catalog.id.index_id
                    }
                    AnalyzeTarget::Table(_) => {
                        let (catalog, _schema_name) =
                            catalog.get_any_table_by_name(&db_name, schema_path, &name)?;
                        catalog.id.as_raw_id()
                    }
                    AnalyzeTarget::Sink(_) => {
                        let (catalog, _schema_name) =
                            catalog.get_any_sink_by_name(&db_name, schema_path, &name)?;
                        catalog.id.sink_id
                    }
                    AnalyzeTarget::MaterializedView(_) => {
                        let (catalog, _schema_name) =
                            catalog.get_any_table_by_name(&db_name, schema_path, &name)?;
                        catalog.id.as_raw_id()
                    }
                    AnalyzeTarget::Id(_) => unreachable!(),
                }
            }
        };
        Ok(job_id)
    }
}

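/// RPCs against meta and compute nodes: fragment discovery and profiling-metric sampling.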
mod net {
    use std::collections::HashSet;

    use risingwave_pb::common::WorkerNode;
    use risingwave_pb::id::JobId;
    use risingwave_pb::meta::list_table_fragments_response::FragmentInfo;
    use risingwave_pb::monitor_service::GetProfileStatsRequest;
    use tokio::time::{Duration, sleep};

    use crate::error::Result;
    use crate::handler::HandlerArgs;
    use crate::handler::explain_analyze_stream_job::graph::ExecutorId;
    use crate::handler::explain_analyze_stream_job::metrics::ExecutorStats;
    use crate::meta_client::FrontendMetaClient;
    use crate::session::FrontendEnv;

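    /// Lists all nodes registered with meta and keeps only the streaming workers.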
    pub(super) async fn list_stream_worker_nodes(env: &FrontendEnv) -> Result<Vec<WorkerNode>> {
        let worker_nodes = env.meta_client().list_all_nodes().await?;
        let stream_worker_nodes = worker_nodes
            .into_iter()
            .filter(|node| node.property.as_ref().is_some_and(|p| p.is_streaming))
            .collect::<Vec<_>>();
        Ok(stream_worker_nodes)
    }

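    /// Fetches the fragment graph of `job_id` from meta. The response is keyed by job id,
    /// so exactly one entry is expected for a single-job request.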
    pub(super) async fn get_fragments(
        meta_client: &dyn FrontendMetaClient,
        job_id: JobId,
    ) -> Result<Vec<FragmentInfo>> {
        let mut fragment_map = meta_client.list_table_fragments(&[job_id]).await?;
        assert_eq!(
            fragment_map.len(),
            1,
            "expected exactly one table fragments entry for the requested job"
        );
        let (fragment_job_id, table_fragment_info) = fragment_map.drain().next().unwrap();
        assert_eq!(fragment_job_id, job_id);
        Ok(table_fragment_info.fragments)
    }

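    /// Samples executor and dispatcher counters on every stream worker twice, separated by
    /// `profiling_duration`, and returns the per-counter delta between the two snapshots.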
    pub(super) async fn get_executor_stats(
        handler_args: &HandlerArgs,
        worker_nodes: &[WorkerNode],
        executor_ids: &HashSet<ExecutorId>,
        dispatcher_fragment_ids: &HashSet<u32>,
        profiling_duration: Duration,
    ) -> Result<ExecutorStats> {
        let dispatcher_fragment_ids = dispatcher_fragment_ids.iter().copied().collect::<Vec<_>>();
        let mut initial_aggregated_stats = ExecutorStats::new();
        for node in worker_nodes {
            let mut compute_client = handler_args.session.env().client_pool().get(node).await?;
            let stats = compute_client
                .monitor_client
                .get_profile_stats(GetProfileStatsRequest {
                    executor_ids: executor_ids.iter().copied().collect(),
                    dispatcher_fragment_ids: dispatcher_fragment_ids.clone(),
                })
                .await
                .expect("get profiling stats failed");
            initial_aggregated_stats.record(
                executor_ids,
                &dispatcher_fragment_ids,
                &stats.into_inner(),
            );
        }
        tracing::debug!(?initial_aggregated_stats, "initial aggregated stats");

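        // The counters are cumulative, so wait out the profiling window and take a second
        // snapshot; the difference between the two gives totals over the window.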
        sleep(profiling_duration).await;

        let mut final_aggregated_stats = ExecutorStats::new();
        for node in worker_nodes {
            let mut compute_client = handler_args.session.env().client_pool().get(node).await?;
            let stats = compute_client
                .monitor_client
                .get_profile_stats(GetProfileStatsRequest {
                    executor_ids: executor_ids.iter().copied().collect(),
                    dispatcher_fragment_ids: dispatcher_fragment_ids.clone(),
                })
                .await
                .expect("get profiling stats failed");
            final_aggregated_stats.record(
                executor_ids,
                &dispatcher_fragment_ids,
                &stats.into_inner(),
            );
        }
        tracing::debug!(?final_aggregated_stats, "final aggregated stats");

        let delta_aggregated_stats = ExecutorStats::get_delta(
            &initial_aggregated_stats,
            &final_aggregated_stats,
            executor_ids,
            &dispatcher_fragment_ids,
        );

        Ok(delta_aggregated_stats)
    }
}

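/// Metric bookkeeping: raw per-executor/per-fragment samples, deltas over the profiling
/// window, and aggregation into per-operator stats.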
mod metrics {
    use std::collections::{HashMap, HashSet};

    use risingwave_common::operator::unique_executor_id_into_parts;
    use risingwave_pb::monitor_service::GetProfileStatsResponse;

    use crate::catalog::FragmentId;
    use crate::handler::explain_analyze_stream_job::graph::{ExecutorId, OperatorId};
    use crate::handler::explain_analyze_stream_job::utils::operator_id_for_dispatch;

    #[expect(dead_code)]
    #[derive(Default, Debug)]
    pub(super) struct ExecutorMetrics {
        pub executor_id: ExecutorId,
        pub epoch: u32,
        pub total_output_throughput: u64,
        pub total_output_pending_ns: u64,
    }

    #[derive(Default, Debug)]
    pub(super) struct DispatchMetrics {
        pub fragment_id: FragmentId,
        pub epoch: u32,
        pub total_output_throughput: u64,
        pub total_output_pending_ns: u64,
    }

    #[derive(Debug)]
    pub(super) struct ExecutorStats {
        executor_stats: HashMap<ExecutorId, ExecutorMetrics>,
        dispatch_stats: HashMap<FragmentId, DispatchMetrics>,
    }

    impl ExecutorStats {
        pub(super) fn new() -> Self {
            ExecutorStats {
                executor_stats: HashMap::new(),
                dispatch_stats: HashMap::new(),
            }
        }

        pub fn get(&self, executor_id: &ExecutorId) -> Option<&ExecutorMetrics> {
            self.executor_stats.get(executor_id)
        }

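        /// Records one worker's snapshot. Executors absent from the response are skipped;
        /// dispatch counters are summed per fragment across workers.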
        pub(super) fn record<'a>(
            &mut self,
            executor_ids: &'a HashSet<ExecutorId>,
            dispatch_fragment_ids: &'a [FragmentId],
            metrics: &'a GetProfileStatsResponse,
        ) {
            for executor_id in executor_ids {
                let Some(total_output_throughput) =
                    metrics.stream_node_output_row_count.get(executor_id)
                else {
                    continue;
                };
                let Some(total_output_pending_ns) = metrics
                    .stream_node_output_blocking_duration_ns
                    .get(executor_id)
                else {
                    continue;
                };
                let stats = ExecutorMetrics {
                    executor_id: *executor_id,
                    epoch: 0,
                    total_output_throughput: *total_output_throughput,
                    total_output_pending_ns: *total_output_pending_ns,
                };
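                // In simulation, duplicate reports for the same executor are tolerated;
                // in real deployments each executor must be recorded exactly once.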
                if cfg!(madsim) {
                    self.executor_stats.insert(*executor_id, stats);
                } else {
                    assert!(self.executor_stats.insert(*executor_id, stats).is_none());
                }
            }

            for fragment_id in dispatch_fragment_ids {
                let Some(total_output_throughput) =
                    metrics.dispatch_fragment_output_row_count.get(fragment_id)
                else {
                    continue;
                };
                let Some(total_output_pending_ns) = metrics
                    .dispatch_fragment_output_blocking_duration_ns
                    .get(fragment_id)
                else {
                    continue;
                };
                let stats = self.dispatch_stats.entry(*fragment_id).or_default();
                stats.fragment_id = *fragment_id;
                stats.epoch = 0;
                stats.total_output_throughput += *total_output_throughput;
                stats.total_output_pending_ns += *total_output_pending_ns;
            }
        }

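        /// Computes `end - initial` for every tracked counter. Missing entries and
        /// non-monotonic counters are skipped (panicking in debug builds via
        /// `debug_panic_or_warn!`) rather than poisoning the whole report.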
        pub(super) fn get_delta(
            initial: &Self,
            end: &Self,
            executor_ids: &HashSet<ExecutorId>,
            dispatch_fragment_ids: &[FragmentId],
        ) -> Self {
            let mut delta_aggregated_stats = Self::new();
            for executor_id in executor_ids {
                let (actor_id, operator_id) = unique_executor_id_into_parts(*executor_id);
                let Some(initial_stats) = initial.executor_stats.get(executor_id) else {
                    debug_panic_or_warn!(
                        "missing initial stats for executor {} (actor {} operator {})",
                        executor_id,
                        actor_id,
                        operator_id
                    );
                    continue;
                };
                let Some(end_stats) = end.executor_stats.get(executor_id) else {
                    debug_panic_or_warn!(
                        "missing final stats for executor {} (actor {} operator {})",
                        executor_id,
                        actor_id,
                        operator_id
                    );
                    continue;
                };

                let initial_throughput = initial_stats.total_output_throughput;
                let end_throughput = end_stats.total_output_throughput;
                let Some(delta_throughput) = end_throughput.checked_sub(initial_throughput) else {
                    debug_panic_or_warn!(
                        "delta throughput is negative for actor {} operator {} (initial: {}, end: {})",
                        actor_id,
                        operator_id,
                        initial_throughput,
                        end_throughput
                    );
                    continue;
                };

                let initial_pending_ns = initial_stats.total_output_pending_ns;
                let end_pending_ns = end_stats.total_output_pending_ns;
                let Some(delta_pending_ns) = end_pending_ns.checked_sub(initial_pending_ns) else {
                    debug_panic_or_warn!(
                        "delta pending ns is negative for actor {} operator {} (initial: {}, end: {})",
                        actor_id,
                        operator_id,
                        initial_pending_ns,
                        end_pending_ns
                    );
                    continue;
                };

                let delta_stats = ExecutorMetrics {
                    executor_id: *executor_id,
                    epoch: 0,
                    total_output_throughput: delta_throughput,
                    total_output_pending_ns: delta_pending_ns,
                };
                delta_aggregated_stats
                    .executor_stats
                    .insert(*executor_id, delta_stats);
            }

            for fragment_id in dispatch_fragment_ids {
                let Some(initial_stats) = initial.dispatch_stats.get(fragment_id) else {
                    debug_panic_or_warn!("missing initial stats for fragment {}", fragment_id);
                    continue;
                };
                let Some(end_stats) = end.dispatch_stats.get(fragment_id) else {
                    debug_panic_or_warn!("missing final stats for fragment {}", fragment_id);
                    continue;
                };

                let initial_throughput = initial_stats.total_output_throughput;
                let end_throughput = end_stats.total_output_throughput;
                let Some(delta_throughput) = end_throughput.checked_sub(initial_throughput) else {
                    debug_panic_or_warn!(
                        "delta throughput is negative for fragment {} (initial: {}, end: {})",
                        fragment_id,
                        initial_throughput,
                        end_throughput
                    );
                    continue;
                };

                let initial_pending_ns = initial_stats.total_output_pending_ns;
                let end_pending_ns = end_stats.total_output_pending_ns;
                let Some(delta_pending_ns) = end_pending_ns.checked_sub(initial_pending_ns) else {
                    debug_panic_or_warn!(
                        "delta pending ns is negative for fragment {} (initial: {}, end: {})",
                        fragment_id,
                        initial_pending_ns,
                        end_pending_ns
                    );
                    continue;
                };

                let delta_stats = DispatchMetrics {
                    fragment_id: *fragment_id,
                    epoch: 0,
                    total_output_throughput: delta_throughput,
                    total_output_pending_ns: delta_pending_ns,
                };
                delta_aggregated_stats
                    .dispatch_stats
                    .insert(*fragment_id, delta_stats);
            }

            delta_aggregated_stats
        }
    }

    #[expect(dead_code)]
    #[derive(Debug)]
    pub(super) struct OperatorMetrics {
        pub operator_id: OperatorId,
        pub epoch: u32,
        pub total_output_throughput: u64,
        pub total_output_pending_ns: u64,
    }

    #[derive(Debug)]
    pub(super) struct OperatorStats {
        inner: HashMap<OperatorId, OperatorMetrics>,
    }

    impl OperatorStats {
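        /// Sums each operator's throughput across its executors and averages the blocking
        /// time over them; an operator is dropped entirely if any of its executors lacks
        /// stats. Synthetic dispatch operators instead average over fragment parallelism.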
        pub(super) fn aggregate(
            operator_map: HashMap<OperatorId, HashSet<ExecutorId>>,
            executor_stats: &ExecutorStats,
            fragment_parallelisms: &HashMap<FragmentId, usize>,
        ) -> Self {
            let mut operator_stats = HashMap::new();
            'operator_loop: for (operator_id, executor_ids) in operator_map {
                let num_executors = executor_ids.len() as u64;
                let mut total_output_throughput = 0;
                let mut total_output_pending_ns = 0;
                for executor_id in executor_ids {
                    if let Some(stats) = executor_stats.get(&executor_id) {
                        total_output_throughput += stats.total_output_throughput;
                        total_output_pending_ns += stats.total_output_pending_ns;
                    } else {
                        continue 'operator_loop;
                    }
                }
                let total_output_pending_ns = total_output_pending_ns / num_executors;

                operator_stats.insert(
                    operator_id,
                    OperatorMetrics {
                        operator_id,
                        epoch: 0,
                        total_output_throughput,
                        total_output_pending_ns,
                    },
                );
            }

            for (fragment_id, dispatch_metrics) in &executor_stats.dispatch_stats {
                let operator_id = operator_id_for_dispatch(*fragment_id);
                let total_output_throughput = dispatch_metrics.total_output_throughput;
                let Some(fragment_parallelism) = fragment_parallelisms.get(fragment_id) else {
                    debug_panic_or_warn!(
                        "missing fragment parallelism for fragment {}",
                        fragment_id
                    );
                    continue;
                };
                let total_output_pending_ns =
                    dispatch_metrics.total_output_pending_ns / *fragment_parallelism as u64;

                operator_stats.insert(
                    operator_id,
                    OperatorMetrics {
                        operator_id,
                        epoch: 0,
                        total_output_throughput,
                        total_output_pending_ns,
                    },
                );
            }

            OperatorStats {
                inner: operator_stats,
            }
        }

        pub fn get(&self, operator_id: &OperatorId) -> Option<&OperatorMetrics> {
            self.inner.get(operator_id)
        }
    }
}

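/// Graph construction and rendering: rebuilds the stream-node tree from fragment infos,
/// stitches fragments together through synthetic `Exchange` (dispatcher) nodes, and renders
/// the annotated tree.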
mod graph {
    use std::collections::{HashMap, HashSet};
    use std::fmt::Debug;
    use std::time::Duration;

    use itertools::Itertools;
    use risingwave_common::operator::{
        unique_executor_id_from_unique_operator_id, unique_operator_id,
        unique_operator_id_into_parts,
    };
    use risingwave_pb::meta::list_table_fragments_response::FragmentInfo;
    use risingwave_pb::stream_plan::stream_node::{NodeBody, NodeBodyDiscriminants};
    use risingwave_pb::stream_plan::{MergeNode, StreamNode as PbStreamNode};

    use crate::catalog::FragmentId;
    use crate::handler::explain_analyze_stream_job::ExplainAnalyzeStreamJobOutput;
    use crate::handler::explain_analyze_stream_job::metrics::OperatorStats;
    use crate::handler::explain_analyze_stream_job::utils::operator_id_for_dispatch;

    pub(super) type OperatorId = u64;
    pub(super) type ExecutorId = u64;

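    /// A node of the rendered graph: one operator of the job, with the actors that run it
    /// and the operator ids it consumes.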
    pub(super) struct StreamNode {
        operator_id: OperatorId,
        fragment_id: u32,
        identity: NodeBodyDiscriminants,
        actor_ids: HashSet<u32>,
        dependencies: Vec<u64>,
    }

    impl Debug for StreamNode {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            let (fragment_id, operator_id) = unique_operator_id_into_parts(self.operator_id);
            let operator_id_str = format!(
                "{}: (fragment_id: {}, operator_id: {})",
                self.operator_id, fragment_id, operator_id
            );
            write!(
                f,
                "StreamNode {{ operator_id: {}, fragment_id: {}, identity: {:?}, actor_ids: {:?}, dependencies: {:?} }}",
                operator_id_str, self.fragment_id, self.identity, self.actor_ids, self.dependencies
            )
        }
    }

    impl StreamNode {
        fn new_for_dispatcher(fragment_id: u32) -> Self {
            StreamNode {
                operator_id: operator_id_for_dispatch(fragment_id),
                fragment_id,
                identity: NodeBodyDiscriminants::Exchange,
                actor_ids: Default::default(),
                dependencies: Default::default(),
            }
        }
    }

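    /// Walks every fragment's stream-node tree and returns the root operator id, the set of
    /// fragments of this job that dispatch to downstream `Merge` operators, and the
    /// operator-id-keyed adjacency list of the whole graph.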
    pub(super) fn extract_stream_node_infos(
        fragments: Vec<FragmentInfo>,
    ) -> (
        OperatorId,
        HashSet<FragmentId>,
        HashMap<OperatorId, StreamNode>,
    ) {
        let job_fragment_ids = fragments.iter().map(|f| f.id).collect::<HashSet<_>>();

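        // A root is any operator that no other operator lists as a dependency; this yields
        // each fragment's topmost node (and, ultimately, the job's root).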
        fn find_root_nodes(stream_nodes: &HashMap<u64, StreamNode>) -> HashSet<u64> {
            let mut all_nodes = stream_nodes.keys().copied().collect::<HashSet<_>>();
            for node in stream_nodes.values() {
                for dependency in &node.dependencies {
                    all_nodes.remove(dependency);
                }
            }
            all_nodes
        }

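        // Recursively registers `node` and its inputs into the adjacency list, and records
        // which upstream fragment each `Merge` operator consumes.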
        fn extract_stream_node_info(
            fragment_id: u32,
            fragment_id_to_merge_operator_id: &mut HashMap<u32, OperatorId>,
            operator_id_to_stream_node: &mut HashMap<OperatorId, StreamNode>,
            node: &PbStreamNode,
            actor_ids: &HashSet<u32>,
        ) {
            let identity = node
                .node_body
                .as_ref()
                .expect("should have node body")
                .into();
            let operator_id = unique_operator_id(fragment_id, node.operator_id);
            if let Some(merge_node) = node.node_body.as_ref()
                && let NodeBody::Merge(box MergeNode {
                    upstream_fragment_id,
                    ..
                }) = merge_node
            {
                fragment_id_to_merge_operator_id.insert(*upstream_fragment_id, operator_id);
            }
            let dependencies = &node.input;
            let dependency_ids = dependencies
                .iter()
                .map(|input| unique_operator_id(fragment_id, input.operator_id))
                .collect::<Vec<_>>();
            operator_id_to_stream_node.insert(
                operator_id,
                StreamNode {
                    operator_id,
                    fragment_id,
                    identity,
                    actor_ids: actor_ids.clone(),
                    dependencies: dependency_ids,
                },
            );
            for dependency in dependencies {
                extract_stream_node_info(
                    fragment_id,
                    fragment_id_to_merge_operator_id,
                    operator_id_to_stream_node,
                    dependency,
                    actor_ids,
                );
            }
        }

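        // All actors of a fragment share the same plan, so actor 0's node tree is
        // representative; its operators are attributed to every actor of the fragment.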
        let mut operator_id_to_stream_node = HashMap::new();
        let mut fragment_id_to_merge_operator_id = HashMap::new();
        for fragment in fragments {
            let actors = fragment.actors;
            assert!(
                !actors.is_empty(),
                "fragment {} should have at least one actor",
                fragment.id
            );
            let actor_ids = actors.iter().map(|actor| actor.id).collect::<HashSet<_>>();
            let node = actors[0].node.as_ref().expect("should have stream node");
            extract_stream_node_info(
                fragment.id,
                &mut fragment_id_to_merge_operator_id,
                &mut operator_id_to_stream_node,
                node,
                &actor_ids,
            );
        }

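        // Each fragment's topmost node is either the job root or a node that feeds a
        // downstream `Merge`. For the latter, insert a synthetic dispatcher (`Exchange`)
        // node between the fragment root and its consuming `Merge` operator.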
        let root_or_dispatch_nodes = find_root_nodes(&operator_id_to_stream_node);
        let mut root_node = None;
        for operator_id in root_or_dispatch_nodes {
            let node = operator_id_to_stream_node.get_mut(&operator_id).unwrap();
            let fragment_id = node.fragment_id;
            if let Some(merge_operator_id) = fragment_id_to_merge_operator_id.get(&fragment_id) {
                let mut dispatcher = StreamNode::new_for_dispatcher(fragment_id);
                let operator_id_for_dispatch = dispatcher.operator_id;
                dispatcher.dependencies.push(operator_id);
                assert!(
                    operator_id_to_stream_node
                        .insert(operator_id_for_dispatch as _, dispatcher)
                        .is_none()
                );
                operator_id_to_stream_node
                    .get_mut(merge_operator_id)
                    .unwrap()
                    .dependencies
                    .push(operator_id_for_dispatch as _)
            } else {
                root_node = Some(operator_id);
            }
        }

        let mut dispatcher_fragment_ids = HashSet::new();
        for dispatcher_fragment_id in fragment_id_to_merge_operator_id.keys() {
            if job_fragment_ids.contains(dispatcher_fragment_id) {
                dispatcher_fragment_ids.insert(*dispatcher_fragment_id);
            }
        }

        (
            root_node.unwrap(),
            dispatcher_fragment_ids,
            operator_id_to_stream_node,
        )
    }

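    /// Expands each operator into its per-actor executor ids, returning the set of executors
    /// expected to report stats and the operator-to-executors mapping used for aggregation.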
    pub(super) fn extract_executor_infos(
        adjacency_list: &HashMap<OperatorId, StreamNode>,
    ) -> (HashSet<u64>, HashMap<u64, HashSet<u64>>) {
        let mut executor_ids = HashSet::new();
        let mut operator_to_executor = HashMap::new();
        for (operator_id, node) in adjacency_list {
            assert_eq!(*operator_id, node.operator_id);
            let operator_id = node.operator_id;
            for actor_id in &node.actor_ids {
                let executor_id =
                    unique_executor_id_from_unique_operator_id(*actor_id, operator_id);
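                // BatchPlan, Merge, and Project nodes are not profiled as standalone
                // executors, so no stats are expected for them.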
                if node.identity != NodeBodyDiscriminants::BatchPlan
                    && node.identity != NodeBodyDiscriminants::Merge
                    && node.identity != NodeBodyDiscriminants::Project
                {
                    assert!(executor_ids.insert(executor_id));
                }
                assert!(
                    operator_to_executor
                        .entry(operator_id)
                        .or_insert_with(HashSet::new)
                        .insert(executor_id)
                );
            }
        }
        (executor_ids, operator_to_executor)
    }

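    /// Renders the graph as an `EXPLAIN`-style tree via iterative DFS from the root,
    /// annotating each node with its throughput and backpressure metrics.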
    pub(super) fn render_graph_with_metrics(
        adjacency_list: &HashMap<u64, StreamNode>,
        root_node: u64,
        stats: &OperatorStats,
        profiling_duration: &Duration,
    ) -> Vec<ExplainAnalyzeStreamJobOutput> {
        let profiling_duration_secs = profiling_duration.as_secs_f64();
        let mut rows = vec![];
        let mut stack = vec![(String::new(), true, root_node)];
        while let Some((prefix, last_child, node_id)) = stack.pop() {
            let Some(node) = adjacency_list.get(&node_id) else {
                continue;
            };
            let is_root = node_id == root_node;

            let identity_rendered = if is_root {
                node.identity.to_string()
            } else {
                let connector = if last_child { "└─ " } else { "├─ " };
                format!("{}{}{}", prefix, connector, node.identity)
            };

            let child_prefix = if is_root {
                ""
            } else if last_child {
                "   "
            } else {
                "│  "
            };
            let child_prefix = format!("{}{}", prefix, child_prefix);

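            // Throughput is averaged over the profiling window. Blocking time is averaged
            // over the window and over the node's actors, yielding a 0..1 backpressure ratio.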
            let stats = stats.get(&node_id);
            let (output_rows_per_second, downstream_backpressure_ratio) = match stats {
                Some(stats) => (
                    Some(
                        (stats.total_output_throughput as f64 / profiling_duration_secs)
                            .to_string(),
                    ),
                    Some(
                        (Duration::from_nanos(stats.total_output_pending_ns).as_secs_f64()
                            / usize::max(node.actor_ids.len(), 1) as f64
                            / profiling_duration_secs)
                            .to_string(),
                    ),
                ),
                None => (None, None),
            };
            let row = ExplainAnalyzeStreamJobOutput {
                identity: identity_rendered,
                actor_ids: node
                    .actor_ids
                    .iter()
                    .sorted()
                    .map(|id| id.to_string())
                    .collect::<Vec<_>>()
                    .join(","),
                output_rows_per_second,
                downstream_backpressure_ratio,
            };
            rows.push(row);
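            // Dependencies are pushed in order and popped in reverse, so the dependency at
            // position 0 is rendered last and gets the "└─" connector.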
            for (position, dependency) in node.dependencies.iter().enumerate() {
                stack.push((child_prefix.clone(), position == 0, *dependency));
            }
        }
        rows
    }
}

mod utils {
    use risingwave_common::operator::unique_operator_id;

    use crate::handler::explain_analyze_stream_job::graph::OperatorId;

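    /// Synthesizes an operator id for a fragment's dispatcher, using `u32::MAX` as the
    /// per-fragment operator index (a value assumed not to collide with real operator ids).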
    pub(super) fn operator_id_for_dispatch(fragment_id: u32) -> OperatorId {
        unique_operator_id(fragment_id, u32::MAX as u64)
    }
}