use std::collections::HashMap;

use pgwire::pg_response::StatementType;
use risingwave_common::types::Fields;
use risingwave_sqlparser::ast::AnalyzeTarget;
use tokio::time::Duration;

use crate::error::Result;
use crate::handler::explain_analyze_stream_job::graph::{
    extract_executor_infos, extract_stream_node_infos, render_graph_with_metrics,
};
use crate::handler::{HandlerArgs, RwPgResponse, RwPgResponseBuilder, RwPgResponseBuilderExt};

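/// Panic in debug or simulation (`madsim`) builds so that metric
/// inconsistencies surface early; in release builds, degrade to a `tracing`
/// warning so a transient gap in profiling stats cannot crash the frontend.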
#[macro_export]
macro_rules! debug_panic_or_warn {
    ($($arg:tt)*) => {
        if cfg!(debug_assertions) || cfg!(madsim) {
            panic!($($arg)*);
        } else {
            tracing::warn!($($arg)*);
        }
    };
}

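/// One output row per rendered plan node. The metric columns are `None` for
/// nodes whose executors were not profiled (see `extract_executor_infos`).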
#[derive(Fields)]
struct ExplainAnalyzeStreamJobOutput {
    identity: String,
    actor_ids: String,
    output_rows_per_second: Option<String>,
    downstream_backpressure_ratio: Option<String>,
}

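/// Entry point for `EXPLAIN ANALYZE` on a streaming job.
///
/// The flow is:
/// 1. Bind the target relation to its streaming job id.
/// 2. Fetch the job's fragments from meta and stitch their plans into a
///    single operator graph (`extract_stream_node_infos`).
/// 3. Snapshot profiling counters on all stream workers, sleep for the
///    profiling window (default 10s), snapshot again, and take the delta.
/// 4. Aggregate per-executor deltas into per-operator stats and render them
///    alongside the plan tree.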
pub async fn handle_explain_analyze_stream_job(
    handler_args: HandlerArgs,
    target: AnalyzeTarget,
    duration_secs: Option<u64>,
) -> Result<RwPgResponse> {
    let profiling_duration = Duration::from_secs(duration_secs.unwrap_or(10));
    let job_id = bind::bind_relation(&target, &handler_args)?;

    let meta_client = handler_args.session.env().meta_client();
    let fragments = net::get_fragments(meta_client, job_id).await?;
    let fragment_parallelisms = fragments
        .iter()
        .map(|f| (f.id, f.actors.len()))
        .collect::<HashMap<_, _>>();
    let (root_node, dispatcher_fragment_ids, adjacency_list) = extract_stream_node_infos(fragments);
    let (executor_ids, operator_to_executor) = extract_executor_infos(&adjacency_list);
    tracing::debug!(
        ?fragment_parallelisms,
        ?root_node,
        ?dispatcher_fragment_ids,
        ?adjacency_list,
        "explain analyze metadata"
    );

    let worker_nodes = net::list_stream_worker_nodes(handler_args.session.env()).await?;

    let executor_stats = net::get_executor_stats(
        &handler_args,
        &worker_nodes,
        &executor_ids,
        &dispatcher_fragment_ids,
        profiling_duration,
    )
    .await?;
    tracing::debug!(?executor_stats, "collected executor stats");
    let aggregated_stats = metrics::OperatorStats::aggregate(
        operator_to_executor,
        &executor_stats,
        &fragment_parallelisms,
    );
    tracing::debug!(?aggregated_stats, "collected aggregated stats");

    let rows = render_graph_with_metrics(
        &adjacency_list,
        root_node,
        &aggregated_stats,
        &profiling_duration,
    );
    let builder = RwPgResponseBuilder::empty(StatementType::EXPLAIN);
    let builder = builder.rows(rows);
    Ok(builder.into())
}

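/// Binding from the `ANALYZE` target (raw id or qualified relation name) to
/// the underlying streaming job id.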
mod bind {
    use risingwave_common::id::JobId;
    use risingwave_sqlparser::ast::AnalyzeTarget;

    use crate::Binder;
    use crate::catalog::root_catalog::SchemaPath;
    use crate::error::Result;
    use crate::handler::HandlerArgs;

    pub(super) fn bind_relation(
        target_relation: &AnalyzeTarget,
        handler_args: &HandlerArgs,
    ) -> Result<JobId> {
        let job_id = match &target_relation {
            AnalyzeTarget::Id(id) => (*id).into(),
            AnalyzeTarget::Index(name)
            | AnalyzeTarget::Table(name)
            | AnalyzeTarget::Sink(name)
            | AnalyzeTarget::MaterializedView(name) => {
                let session = &handler_args.session;
                let db_name = session.database();
                let (schema_name, name) = Binder::resolve_schema_qualified_name(&db_name, name)?;
                let search_path = session.config().search_path();
                let user_name = &session.user_name();
                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

                let catalog_reader = handler_args.session.env().catalog_reader();
                let catalog = catalog_reader.read_guard();

                match target_relation {
                    AnalyzeTarget::Index(_) => {
                        let (catalog, _schema_name) =
                            catalog.get_any_index_by_name(&db_name, schema_path, &name)?;
                        catalog.id.as_job_id()
                    }
                    AnalyzeTarget::Table(_) => {
                        let (catalog, _schema_name) =
                            catalog.get_any_table_by_name(&db_name, schema_path, &name)?;
                        catalog.id.as_job_id()
                    }
                    AnalyzeTarget::Sink(_) => {
                        let (catalog, _schema_name) =
                            catalog.get_any_sink_by_name(&db_name, schema_path, &name)?;
                        catalog.id.as_job_id()
                    }
                    // A materialized view is backed by a table catalog entry,
                    // so it goes through the same lookup as a table.
                    AnalyzeTarget::MaterializedView(_) => {
                        let (catalog, _schema_name) =
                            catalog.get_any_table_by_name(&db_name, schema_path, &name)?;
                        catalog.id.as_job_id()
                    }
                    AnalyzeTarget::Id(_) => unreachable!(),
                }
            }
        };
        Ok(job_id)
    }
}

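/// RPCs to the meta service and the stream workers: fragment topology lookup
/// and the two-phase sampling of profiling stats.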
mod net {
    use std::collections::HashSet;

    use risingwave_common::bail;
    use risingwave_common::id::{FragmentId, JobId};
    use risingwave_pb::common::WorkerNode;
    use risingwave_pb::id::ExecutorId;
    use risingwave_pb::meta::list_table_fragments_response::FragmentInfo;
    use risingwave_pb::monitor_service::GetProfileStatsRequest;
    use tokio::time::{Duration, sleep};

    use crate::error::Result;
    use crate::handler::HandlerArgs;
    use crate::handler::explain_analyze_stream_job::metrics::ExecutorStats;
    use crate::meta_client::FrontendMetaClient;
    use crate::session::FrontendEnv;

    pub(super) async fn list_stream_worker_nodes(env: &FrontendEnv) -> Result<Vec<WorkerNode>> {
        let worker_nodes = env.meta_client().list_all_nodes().await?;
        let stream_worker_nodes = worker_nodes
            .into_iter()
            .filter(|node| {
                node.property
                    .as_ref()
                    .map(|p| p.is_streaming)
                    .unwrap_or(false)
            })
            .collect::<Vec<_>>();
        Ok(stream_worker_nodes)
    }

    pub(super) async fn get_fragments(
        meta_client: &dyn FrontendMetaClient,
        job_id: JobId,
    ) -> Result<Vec<FragmentInfo>> {
        let mut fragment_map = meta_client.list_table_fragments(&[job_id]).await?;
        let mut table_fragments = fragment_map.drain();
        let Some((fragment_job_id, table_fragment_info)) = table_fragments.next() else {
            bail!("table fragment of job {job_id} not found");
        };
        assert_eq!(
            table_fragments.next(),
            None,
            "expected at most one table fragment entry for job {job_id}"
        );
        assert_eq!(fragment_job_id, job_id);
        Ok(table_fragment_info.fragments)
    }

    pub(super) async fn get_executor_stats(
        handler_args: &HandlerArgs,
        worker_nodes: &[WorkerNode],
        executor_ids: &HashSet<ExecutorId>,
        dispatcher_fragment_ids: &HashSet<FragmentId>,
        profiling_duration: Duration,
    ) -> Result<ExecutorStats> {
        let dispatcher_fragment_ids = dispatcher_fragment_ids.iter().copied().collect::<Vec<_>>();
        let mut initial_aggregated_stats = ExecutorStats::new();
        let monitor_client_pool = handler_args.session.env().monitor_client_pool();

        // First sample: snapshot the cumulative counters on every stream worker.
        for node in worker_nodes {
            let client = monitor_client_pool.get(node).await?;
            let stats = client
                .get_profile_stats(GetProfileStatsRequest {
                    executor_ids: executor_ids.iter().copied().collect(),
                    dispatcher_fragment_ids: dispatcher_fragment_ids.clone(),
                })
                .await
                .expect("get profiling stats failed");
            initial_aggregated_stats.record(executor_ids, &dispatcher_fragment_ids, &stats);
        }
        tracing::debug!(?initial_aggregated_stats, "initial aggregated stats");

        sleep(profiling_duration).await;

        // Second sample, after the profiling window has elapsed.
        let mut final_aggregated_stats = ExecutorStats::new();
        for node in worker_nodes {
            let client = monitor_client_pool.get(node).await?;
            let stats = client
                .get_profile_stats(GetProfileStatsRequest {
                    executor_ids: executor_ids.iter().copied().collect(),
                    dispatcher_fragment_ids: dispatcher_fragment_ids.clone(),
                })
                .await
                .expect("get profiling stats failed");
            final_aggregated_stats.record(executor_ids, &dispatcher_fragment_ids, &stats);
        }
        tracing::debug!(?final_aggregated_stats, "final aggregated stats");

        // The counters are cumulative, so the difference between the two
        // samples is the activity within the profiling window.
        let delta_aggregated_stats = ExecutorStats::get_delta(
            &initial_aggregated_stats,
            &final_aggregated_stats,
            executor_ids,
            &dispatcher_fragment_ids,
        );

        Ok(delta_aggregated_stats)
    }
}

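/// Bookkeeping for the sampled counters: raw per-executor and per-dispatcher
/// stats, window deltas, and the per-operator aggregation that is rendered.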
mod metrics {
    use std::collections::{HashMap, HashSet};

    use risingwave_common::operator::unique_executor_id_into_parts;
    use risingwave_pb::id::{ExecutorId, GlobalOperatorId};
    use risingwave_pb::monitor_service::GetProfileStatsResponse;

    use crate::catalog::FragmentId;
    use crate::handler::explain_analyze_stream_job::utils::operator_id_for_dispatch;

    type OperatorId = GlobalOperatorId;

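    /// Cumulative counters reported for a single executor. `epoch` is
    /// currently unused and always 0.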
    #[expect(dead_code)]
    #[derive(Default, Debug)]
    pub(super) struct ExecutorMetrics {
        pub executor_id: ExecutorId,
        pub epoch: u32,
        pub total_output_throughput: u64,
        pub total_output_pending_ns: u64,
    }

    /// Cumulative counters for a fragment's dispatcher, summed across the
    /// workers that host its actors.
    #[derive(Default, Debug)]
    pub(super) struct DispatchMetrics {
        pub fragment_id: FragmentId,
        pub epoch: u32,
        pub total_output_throughput: u64,
        pub total_output_pending_ns: u64,
    }

    #[derive(Debug)]
    pub(super) struct ExecutorStats {
        executor_stats: HashMap<ExecutorId, ExecutorMetrics>,
        dispatch_stats: HashMap<FragmentId, DispatchMetrics>,
    }

    impl ExecutorStats {
        pub(super) fn new() -> Self {
            ExecutorStats {
                executor_stats: HashMap::new(),
                dispatch_stats: HashMap::new(),
            }
        }

        pub fn get(&self, executor_id: &ExecutorId) -> Option<&ExecutorMetrics> {
            self.executor_stats.get(executor_id)
        }

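        /// Merge one worker's `GetProfileStatsResponse` into this snapshot.
        /// Executors absent from the response are skipped. Dispatcher stats
        /// are summed, since one fragment's actors may be spread over
        /// several workers.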
        pub(super) fn record<'a>(
            &mut self,
            executor_ids: &'a HashSet<ExecutorId>,
            dispatch_fragment_ids: &'a [FragmentId],
            metrics: &'a GetProfileStatsResponse,
        ) {
            for executor_id in executor_ids {
                let Some(total_output_throughput) =
                    metrics.stream_node_output_row_count.get(executor_id)
                else {
                    continue;
                };
                let Some(total_output_pending_ns) = metrics
                    .stream_node_output_blocking_duration_ns
                    .get(executor_id)
                else {
                    continue;
                };
                let stats = ExecutorMetrics {
                    executor_id: *executor_id,
                    epoch: 0,
                    total_output_throughput: *total_output_throughput,
                    total_output_pending_ns: *total_output_pending_ns,
                };
                if cfg!(madsim) {
                    // In simulation, tolerate duplicate reports for the same
                    // executor by keeping the latest one.
                    self.executor_stats.insert(*executor_id, stats);
                } else {
                    // Otherwise a duplicate report for an executor is a bug.
                    assert!(self.executor_stats.insert(*executor_id, stats).is_none());
                }
            }

            for fragment_id in dispatch_fragment_ids {
                let Some(total_output_throughput) =
                    metrics.dispatch_fragment_output_row_count.get(fragment_id)
                else {
                    continue;
                };
                let Some(total_output_pending_ns) = metrics
                    .dispatch_fragment_output_blocking_duration_ns
                    .get(fragment_id)
                else {
                    continue;
                };
                let stats = self.dispatch_stats.entry(*fragment_id).or_default();
                stats.fragment_id = *fragment_id;
                stats.epoch = 0;
                stats.total_output_throughput += *total_output_throughput;
                stats.total_output_pending_ns += *total_output_pending_ns;
            }
        }

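        /// Compute `end - initial` for every executor and dispatcher. Missing
        /// or non-monotonic counters are skipped, panicking in debug builds
        /// via `debug_panic_or_warn!`.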
        pub(super) fn get_delta(
            initial: &Self,
            end: &Self,
            executor_ids: &HashSet<ExecutorId>,
            dispatch_fragment_ids: &[FragmentId],
        ) -> Self {
            let mut delta_aggregated_stats = Self::new();
            for executor_id in executor_ids {
                let (actor_id, operator_id) = unique_executor_id_into_parts(*executor_id);
                let Some(initial_stats) = initial.executor_stats.get(executor_id) else {
                    debug_panic_or_warn!(
                        "missing initial stats for executor {} (actor {} operator {})",
                        executor_id,
                        actor_id,
                        operator_id
                    );
                    continue;
                };
                let Some(end_stats) = end.executor_stats.get(executor_id) else {
                    debug_panic_or_warn!(
                        "missing final stats for executor {} (actor {} operator {})",
                        executor_id,
                        actor_id,
                        operator_id
                    );
                    continue;
                };

                let initial_throughput = initial_stats.total_output_throughput;
                let end_throughput = end_stats.total_output_throughput;
                let Some(delta_throughput) = end_throughput.checked_sub(initial_throughput) else {
                    debug_panic_or_warn!(
                        "delta throughput is negative for actor {} operator {} (initial: {}, end: {})",
                        actor_id,
                        operator_id,
                        initial_throughput,
                        end_throughput
                    );
                    continue;
                };

                let initial_pending_ns = initial_stats.total_output_pending_ns;
                let end_pending_ns = end_stats.total_output_pending_ns;
                let Some(delta_pending_ns) = end_pending_ns.checked_sub(initial_pending_ns) else {
                    debug_panic_or_warn!(
                        "delta pending ns is negative for actor {} operator {} (initial: {}, end: {})",
                        actor_id,
                        operator_id,
                        initial_pending_ns,
                        end_pending_ns
                    );
                    continue;
                };

                let delta_stats = ExecutorMetrics {
                    executor_id: *executor_id,
                    epoch: 0,
                    total_output_throughput: delta_throughput,
                    total_output_pending_ns: delta_pending_ns,
                };
                delta_aggregated_stats
                    .executor_stats
                    .insert(*executor_id, delta_stats);
            }

            for fragment_id in dispatch_fragment_ids {
                let Some(initial_stats) = initial.dispatch_stats.get(fragment_id) else {
                    debug_panic_or_warn!("missing initial stats for fragment {}", fragment_id);
                    continue;
                };
                let Some(end_stats) = end.dispatch_stats.get(fragment_id) else {
                    debug_panic_or_warn!("missing final stats for fragment {}", fragment_id);
                    continue;
                };

                let initial_throughput = initial_stats.total_output_throughput;
                let end_throughput = end_stats.total_output_throughput;
                let Some(delta_throughput) = end_throughput.checked_sub(initial_throughput) else {
                    debug_panic_or_warn!(
                        "delta throughput is negative for fragment {} (initial: {}, end: {})",
                        fragment_id,
                        initial_throughput,
                        end_throughput
                    );
                    continue;
                };

                let initial_pending_ns = initial_stats.total_output_pending_ns;
                let end_pending_ns = end_stats.total_output_pending_ns;
                let Some(delta_pending_ns) = end_pending_ns.checked_sub(initial_pending_ns) else {
                    debug_panic_or_warn!(
                        "delta pending ns is negative for fragment {} (initial: {}, end: {})",
                        fragment_id,
                        initial_pending_ns,
                        end_pending_ns
                    );
                    continue;
                };

                let delta_stats = DispatchMetrics {
                    fragment_id: *fragment_id,
                    epoch: 0,
                    total_output_throughput: delta_throughput,
                    total_output_pending_ns: delta_pending_ns,
                };
                delta_aggregated_stats
                    .dispatch_stats
                    .insert(*fragment_id, delta_stats);
            }

            delta_aggregated_stats
        }
    }

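    /// Window-delta metrics rolled up to a single operator across all of its
    /// executors.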
    #[expect(dead_code)]
    #[derive(Debug)]
    pub(super) struct OperatorMetrics {
        pub operator_id: GlobalOperatorId,
        pub epoch: u32,
        pub total_output_throughput: u64,
        pub total_output_pending_ns: u64,
    }

    #[derive(Debug)]
    pub(super) struct OperatorStats {
        inner: HashMap<GlobalOperatorId, OperatorMetrics>,
    }

    impl OperatorStats {
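        /// Roll per-executor window deltas up to per-operator metrics:
        /// throughput is summed across an operator's executors, while
        /// blocking time is averaged over them (and, for dispatchers, over
        /// the fragment's parallelism) to yield a per-actor figure.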
        pub(super) fn aggregate(
            operator_map: HashMap<OperatorId, HashSet<ExecutorId>>,
            executor_stats: &ExecutorStats,
            fragment_parallelisms: &HashMap<FragmentId, usize>,
        ) -> Self {
            let mut operator_stats = HashMap::new();
            'operator_loop: for (operator_id, executor_ids) in operator_map {
                let num_executors = executor_ids.len() as u64;
                let mut total_output_throughput = 0;
                let mut total_output_pending_ns = 0;
                for executor_id in executor_ids {
                    if let Some(stats) = executor_stats.get(&executor_id) {
                        total_output_throughput += stats.total_output_throughput;
                        total_output_pending_ns += stats.total_output_pending_ns;
                    } else {
                        // Skip operators with incomplete stats (e.g. their
                        // executors were never profiled).
                        continue 'operator_loop;
                    }
                }
                // Throughput is summed across executors; blocking time is
                // averaged so it stays comparable to the profiling window.
                let total_output_pending_ns = total_output_pending_ns / num_executors;

                operator_stats.insert(
                    operator_id,
                    OperatorMetrics {
                        operator_id,
                        epoch: 0,
                        total_output_throughput,
                        total_output_pending_ns,
                    },
                );
            }

            for (fragment_id, dispatch_metrics) in &executor_stats.dispatch_stats {
                let operator_id = operator_id_for_dispatch(*fragment_id);
                let total_output_throughput = dispatch_metrics.total_output_throughput;
                let Some(fragment_parallelism) = fragment_parallelisms.get(fragment_id) else {
                    debug_panic_or_warn!(
                        "missing fragment parallelism for fragment {}",
                        fragment_id
                    );
                    continue;
                };
                let total_output_pending_ns =
                    dispatch_metrics.total_output_pending_ns / *fragment_parallelism as u64;

                operator_stats.insert(
                    operator_id,
                    OperatorMetrics {
                        operator_id,
                        epoch: 0,
                        total_output_throughput,
                        total_output_pending_ns,
                    },
                );
            }

            OperatorStats {
                inner: operator_stats,
            }
        }

        pub fn get(&self, operator_id: &OperatorId) -> Option<&OperatorMetrics> {
            self.inner.get(operator_id)
        }
    }
}

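/// Construction of the operator graph from the fragments' plans, and
/// rendering of the annotated plan tree.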
mod graph {
    use std::collections::{HashMap, HashSet};
    use std::fmt::Debug;
    use std::time::Duration;

    use itertools::Itertools;
    use risingwave_common::operator::{
        unique_executor_id_from_unique_operator_id, unique_operator_id,
        unique_operator_id_into_parts,
    };
    use risingwave_pb::id::{ActorId, GlobalOperatorId};
    use risingwave_pb::meta::list_table_fragments_response::FragmentInfo;
    use risingwave_pb::stream_plan::stream_node::{NodeBody, NodeBodyDiscriminants};
    use risingwave_pb::stream_plan::{MergeNode, StreamNode as PbStreamNode};

    use crate::catalog::FragmentId;
    use crate::handler::explain_analyze_stream_job::ExplainAnalyzeStreamJobOutput;
    use crate::handler::explain_analyze_stream_job::metrics::OperatorStats;
    use crate::handler::explain_analyze_stream_job::utils::operator_id_for_dispatch;

    pub(super) type OperatorId = GlobalOperatorId;
    pub(super) use risingwave_pb::id::ExecutorId;

    /// A node of the stitched operator graph, keyed by its global operator id.
    pub(super) struct StreamNode {
        operator_id: OperatorId,
        fragment_id: FragmentId,
        identity: NodeBodyDiscriminants,
        actor_ids: HashSet<ActorId>,
        dependencies: Vec<OperatorId>,
    }

    impl Debug for StreamNode {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            let (actor_id, operator_id) = unique_operator_id_into_parts(self.operator_id);
            let operator_id_str = format!(
                "{}: (actor_id: {}, operator_id: {})",
                self.operator_id, actor_id, operator_id
            );
            write!(
                f,
                "StreamNode {{ operator_id: {}, fragment_id: {}, identity: {:?}, actor_ids: {:?}, dependencies: {:?} }}",
                operator_id_str, self.fragment_id, self.identity, self.actor_ids, self.dependencies
            )
        }
    }

    impl StreamNode {
        /// Synthetic `Exchange` node standing in for a fragment's dispatcher;
        /// actor ids and dependencies are filled in by the caller.
        fn new_for_dispatcher(fragment_id: FragmentId) -> Self {
            StreamNode {
                operator_id: operator_id_for_dispatch(fragment_id),
                fragment_id,
                identity: NodeBodyDiscriminants::Exchange,
                actor_ids: Default::default(),
                dependencies: Default::default(),
            }
        }
    }

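    /// Walk every fragment's plan and stitch them into one adjacency list
    /// keyed by global operator id. Returns the root operator (the one no
    /// other operator depends on), the ids of fragments inside this job that
    /// dispatch to downstream fragments, and the adjacency list itself.
    /// Cross-fragment edges are modeled by splicing a synthetic dispatcher
    /// node between a fragment's root and the downstream `Merge` operator.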
    pub(super) fn extract_stream_node_infos(
        fragments: Vec<FragmentInfo>,
    ) -> (
        OperatorId,
        HashSet<FragmentId>,
        HashMap<OperatorId, StreamNode>,
    ) {
        let job_fragment_ids = fragments
            .iter()
            .map(|f| f.id)
            .collect::<HashSet<FragmentId>>();

        // Nodes that no other node depends on: the job's root plus the root
        // of every fragment that only feeds downstream fragments.
        fn find_root_nodes(stream_nodes: &HashMap<OperatorId, StreamNode>) -> HashSet<OperatorId> {
            let mut all_nodes = stream_nodes.keys().copied().collect::<HashSet<_>>();
            for node in stream_nodes.values() {
                for dependency in &node.dependencies {
                    all_nodes.remove(dependency);
                }
            }
            all_nodes
        }

        // Recursively register `node` and its inputs, remembering which
        // upstream fragment each `Merge` operator consumes from.
        fn extract_stream_node_info(
            fragment_id: FragmentId,
            fragment_id_to_merge_operator_id: &mut HashMap<FragmentId, OperatorId>,
            operator_id_to_stream_node: &mut HashMap<OperatorId, StreamNode>,
            node: &PbStreamNode,
            actor_ids: &HashSet<ActorId>,
        ) {
            let identity = node
                .node_body
                .as_ref()
                .expect("should have node body")
                .into();
            let operator_id = unique_operator_id(fragment_id, node.operator_id);
            if let Some(merge_node) = node.node_body.as_ref()
                && let NodeBody::Merge(box MergeNode {
                    upstream_fragment_id,
                    ..
                }) = merge_node
            {
                fragment_id_to_merge_operator_id.insert(*upstream_fragment_id, operator_id);
            }
            let dependencies = &node.input;
            let dependency_ids = dependencies
                .iter()
                .map(|input| unique_operator_id(fragment_id, input.operator_id))
                .collect::<Vec<_>>();
            operator_id_to_stream_node.insert(
                operator_id,
                StreamNode {
                    operator_id,
                    fragment_id,
                    identity,
                    actor_ids: actor_ids.clone(),
                    dependencies: dependency_ids,
                },
            );
            for dependency in dependencies {
                extract_stream_node_info(
                    fragment_id,
                    fragment_id_to_merge_operator_id,
                    operator_id_to_stream_node,
                    dependency,
                    actor_ids,
                );
            }
        }

        let mut operator_id_to_stream_node = HashMap::new();
        let mut fragment_id_to_merge_operator_id = HashMap::new();
        for fragment in fragments {
            let actors = fragment.actors;
            assert!(
                !actors.is_empty(),
                "fragment {} should have at least one actor",
                fragment.id
            );
            let actor_ids = actors.iter().map(|actor| actor.id).collect::<HashSet<_>>();
            // All actors of a fragment share the same plan, so inspecting the
            // first one is sufficient.
            let node = actors[0].node.as_ref().expect("should have stream node");
            extract_stream_node_info(
                fragment.id,
                &mut fragment_id_to_merge_operator_id,
                &mut operator_id_to_stream_node,
                node,
                &actor_ids,
            );
        }

        // Link fragments: each fragment root that some `Merge` consumes from
        // gets a synthetic dispatcher spliced in; the remaining root is the
        // root of the whole job.
        let root_or_dispatch_nodes = find_root_nodes(&operator_id_to_stream_node);
        let mut root_node = None;
        for operator_id in root_or_dispatch_nodes {
            let node = operator_id_to_stream_node.get_mut(&operator_id).unwrap();
            let fragment_id = node.fragment_id;
            if let Some(merge_operator_id) = fragment_id_to_merge_operator_id.get(&fragment_id) {
                let mut dispatcher = StreamNode::new_for_dispatcher(fragment_id);
                let operator_id_for_dispatch = dispatcher.operator_id;
                dispatcher.dependencies.push(operator_id);
                assert!(
                    operator_id_to_stream_node
                        .insert(operator_id_for_dispatch as _, dispatcher)
                        .is_none()
                );
                operator_id_to_stream_node
                    .get_mut(merge_operator_id)
                    .unwrap()
                    .dependencies
                    .push(operator_id_for_dispatch as _)
            } else {
                root_node = Some(operator_id);
            }
        }

        // Only dispatchers belonging to this job's own fragments are profiled.
        let mut dispatcher_fragment_ids = HashSet::new();
        for dispatcher_fragment_id in fragment_id_to_merge_operator_id.keys() {
            if job_fragment_ids.contains(dispatcher_fragment_id) {
                dispatcher_fragment_ids.insert(*dispatcher_fragment_id);
            }
        }

        (
            root_node.unwrap(),
            dispatcher_fragment_ids,
            operator_id_to_stream_node,
        )
    }

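    /// Expand each operator into one executor id per actor. `BatchPlan`,
    /// `Merge`, and `Project` executors are excluded from the profiled set
    /// but still recorded in the operator-to-executor map, so their operators
    /// render without metrics.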
    pub(super) fn extract_executor_infos(
        adjacency_list: &HashMap<OperatorId, StreamNode>,
    ) -> (
        HashSet<ExecutorId>,
        HashMap<OperatorId, HashSet<ExecutorId>>,
    ) {
        let mut executor_ids = HashSet::new();
        let mut operator_to_executor = HashMap::new();
        for (operator_id, node) in adjacency_list {
            assert_eq!(*operator_id, node.operator_id);
            let operator_id = node.operator_id;
            for actor_id in &node.actor_ids {
                let executor_id =
                    unique_executor_id_from_unique_operator_id(*actor_id, operator_id);
                if node.identity != NodeBodyDiscriminants::BatchPlan
                    && node.identity != NodeBodyDiscriminants::Merge
                    && node.identity != NodeBodyDiscriminants::Project
                {
                    assert!(executor_ids.insert(executor_id));
                }
                assert!(
                    operator_to_executor
                        .entry(operator_id)
                        .or_insert_with(HashSet::new)
                        .insert(executor_id)
                );
            }
        }
        (executor_ids, operator_to_executor)
    }

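    /// Depth-first render of the plan tree into one output row per operator,
    /// for example:
    ///
    /// ```text
    /// Materialize
    /// └─ HashJoin
    ///    ├─ Exchange
    ///    └─ Exchange
    /// ```
    ///
    /// `output_rows_per_second` is the window delta divided by the profiling
    /// duration. `downstream_backpressure_ratio` is the fraction of the
    /// window spent blocked on output, averaged across the node's actors.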
    pub(super) fn render_graph_with_metrics(
        adjacency_list: &HashMap<OperatorId, StreamNode>,
        root_node: OperatorId,
        stats: &OperatorStats,
        profiling_duration: &Duration,
    ) -> Vec<ExplainAnalyzeStreamJobOutput> {
        let profiling_duration_secs = profiling_duration.as_secs_f64();
        let mut rows = vec![];
        let mut stack = vec![(String::new(), true, root_node)];
        while let Some((prefix, last_child, node_id)) = stack.pop() {
            let Some(node) = adjacency_list.get(&node_id) else {
                continue;
            };
            let is_root = node_id == root_node;

            let identity_rendered = if is_root {
                node.identity.to_string()
            } else {
                let connector = if last_child { "└─ " } else { "├─ " };
                format!("{}{}{}", prefix, connector, node.identity)
            };

            let child_prefix = if is_root {
                ""
            } else if last_child {
                "   "
            } else {
                "│  "
            };
            let child_prefix = format!("{}{}", prefix, child_prefix);

            let stats = stats.get(&node_id);
            let (output_rows_per_second, downstream_backpressure_ratio) = match stats {
                Some(stats) => (
                    Some(
                        (stats.total_output_throughput as f64 / profiling_duration_secs)
                            .to_string(),
                    ),
                    Some(
                        (Duration::from_nanos(stats.total_output_pending_ns).as_secs_f64()
                            / usize::max(node.actor_ids.len(), 1) as f64
                            / profiling_duration_secs)
                            .to_string(),
                    ),
                ),
                None => (None, None),
            };
            let row = ExplainAnalyzeStreamJobOutput {
                identity: identity_rendered,
                actor_ids: node
                    .actor_ids
                    .iter()
                    .sorted()
                    .map(|id| id.to_string())
                    .collect::<Vec<_>>()
                    .join(","),
                output_rows_per_second,
                downstream_backpressure_ratio,
            };
            rows.push(row);
            // Children are pushed in order, so the first dependency is popped
            // (and rendered) last and gets the `└─` connector.
            for (position, dependency) in node.dependencies.iter().enumerate() {
                stack.push((child_prefix.clone(), position == 0, *dependency));
            }
        }
        rows
    }
}

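/// Shared helper for deriving synthetic operator ids.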
mod utils {
    use risingwave_common::operator::unique_operator_id;
    use risingwave_pb::id::GlobalOperatorId;

    use crate::catalog::FragmentId;

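    /// Synthetic operator id for a fragment's dispatcher. `u32::MAX` is used
    /// as the local operator id on the assumption that no real operator in
    /// the fragment uses it.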
    pub(super) fn operator_id_for_dispatch(fragment_id: FragmentId) -> GlobalOperatorId {
        unique_operator_id(fragment_id, u32::MAX)
    }
}