1use std::collections::HashMap;
16
17use pgwire::pg_response::StatementType;
18use risingwave_common::types::Fields;
19use risingwave_sqlparser::ast::AnalyzeTarget;
20use tokio::time::Duration;
21
22use crate::error::Result;
23use crate::handler::explain_analyze_stream_job::graph::{
24 extract_executor_infos, extract_stream_node_infos, render_graph_with_metrics,
25};
26use crate::handler::{HandlerArgs, RwPgResponse, RwPgResponseBuilder, RwPgResponseBuilderExt};
27
/// Reports an unexpected condition.
///
/// Panics with the given format arguments when either `debug_assertions` or `madsim` is
/// enabled; otherwise emits a `tracing::warn!` with the same arguments and lets execution
/// continue.
#[macro_export]
macro_rules! debug_panic_or_warn {
    ($($arg:tt)*) => {
        if cfg!(any(debug_assertions, madsim)) {
            panic!($($arg)*);
        } else {
            tracing::warn!($($arg)*);
        }
    };
}
38
39#[derive(Fields)]
40struct ExplainAnalyzeStreamJobOutput {
41 identity: String,
42 actor_ids: String,
43 output_rows_per_second: Option<String>,
44 downstream_backpressure_ratio: Option<String>,
45}
46
47pub async fn handle_explain_analyze_stream_job(
48 handler_args: HandlerArgs,
49 target: AnalyzeTarget,
50 duration_secs: Option<u64>,
51) -> Result<RwPgResponse> {
52 let profiling_duration = Duration::from_secs(duration_secs.unwrap_or(10));
53 let job_id = bind::bind_relation(&target, &handler_args)?;
54
55 let meta_client = handler_args.session.env().meta_client();
56 let fragments = net::get_fragments(meta_client, job_id).await?;
57 let fragment_parallelisms = fragments
58 .iter()
59 .map(|f| (f.id, f.actors.len()))
60 .collect::<HashMap<_, _>>();
61 let (root_node, dispatcher_fragment_ids, adjacency_list) = extract_stream_node_infos(fragments);
62 let (executor_ids, operator_to_executor) = extract_executor_infos(&adjacency_list);
63 tracing::debug!(
64 ?fragment_parallelisms,
65 ?root_node,
66 ?dispatcher_fragment_ids,
67 ?adjacency_list,
68 "explain analyze metadata"
69 );
70
71 let worker_nodes = net::list_stream_worker_nodes(handler_args.session.env()).await?;
72
73 let executor_stats = net::get_executor_stats(
74 &handler_args,
75 &worker_nodes,
76 &executor_ids,
77 &dispatcher_fragment_ids,
78 profiling_duration,
79 )
80 .await?;
81 tracing::debug!(?executor_stats, "collected executor stats");
82 let aggregated_stats = metrics::OperatorStats::aggregate(
83 operator_to_executor,
84 &executor_stats,
85 &fragment_parallelisms,
86 );
87 tracing::debug!(?aggregated_stats, "collected aggregated stats");
88
89 let rows = render_graph_with_metrics(
91 &adjacency_list,
92 root_node,
93 &aggregated_stats,
94 &profiling_duration,
95 );
96 let builder = RwPgResponseBuilder::empty(StatementType::EXPLAIN);
97 let builder = builder.rows(rows);
98 Ok(builder.into())
99}
100
101mod bind {
104 use risingwave_common::id::JobId;
105 use risingwave_sqlparser::ast::AnalyzeTarget;
106
107 use crate::Binder;
108 use crate::catalog::root_catalog::SchemaPath;
109 use crate::error::Result;
110 use crate::handler::HandlerArgs;
111
112 pub(super) fn bind_relation(
114 target_relation: &AnalyzeTarget,
115 handler_args: &HandlerArgs,
116 ) -> Result<JobId> {
117 let job_id = match &target_relation {
118 AnalyzeTarget::Id(id) => (*id).into(),
119 AnalyzeTarget::Index(name)
120 | AnalyzeTarget::Table(name)
121 | AnalyzeTarget::Sink(name)
122 | AnalyzeTarget::MaterializedView(name) => {
123 let session = &handler_args.session;
124 let db_name = session.database();
125 let (schema_name, name) = Binder::resolve_schema_qualified_name(&db_name, name)?;
126 let search_path = session.config().search_path();
127 let user_name = &session.user_name();
128 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
129
130 let catalog_reader = handler_args.session.env().catalog_reader();
131 let catalog = catalog_reader.read_guard();
132
133 match target_relation {
134 AnalyzeTarget::Index(_) => {
135 let (catalog, _schema_name) =
136 catalog.get_any_index_by_name(&db_name, schema_path, &name)?;
137 catalog.id.as_job_id()
138 }
139 AnalyzeTarget::Table(_) => {
140 let (catalog, _schema_name) =
141 catalog.get_any_table_by_name(&db_name, schema_path, &name)?;
142 catalog.id.as_job_id()
143 }
144 AnalyzeTarget::Sink(_) => {
145 let (catalog, _schema_name) =
146 catalog.get_any_sink_by_name(&db_name, schema_path, &name)?;
147 catalog.id.as_job_id()
148 }
149 AnalyzeTarget::MaterializedView(_) => {
150 let (catalog, _schema_name) =
151 catalog.get_any_table_by_name(&db_name, schema_path, &name)?;
152 catalog.id.as_job_id()
153 }
154 AnalyzeTarget::Id(_) => unreachable!(),
155 }
156 }
157 };
158 Ok(job_id)
159 }
160}
161
162mod net {
164 use std::collections::HashSet;
165
166 use risingwave_common::bail;
167 use risingwave_common::id::{FragmentId, JobId};
168 use risingwave_pb::common::WorkerNode;
169 use risingwave_pb::id::ExecutorId;
170 use risingwave_pb::meta::list_table_fragments_response::FragmentInfo;
171 use risingwave_pb::monitor_service::GetProfileStatsRequest;
172 use tokio::time::{Duration, sleep};
173
174 use crate::error::Result;
175 use crate::handler::HandlerArgs;
176 use crate::handler::explain_analyze_stream_job::metrics::ExecutorStats;
177 use crate::meta_client::FrontendMetaClient;
178 use crate::session::FrontendEnv;
179
180 pub(super) async fn list_stream_worker_nodes(env: &FrontendEnv) -> Result<Vec<WorkerNode>> {
181 let worker_nodes = env.meta_client().list_all_nodes().await?;
182 let stream_worker_nodes = worker_nodes
183 .into_iter()
184 .filter(|node| {
185 node.property
186 .as_ref()
187 .map(|p| p.is_streaming)
188 .unwrap_or_else(|| false)
189 })
190 .collect::<Vec<_>>();
191 Ok(stream_worker_nodes)
192 }
193
194 pub(super) async fn get_fragments(
196 meta_client: &dyn FrontendMetaClient,
197 job_id: JobId,
198 ) -> Result<Vec<FragmentInfo>> {
199 let mut fragment_map = meta_client.list_table_fragments(&[job_id]).await?;
200 let mut table_fragments = fragment_map.drain();
201 let Some((fragment_job_id, table_fragment_info)) = table_fragments.next() else {
202 bail!("table fragment of job {job_id} not found");
203 };
204 assert_eq!(
205 table_fragments.next(),
206 None,
207 "expected only at most one fragment"
208 );
209 assert_eq!(fragment_job_id, job_id);
210 Ok(table_fragment_info.fragments)
211 }
212
213 pub(super) async fn get_executor_stats(
214 handler_args: &HandlerArgs,
215 worker_nodes: &[WorkerNode],
216 executor_ids: &HashSet<ExecutorId>,
217 dispatcher_fragment_ids: &HashSet<FragmentId>,
218 profiling_duration: Duration,
219 ) -> Result<ExecutorStats> {
220 let dispatcher_fragment_ids = dispatcher_fragment_ids.iter().copied().collect::<Vec<_>>();
221 let mut initial_aggregated_stats = ExecutorStats::new();
222 for node in worker_nodes {
223 let mut compute_client = handler_args.session.env().client_pool().get(node).await?;
224 let stats = compute_client
225 .monitor_client
226 .get_profile_stats(GetProfileStatsRequest {
227 executor_ids: executor_ids.iter().copied().collect(),
228 dispatcher_fragment_ids: dispatcher_fragment_ids.clone(),
229 })
230 .await
231 .expect("get profiling stats failed");
232 initial_aggregated_stats.record(
233 executor_ids,
234 &dispatcher_fragment_ids,
235 &stats.into_inner(),
236 );
237 }
238 tracing::debug!(?initial_aggregated_stats, "initial aggregated stats");
239
240 sleep(profiling_duration).await;
241
242 let mut final_aggregated_stats = ExecutorStats::new();
243 for node in worker_nodes {
244 let mut compute_client = handler_args.session.env().client_pool().get(node).await?;
245 let stats = compute_client
246 .monitor_client
247 .get_profile_stats(GetProfileStatsRequest {
248 executor_ids: executor_ids.iter().copied().collect(),
249 dispatcher_fragment_ids: dispatcher_fragment_ids.clone(),
250 })
251 .await
252 .expect("get profiling stats failed");
253 final_aggregated_stats.record(
254 executor_ids,
255 &dispatcher_fragment_ids,
256 &stats.into_inner(),
257 );
258 }
259 tracing::debug!(?final_aggregated_stats, "final aggregated stats");
260
261 let delta_aggregated_stats = ExecutorStats::get_delta(
262 &initial_aggregated_stats,
263 &final_aggregated_stats,
264 executor_ids,
265 &dispatcher_fragment_ids,
266 );
267
268 Ok(delta_aggregated_stats)
269 }
270}
271
272mod metrics {
277 use std::collections::{HashMap, HashSet};
278
279 use risingwave_common::operator::unique_executor_id_into_parts;
280 use risingwave_pb::id::{ExecutorId, GlobalOperatorId};
281 use risingwave_pb::monitor_service::GetProfileStatsResponse;
282
283 use crate::catalog::FragmentId;
284 use crate::handler::explain_analyze_stream_job::utils::operator_id_for_dispatch;
285
286 type OperatorId = GlobalOperatorId;
287
288 #[expect(dead_code)]
289 #[derive(Default, Debug)]
290 pub(super) struct ExecutorMetrics {
291 pub executor_id: ExecutorId,
292 pub epoch: u32,
293 pub total_output_throughput: u64,
294 pub total_output_pending_ns: u64,
295 }
296
297 #[derive(Default, Debug)]
298 pub(super) struct DispatchMetrics {
299 pub fragment_id: FragmentId,
300 pub epoch: u32,
301 pub total_output_throughput: u64,
302 pub total_output_pending_ns: u64,
303 }
304
305 #[derive(Debug)]
306 pub(super) struct ExecutorStats {
307 executor_stats: HashMap<ExecutorId, ExecutorMetrics>,
308 dispatch_stats: HashMap<FragmentId, DispatchMetrics>,
309 }
310
311 impl ExecutorStats {
312 pub(super) fn new() -> Self {
313 ExecutorStats {
314 executor_stats: HashMap::new(),
315 dispatch_stats: HashMap::new(),
316 }
317 }
318
319 pub fn get(&self, executor_id: &ExecutorId) -> Option<&ExecutorMetrics> {
320 self.executor_stats.get(executor_id)
321 }
322
323 pub(super) fn record<'a>(
325 &mut self,
326 executor_ids: &'a HashSet<ExecutorId>,
327 dispatch_fragment_ids: &'a [FragmentId],
328 metrics: &'a GetProfileStatsResponse,
329 ) {
330 for executor_id in executor_ids {
331 let Some(total_output_throughput) =
332 metrics.stream_node_output_row_count.get(executor_id)
333 else {
334 continue;
335 };
336 let Some(total_output_pending_ns) = metrics
337 .stream_node_output_blocking_duration_ns
338 .get(executor_id)
339 else {
340 continue;
341 };
342 let stats = ExecutorMetrics {
343 executor_id: *executor_id,
344 epoch: 0,
345 total_output_throughput: *total_output_throughput,
346 total_output_pending_ns: *total_output_pending_ns,
347 };
348 if cfg!(madsim) {
351 self.executor_stats.insert(*executor_id, stats);
356 } else {
357 assert!(self.executor_stats.insert(*executor_id, stats).is_none());
358 }
359 }
360
361 for fragment_id in dispatch_fragment_ids {
362 let Some(total_output_throughput) =
363 metrics.dispatch_fragment_output_row_count.get(fragment_id)
364 else {
365 continue;
366 };
367 let Some(total_output_pending_ns) = metrics
368 .dispatch_fragment_output_blocking_duration_ns
369 .get(fragment_id)
370 else {
371 continue;
372 };
373 let stats = self.dispatch_stats.entry(*fragment_id).or_default();
374 stats.fragment_id = *fragment_id;
375 stats.epoch = 0;
376 stats.total_output_throughput += *total_output_throughput;
380 stats.total_output_pending_ns += *total_output_pending_ns;
381 }
382 }
383
384 pub(super) fn get_delta(
385 initial: &Self,
386 end: &Self,
387 executor_ids: &HashSet<ExecutorId>,
388 dispatch_fragment_ids: &[FragmentId],
389 ) -> Self {
390 let mut delta_aggregated_stats = Self::new();
391 for executor_id in executor_ids {
392 let (actor_id, operator_id) = unique_executor_id_into_parts(*executor_id);
393 let Some(initial_stats) = initial.executor_stats.get(executor_id) else {
394 debug_panic_or_warn!(
395 "missing initial stats for executor {} (actor {} operator {})",
396 executor_id,
397 actor_id,
398 operator_id
399 );
400 continue;
401 };
402 let Some(end_stats) = end.executor_stats.get(executor_id) else {
403 debug_panic_or_warn!(
404 "missing final stats for executor {} (actor {} operator {})",
405 executor_id,
406 actor_id,
407 operator_id
408 );
409 continue;
410 };
411
412 let initial_throughput = initial_stats.total_output_throughput;
413 let end_throughput = end_stats.total_output_throughput;
414 let Some(delta_throughput) = end_throughput.checked_sub(initial_throughput) else {
415 debug_panic_or_warn!(
416 "delta throughput is negative for actor {} operator {} (initial: {}, end: {})",
417 actor_id,
418 operator_id,
419 initial_throughput,
420 end_throughput
421 );
422 continue;
423 };
424
425 let initial_pending_ns = initial_stats.total_output_pending_ns;
426 let end_pending_ns = end_stats.total_output_pending_ns;
427 let Some(delta_pending_ns) = end_pending_ns.checked_sub(initial_pending_ns) else {
428 debug_panic_or_warn!(
429 "delta pending ns is negative for actor {} operator {} (initial: {}, end: {})",
430 actor_id,
431 operator_id,
432 initial_pending_ns,
433 end_pending_ns
434 );
435 continue;
436 };
437
438 let delta_stats = ExecutorMetrics {
439 executor_id: *executor_id,
440 epoch: 0,
441 total_output_throughput: delta_throughput,
442 total_output_pending_ns: delta_pending_ns,
443 };
444 delta_aggregated_stats
445 .executor_stats
446 .insert(*executor_id, delta_stats);
447 }
448
449 for fragment_id in dispatch_fragment_ids {
450 let Some(initial_stats) = initial.dispatch_stats.get(fragment_id) else {
451 debug_panic_or_warn!("missing initial stats for fragment {}", fragment_id);
452 continue;
453 };
454 let Some(end_stats) = end.dispatch_stats.get(fragment_id) else {
455 debug_panic_or_warn!("missing final stats for fragment {}", fragment_id);
456 continue;
457 };
458
459 let initial_throughput = initial_stats.total_output_throughput;
460 let end_throughput = end_stats.total_output_throughput;
461 let Some(delta_throughput) = end_throughput.checked_sub(initial_throughput) else {
462 debug_panic_or_warn!(
463 "delta throughput is negative for fragment {} (initial: {}, end: {})",
464 fragment_id,
465 initial_throughput,
466 end_throughput
467 );
468 continue;
469 };
470
471 let initial_pending_ns = initial_stats.total_output_pending_ns;
472 let end_pending_ns = end_stats.total_output_pending_ns;
473 let Some(delta_pending_ns) = end_pending_ns.checked_sub(initial_pending_ns) else {
474 debug_panic_or_warn!(
475 "delta pending ns is negative for fragment {} (initial: {}, end: {})",
476 fragment_id,
477 initial_pending_ns,
478 end_pending_ns
479 );
480 continue;
481 };
482
483 let delta_stats = DispatchMetrics {
484 fragment_id: *fragment_id,
485 epoch: 0,
486 total_output_throughput: delta_throughput,
487 total_output_pending_ns: delta_pending_ns,
488 };
489 delta_aggregated_stats
490 .dispatch_stats
491 .insert(*fragment_id, delta_stats);
492 }
493
494 delta_aggregated_stats
495 }
496 }
497
498 #[expect(dead_code)]
499 #[derive(Debug)]
500 pub(super) struct OperatorMetrics {
501 pub operator_id: GlobalOperatorId,
502 pub epoch: u32,
503 pub total_output_throughput: u64,
504 pub total_output_pending_ns: u64,
505 }
506
507 #[derive(Debug)]
508 pub(super) struct OperatorStats {
509 inner: HashMap<GlobalOperatorId, OperatorMetrics>,
510 }
511
512 impl OperatorStats {
513 pub(super) fn aggregate(
515 operator_map: HashMap<OperatorId, HashSet<ExecutorId>>,
516 executor_stats: &ExecutorStats,
517 fragment_parallelisms: &HashMap<FragmentId, usize>,
518 ) -> Self {
519 let mut operator_stats = HashMap::new();
520 'operator_loop: for (operator_id, executor_ids) in operator_map {
521 let num_executors = executor_ids.len() as u64;
522 let mut total_output_throughput = 0;
523 let mut total_output_pending_ns = 0;
524 for executor_id in executor_ids {
525 if let Some(stats) = executor_stats.get(&executor_id) {
526 total_output_throughput += stats.total_output_throughput;
527 total_output_pending_ns += stats.total_output_pending_ns;
528 } else {
529 continue 'operator_loop;
531 }
532 }
533 let total_output_throughput = total_output_throughput;
534 let total_output_pending_ns = total_output_pending_ns / num_executors;
535
536 operator_stats.insert(
537 operator_id,
538 OperatorMetrics {
539 operator_id,
540 epoch: 0,
541 total_output_throughput,
542 total_output_pending_ns,
543 },
544 );
545 }
546
547 for (fragment_id, dispatch_metrics) in &executor_stats.dispatch_stats {
548 let operator_id = operator_id_for_dispatch(*fragment_id);
549 let total_output_throughput = dispatch_metrics.total_output_throughput;
550 let Some(fragment_parallelism) = fragment_parallelisms.get(fragment_id) else {
551 debug_panic_or_warn!(
552 "missing fragment parallelism for fragment {}",
553 fragment_id
554 );
555 continue;
556 };
557 let total_output_pending_ns =
558 dispatch_metrics.total_output_pending_ns / *fragment_parallelism as u64;
559
560 operator_stats.insert(
561 operator_id,
562 OperatorMetrics {
563 operator_id,
564 epoch: 0,
565 total_output_throughput,
566 total_output_pending_ns,
567 },
568 );
569 }
570
571 OperatorStats {
572 inner: operator_stats,
573 }
574 }
575
576 pub fn get(&self, operator_id: &OperatorId) -> Option<&OperatorMetrics> {
577 self.inner.get(operator_id)
578 }
579 }
580}
581
582mod graph {
585 use std::collections::{HashMap, HashSet};
586 use std::fmt::Debug;
587 use std::time::Duration;
588
589 use itertools::Itertools;
590 use risingwave_common::operator::{
591 unique_executor_id_from_unique_operator_id, unique_operator_id,
592 unique_operator_id_into_parts,
593 };
594 use risingwave_pb::id::{ActorId, GlobalOperatorId};
595 use risingwave_pb::meta::list_table_fragments_response::FragmentInfo;
596 use risingwave_pb::stream_plan::stream_node::{NodeBody, NodeBodyDiscriminants};
597 use risingwave_pb::stream_plan::{MergeNode, StreamNode as PbStreamNode};
598
599 use crate::catalog::FragmentId;
600 use crate::handler::explain_analyze_stream_job::ExplainAnalyzeStreamJobOutput;
601 use crate::handler::explain_analyze_stream_job::metrics::OperatorStats;
602 use crate::handler::explain_analyze_stream_job::utils::operator_id_for_dispatch;
603
604 pub(super) type OperatorId = GlobalOperatorId;
605 pub(super) use risingwave_pb::id::ExecutorId;
606
607 pub(super) struct StreamNode {
609 operator_id: OperatorId,
610 fragment_id: FragmentId,
611 identity: NodeBodyDiscriminants,
612 actor_ids: HashSet<ActorId>,
613 dependencies: Vec<OperatorId>,
614 }
615
616 impl Debug for StreamNode {
617 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
618 let (actor_id, operator_id) = unique_operator_id_into_parts(self.operator_id);
619 let operator_id_str = format!(
620 "{}: (actor_id: {}, operator_id: {})",
621 self.operator_id, actor_id, operator_id
622 );
623 write!(
624 f,
625 "StreamNode {{ operator_id: {}, fragment_id: {}, identity: {:?}, actor_ids: {:?}, dependencies: {:?} }}",
626 operator_id_str, self.fragment_id, self.identity, self.actor_ids, self.dependencies
627 )
628 }
629 }
630
631 impl StreamNode {
632 fn new_for_dispatcher(fragment_id: FragmentId) -> Self {
633 StreamNode {
634 operator_id: operator_id_for_dispatch(fragment_id),
635 fragment_id,
636 identity: NodeBodyDiscriminants::Exchange,
637 actor_ids: Default::default(),
638 dependencies: Default::default(),
639 }
640 }
641 }
642
643 pub(super) fn extract_stream_node_infos(
645 fragments: Vec<FragmentInfo>,
646 ) -> (
647 OperatorId,
648 HashSet<FragmentId>,
649 HashMap<OperatorId, StreamNode>,
650 ) {
651 let job_fragment_ids = fragments
652 .iter()
653 .map(|f| f.id)
654 .collect::<HashSet<FragmentId>>();
655
656 fn find_root_nodes(stream_nodes: &HashMap<OperatorId, StreamNode>) -> HashSet<OperatorId> {
658 let mut all_nodes = stream_nodes.keys().copied().collect::<HashSet<_>>();
659 for node in stream_nodes.values() {
660 for dependency in &node.dependencies {
661 all_nodes.remove(dependency);
662 }
663 }
664 all_nodes
665 }
666
667 fn extract_stream_node_info(
670 fragment_id: FragmentId,
671 fragment_id_to_merge_operator_id: &mut HashMap<FragmentId, OperatorId>,
672 operator_id_to_stream_node: &mut HashMap<OperatorId, StreamNode>,
673 node: &PbStreamNode,
674 actor_ids: &HashSet<ActorId>,
675 ) {
676 let identity = node
677 .node_body
678 .as_ref()
679 .expect("should have node body")
680 .into();
681 let operator_id = unique_operator_id(fragment_id, node.operator_id);
682 if let Some(merge_node) = node.node_body.as_ref()
683 && let NodeBody::Merge(box MergeNode {
684 upstream_fragment_id,
685 ..
686 }) = merge_node
687 {
688 fragment_id_to_merge_operator_id.insert(*upstream_fragment_id, operator_id);
689 }
690 let dependencies = &node.input;
691 let dependency_ids = dependencies
692 .iter()
693 .map(|input| unique_operator_id(fragment_id, input.operator_id))
694 .collect::<Vec<_>>();
695 operator_id_to_stream_node.insert(
696 operator_id,
697 StreamNode {
698 operator_id,
699 fragment_id,
700 identity,
701 actor_ids: actor_ids.clone(),
702 dependencies: dependency_ids,
703 },
704 );
705 for dependency in dependencies {
706 extract_stream_node_info(
707 fragment_id,
708 fragment_id_to_merge_operator_id,
709 operator_id_to_stream_node,
710 dependency,
711 actor_ids,
712 );
713 }
714 }
715
716 let mut operator_id_to_stream_node = HashMap::new();
719 let mut fragment_id_to_merge_operator_id = HashMap::new();
720 for fragment in fragments {
721 let actors = fragment.actors;
722 assert!(
723 !actors.is_empty(),
724 "fragment {} should have at least one actor",
725 fragment.id
726 );
727 let actor_ids = actors.iter().map(|actor| actor.id).collect::<HashSet<_>>();
728 let node = actors[0].node.as_ref().expect("should have stream node");
729 extract_stream_node_info(
730 fragment.id,
731 &mut fragment_id_to_merge_operator_id,
732 &mut operator_id_to_stream_node,
733 node,
734 &actor_ids,
735 );
736 }
737
738 let root_or_dispatch_nodes = find_root_nodes(&operator_id_to_stream_node);
740 let mut root_node = None;
741 for operator_id in root_or_dispatch_nodes {
742 let node = operator_id_to_stream_node.get_mut(&operator_id).unwrap();
743 let fragment_id = node.fragment_id;
744 if let Some(merge_operator_id) = fragment_id_to_merge_operator_id.get(&fragment_id) {
745 let mut dispatcher = StreamNode::new_for_dispatcher(fragment_id);
746 let operator_id_for_dispatch = dispatcher.operator_id;
747 dispatcher.dependencies.push(operator_id);
748 assert!(
749 operator_id_to_stream_node
750 .insert(operator_id_for_dispatch as _, dispatcher)
751 .is_none()
752 );
753 operator_id_to_stream_node
754 .get_mut(merge_operator_id)
755 .unwrap()
756 .dependencies
757 .push(operator_id_for_dispatch as _)
758 } else {
759 root_node = Some(operator_id);
760 }
761 }
762
763 let mut dispatcher_fragment_ids = HashSet::new();
764 for dispatcher_fragment_id in fragment_id_to_merge_operator_id.keys() {
765 if job_fragment_ids.contains(dispatcher_fragment_id) {
766 dispatcher_fragment_ids.insert(*dispatcher_fragment_id);
767 }
768 }
769
770 (
771 root_node.unwrap(),
772 dispatcher_fragment_ids,
773 operator_id_to_stream_node,
774 )
775 }
776
777 pub(super) fn extract_executor_infos(
778 adjacency_list: &HashMap<OperatorId, StreamNode>,
779 ) -> (
780 HashSet<ExecutorId>,
781 HashMap<OperatorId, HashSet<ExecutorId>>,
782 ) {
783 let mut executor_ids = HashSet::new();
784 let mut operator_to_executor = HashMap::new();
785 for (operator_id, node) in adjacency_list {
786 assert_eq!(*operator_id, node.operator_id);
787 let operator_id = node.operator_id;
788 for actor_id in &node.actor_ids {
789 let executor_id =
790 unique_executor_id_from_unique_operator_id(*actor_id, operator_id);
791 if node.identity != NodeBodyDiscriminants::BatchPlan
792 && node.identity != NodeBodyDiscriminants::Merge
794 && node.identity != NodeBodyDiscriminants::Project
795 {
796 assert!(executor_ids.insert(executor_id));
797 }
798 assert!(
799 operator_to_executor
800 .entry(operator_id)
801 .or_insert_with(HashSet::new)
802 .insert(executor_id)
803 );
804 }
805 }
806 (executor_ids, operator_to_executor)
807 }
808
809 pub(super) fn render_graph_with_metrics(
814 adjacency_list: &HashMap<OperatorId, StreamNode>,
815 root_node: OperatorId,
816 stats: &OperatorStats,
817 profiling_duration: &Duration,
818 ) -> Vec<ExplainAnalyzeStreamJobOutput> {
819 let profiling_duration_secs = profiling_duration.as_secs_f64();
820 let mut rows = vec![];
821 let mut stack = vec![(String::new(), true, root_node)];
822 while let Some((prefix, last_child, node_id)) = stack.pop() {
823 let Some(node) = adjacency_list.get(&node_id) else {
824 continue;
825 };
826 let is_root = node_id == root_node;
827
828 let identity_rendered = if is_root {
829 node.identity.to_string()
830 } else {
831 let connector = if last_child { "└─ " } else { "├─ " };
832 format!("{}{}{}", prefix, connector, node.identity)
833 };
834
835 let child_prefix = if is_root {
836 ""
837 } else if last_child {
838 " "
839 } else {
840 "│ "
841 };
842 let child_prefix = format!("{}{}", prefix, child_prefix);
843
844 let stats = stats.get(&node_id);
845 let (output_rows_per_second, downstream_backpressure_ratio) = match stats {
846 Some(stats) => (
847 Some(
848 (stats.total_output_throughput as f64 / profiling_duration_secs)
849 .to_string(),
850 ),
851 Some(
852 (Duration::from_nanos(stats.total_output_pending_ns).as_secs_f64()
853 / usize::max(node.actor_ids.len(), 1) as f64
854 / profiling_duration_secs)
855 .to_string(),
856 ),
857 ),
858 None => (None, None),
859 };
860 let row = ExplainAnalyzeStreamJobOutput {
861 identity: identity_rendered,
862 actor_ids: node
863 .actor_ids
864 .iter()
865 .sorted()
866 .map(|id| id.to_string())
867 .collect::<Vec<_>>()
868 .join(","),
869 output_rows_per_second,
870 downstream_backpressure_ratio,
871 };
872 rows.push(row);
873 for (position, dependency) in node.dependencies.iter().enumerate() {
874 stack.push((child_prefix.clone(), position == 0, *dependency));
875 }
876 }
877 rows
878 }
879}
880
881mod utils {
882 use risingwave_common::operator::unique_operator_id;
883 use risingwave_pb::id::GlobalOperatorId;
884
885 use crate::catalog::FragmentId;
886
887 pub(super) fn operator_id_for_dispatch(fragment_id: FragmentId) -> GlobalOperatorId {
888 unique_operator_id(fragment_id, u32::MAX)
889 }
890}