use std::collections::HashMap;

use pgwire::pg_response::StatementType;
use risingwave_common::types::Fields;
use risingwave_sqlparser::ast::AnalyzeTarget;
use tokio::time::Duration;

use crate::error::Result;
use crate::handler::explain_analyze_stream_job::graph::{
    extract_executor_infos, extract_stream_node_infos, render_graph_with_metrics,
};
use crate::handler::{HandlerArgs, RwPgResponse, RwPgResponseBuilder, RwPgResponseBuilderExt};

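/// Panics in debug and madsim builds so inconsistencies are caught in tests,
/// but only logs a warning in release builds. Takes the same arguments as
/// `panic!`, e.g. `debug_panic_or_warn!("missing stats for {}", id)`.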
#[macro_export]
macro_rules! debug_panic_or_warn {
    ($($arg:tt)*) => {
        if cfg!(debug_assertions) || cfg!(madsim) {
            panic!($($arg)*);
        } else {
            tracing::warn!($($arg)*);
        }
    };
}

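/// One row of the `EXPLAIN ANALYZE` output: the rendered operator identity,
/// the actors running it, and its profiled metrics (`None` when no stats were
/// collected for the operator).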
#[derive(Fields)]
struct ExplainAnalyzeStreamJobOutput {
    identity: String,
    actor_ids: String,
    output_rows_per_second: Option<String>,
    downstream_backpressure_ratio: Option<String>,
}

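/// Handle `EXPLAIN ANALYZE` for a stream job addressed by name or by job id,
/// e.g. (a syntax sketch; the exact grammar is defined by the parser's
/// `AnalyzeTarget`):
///
/// ```sql
/// EXPLAIN ANALYZE MATERIALIZED VIEW mv1;
/// ```
///
/// The target is bound to a job id, the job's fragment graph is fetched from
/// meta, executor metrics are sampled on all streaming workers at the start and
/// end of the profiling window (default 10s), and the graph is rendered with
/// the metric deltas.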
pub async fn handle_explain_analyze_stream_job(
    handler_args: HandlerArgs,
    target: AnalyzeTarget,
    duration_secs: Option<u64>,
) -> Result<RwPgResponse> {
    let profiling_duration = Duration::from_secs(duration_secs.unwrap_or(10));
    let job_id = bind::bind_relation(&target, &handler_args)?;

    let meta_client = handler_args.session.env().meta_client();
    let fragments = net::get_fragments(meta_client, job_id).await?;
    let fragment_parallelisms = fragments
        .iter()
        .map(|f| (f.id, f.actors.len()))
        .collect::<HashMap<_, _>>();
    let (root_node, dispatcher_fragment_ids, adjacency_list) = extract_stream_node_infos(fragments);
    let (executor_ids, operator_to_executor) = extract_executor_infos(&adjacency_list);
    tracing::debug!(
        ?fragment_parallelisms,
        ?root_node,
        ?dispatcher_fragment_ids,
        ?adjacency_list,
        "explain analyze metadata"
    );

    let worker_nodes = net::list_stream_worker_nodes(handler_args.session.env()).await?;

    let executor_stats = net::get_executor_stats(
        &handler_args,
        &worker_nodes,
        &executor_ids,
        &dispatcher_fragment_ids,
        profiling_duration,
    )
    .await?;
    tracing::debug!(?executor_stats, "collected executor stats");
    let aggregated_stats = metrics::OperatorStats::aggregate(
        operator_to_executor,
        &executor_stats,
        &fragment_parallelisms,
    );
    tracing::debug!(?aggregated_stats, "collected aggregated stats");

    let rows = render_graph_with_metrics(
        &adjacency_list,
        root_node,
        &aggregated_stats,
        &profiling_duration,
    );
    let builder = RwPgResponseBuilder::empty(StatementType::EXPLAIN);
    let builder = builder.rows(rows);
    Ok(builder.into())
}

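/// Binding of the `ANALYZE` target (a relation name or a raw id) to a job id.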
mod bind {
    use risingwave_common::id::JobId;
    use risingwave_sqlparser::ast::AnalyzeTarget;

    use crate::Binder;
    use crate::catalog::root_catalog::SchemaPath;
    use crate::error::Result;
    use crate::handler::HandlerArgs;

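    /// Resolve the target relation to its job id via the catalog, honoring the
    /// session's database and search path; a raw id is passed through directly.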
    pub(super) fn bind_relation(
        target_relation: &AnalyzeTarget,
        handler_args: &HandlerArgs,
    ) -> Result<JobId> {
        let job_id = match &target_relation {
            AnalyzeTarget::Id(id) => (*id).into(),
            AnalyzeTarget::Index(name)
            | AnalyzeTarget::Table(name)
            | AnalyzeTarget::Sink(name)
            | AnalyzeTarget::MaterializedView(name) => {
                let session = &handler_args.session;
                let db_name = session.database();
                let (schema_name, name) = Binder::resolve_schema_qualified_name(&db_name, name)?;
                let search_path = session.config().search_path();
                let user_name = &session.user_name();
                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

                let catalog_reader = handler_args.session.env().catalog_reader();
                let catalog = catalog_reader.read_guard();

                match target_relation {
                    AnalyzeTarget::Index(_) => {
                        let (catalog, _schema_name) =
                            catalog.get_any_index_by_name(&db_name, schema_path, &name)?;
                        catalog.id.as_job_id()
                    }
                    AnalyzeTarget::Table(_) => {
                        let (catalog, _schema_name) =
                            catalog.get_any_table_by_name(&db_name, schema_path, &name)?;
                        catalog.id.as_job_id()
                    }
                    AnalyzeTarget::Sink(_) => {
                        let (catalog, _schema_name) =
                            catalog.get_any_sink_by_name(&db_name, schema_path, &name)?;
                        catalog.id.as_job_id()
                    }
                    AnalyzeTarget::MaterializedView(_) => {
                        let (catalog, _schema_name) =
                            catalog.get_any_table_by_name(&db_name, schema_path, &name)?;
                        catalog.id.as_job_id()
                    }
                    AnalyzeTarget::Id(_) => unreachable!(),
                }
            }
        };
        Ok(job_id)
    }
}

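/// RPCs against the meta service and compute nodes: fragment metadata and raw
/// profiling counters.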
mod net {
    use std::collections::HashSet;

    use risingwave_common::id::{FragmentId, JobId};
    use risingwave_pb::common::WorkerNode;
    use risingwave_pb::meta::list_table_fragments_response::FragmentInfo;
    use risingwave_pb::monitor_service::GetProfileStatsRequest;
    use tokio::time::{Duration, sleep};

    use crate::error::Result;
    use crate::handler::HandlerArgs;
    use crate::handler::explain_analyze_stream_job::graph::ExecutorId;
    use crate::handler::explain_analyze_stream_job::metrics::ExecutorStats;
    use crate::meta_client::FrontendMetaClient;
    use crate::session::FrontendEnv;

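    /// List all worker nodes, keeping only those that serve streaming.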
    pub(super) async fn list_stream_worker_nodes(env: &FrontendEnv) -> Result<Vec<WorkerNode>> {
        let worker_nodes = env.meta_client().list_all_nodes().await?;
        let stream_worker_nodes = worker_nodes
            .into_iter()
            .filter(|node| {
                node.property
                    .as_ref()
                    .map(|p| p.is_streaming)
                    .unwrap_or(false)
            })
            .collect::<Vec<_>>();
        Ok(stream_worker_nodes)
    }

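    /// Fetch the fragment infos of the given job from the meta service.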
    pub(super) async fn get_fragments(
        meta_client: &dyn FrontendMetaClient,
        job_id: JobId,
    ) -> Result<Vec<FragmentInfo>> {
        let mut fragment_map = meta_client.list_table_fragments(&[job_id]).await?;
        assert_eq!(fragment_map.len(), 1, "expected fragments of exactly one job");
        let (fragment_job_id, table_fragment_info) = fragment_map.drain().next().unwrap();
        assert_eq!(fragment_job_id, job_id);
        Ok(table_fragment_info.fragments)
    }

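    /// Sample the cumulative profiling counters on every worker twice,
    /// `profiling_duration` apart, and return the delta between the two samples
    /// for the given executors and dispatchers.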
    pub(super) async fn get_executor_stats(
        handler_args: &HandlerArgs,
        worker_nodes: &[WorkerNode],
        executor_ids: &HashSet<ExecutorId>,
        dispatcher_fragment_ids: &HashSet<FragmentId>,
        profiling_duration: Duration,
    ) -> Result<ExecutorStats> {
        let dispatcher_fragment_ids = dispatcher_fragment_ids.iter().copied().collect::<Vec<_>>();
        let mut initial_aggregated_stats = ExecutorStats::new();
        for node in worker_nodes {
            let mut compute_client = handler_args.session.env().client_pool().get(node).await?;
            let stats = compute_client
                .monitor_client
                .get_profile_stats(GetProfileStatsRequest {
                    executor_ids: executor_ids.iter().copied().collect(),
                    dispatcher_fragment_ids: dispatcher_fragment_ids.clone(),
                })
                .await
                .expect("get profiling stats failed");
            initial_aggregated_stats.record(
                executor_ids,
                &dispatcher_fragment_ids,
                &stats.into_inner(),
            );
        }
        tracing::debug!(?initial_aggregated_stats, "initial aggregated stats");

        sleep(profiling_duration).await;

        let mut final_aggregated_stats = ExecutorStats::new();
        for node in worker_nodes {
            let mut compute_client = handler_args.session.env().client_pool().get(node).await?;
            let stats = compute_client
                .monitor_client
                .get_profile_stats(GetProfileStatsRequest {
                    executor_ids: executor_ids.iter().copied().collect(),
                    dispatcher_fragment_ids: dispatcher_fragment_ids.clone(),
                })
                .await
                .expect("get profiling stats failed");
            final_aggregated_stats.record(
                executor_ids,
                &dispatcher_fragment_ids,
                &stats.into_inner(),
            );
        }
        tracing::debug!(?final_aggregated_stats, "final aggregated stats");

        let delta_aggregated_stats = ExecutorStats::get_delta(
            &initial_aggregated_stats,
            &final_aggregated_stats,
            executor_ids,
            &dispatcher_fragment_ids,
        );

        Ok(delta_aggregated_stats)
    }
}

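/// Aggregation of raw executor and dispatcher counters into per-operator
/// metrics.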
mod metrics {
    use std::collections::{HashMap, HashSet};

    use risingwave_common::operator::unique_executor_id_into_parts;
    use risingwave_pb::monitor_service::GetProfileStatsResponse;

    use crate::catalog::FragmentId;
    use crate::handler::explain_analyze_stream_job::graph::{ExecutorId, OperatorId};
    use crate::handler::explain_analyze_stream_job::utils::operator_id_for_dispatch;

    #[expect(dead_code)]
    #[derive(Default, Debug)]
    pub(super) struct ExecutorMetrics {
        pub executor_id: ExecutorId,
        pub epoch: u32,
        pub total_output_throughput: u64,
        pub total_output_pending_ns: u64,
    }

    #[derive(Default, Debug)]
    pub(super) struct DispatchMetrics {
        pub fragment_id: FragmentId,
        pub epoch: u32,
        pub total_output_throughput: u64,
        pub total_output_pending_ns: u64,
    }

    #[derive(Debug)]
    pub(super) struct ExecutorStats {
        executor_stats: HashMap<ExecutorId, ExecutorMetrics>,
        dispatch_stats: HashMap<FragmentId, DispatchMetrics>,
    }

    impl ExecutorStats {
        pub(super) fn new() -> Self {
            ExecutorStats {
                executor_stats: HashMap::new(),
                dispatch_stats: HashMap::new(),
            }
        }

        pub fn get(&self, executor_id: &ExecutorId) -> Option<&ExecutorMetrics> {
            self.executor_stats.get(executor_id)
        }

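        /// Merge one worker's profiling response into these stats; executors
        /// and fragments absent from the response are skipped.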
        pub(super) fn record<'a>(
            &mut self,
            executor_ids: &'a HashSet<ExecutorId>,
            dispatch_fragment_ids: &'a [FragmentId],
            metrics: &'a GetProfileStatsResponse,
        ) {
            for executor_id in executor_ids {
                let Some(total_output_throughput) =
                    metrics.stream_node_output_row_count.get(executor_id)
                else {
                    continue;
                };
                let Some(total_output_pending_ns) = metrics
                    .stream_node_output_blocking_duration_ns
                    .get(executor_id)
                else {
                    continue;
                };
                let stats = ExecutorMetrics {
                    executor_id: *executor_id,
                    epoch: 0,
                    total_output_throughput: *total_output_throughput,
                    total_output_pending_ns: *total_output_pending_ns,
                };
                if cfg!(madsim) {
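                    // In deterministic simulation the same executor may be
                    // reported by more than one (simulated) worker, so tolerate
                    // duplicates by overwriting.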
                    self.executor_stats.insert(*executor_id, stats);
                } else {
                    assert!(self.executor_stats.insert(*executor_id, stats).is_none());
                }
            }

            for fragment_id in dispatch_fragment_ids {
                let Some(total_output_throughput) =
                    metrics.dispatch_fragment_output_row_count.get(fragment_id)
                else {
                    continue;
                };
                let Some(total_output_pending_ns) = metrics
                    .dispatch_fragment_output_blocking_duration_ns
                    .get(fragment_id)
                else {
                    continue;
                };
                let stats = self.dispatch_stats.entry(*fragment_id).or_default();
                stats.fragment_id = *fragment_id;
                stats.epoch = 0;
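                // Accumulate across workers: a fragment's actors may be
                // scheduled on several compute nodes.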
                stats.total_output_throughput += *total_output_throughput;
                stats.total_output_pending_ns += *total_output_pending_ns;
            }
        }

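        /// Compute `end - initial` for every executor and dispatcher, dropping
        /// entries whose samples are missing or whose counters went backwards
        /// (with a debug panic or a warning) instead of emitting bogus deltas.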
        pub(super) fn get_delta(
            initial: &Self,
            end: &Self,
            executor_ids: &HashSet<ExecutorId>,
            dispatch_fragment_ids: &[FragmentId],
        ) -> Self {
            let mut delta_aggregated_stats = Self::new();
            for executor_id in executor_ids {
                let (actor_id, operator_id) = unique_executor_id_into_parts(*executor_id);
                let Some(initial_stats) = initial.executor_stats.get(executor_id) else {
                    debug_panic_or_warn!(
                        "missing initial stats for executor {} (actor {} operator {})",
                        executor_id,
                        actor_id,
                        operator_id
                    );
                    continue;
                };
                let Some(end_stats) = end.executor_stats.get(executor_id) else {
                    debug_panic_or_warn!(
                        "missing final stats for executor {} (actor {} operator {})",
                        executor_id,
                        actor_id,
                        operator_id
                    );
                    continue;
                };

                let initial_throughput = initial_stats.total_output_throughput;
                let end_throughput = end_stats.total_output_throughput;
                let Some(delta_throughput) = end_throughput.checked_sub(initial_throughput) else {
                    debug_panic_or_warn!(
                        "delta throughput is negative for actor {} operator {} (initial: {}, end: {})",
                        actor_id,
                        operator_id,
                        initial_throughput,
                        end_throughput
                    );
                    continue;
                };

                let initial_pending_ns = initial_stats.total_output_pending_ns;
                let end_pending_ns = end_stats.total_output_pending_ns;
                let Some(delta_pending_ns) = end_pending_ns.checked_sub(initial_pending_ns) else {
                    debug_panic_or_warn!(
                        "delta pending ns is negative for actor {} operator {} (initial: {}, end: {})",
                        actor_id,
                        operator_id,
                        initial_pending_ns,
                        end_pending_ns
                    );
                    continue;
                };

                let delta_stats = ExecutorMetrics {
                    executor_id: *executor_id,
                    epoch: 0,
                    total_output_throughput: delta_throughput,
                    total_output_pending_ns: delta_pending_ns,
                };
                delta_aggregated_stats
                    .executor_stats
                    .insert(*executor_id, delta_stats);
            }

            for fragment_id in dispatch_fragment_ids {
                let Some(initial_stats) = initial.dispatch_stats.get(fragment_id) else {
                    debug_panic_or_warn!("missing initial stats for fragment {}", fragment_id);
                    continue;
                };
                let Some(end_stats) = end.dispatch_stats.get(fragment_id) else {
                    debug_panic_or_warn!("missing final stats for fragment {}", fragment_id);
                    continue;
                };

                let initial_throughput = initial_stats.total_output_throughput;
                let end_throughput = end_stats.total_output_throughput;
                let Some(delta_throughput) = end_throughput.checked_sub(initial_throughput) else {
                    debug_panic_or_warn!(
                        "delta throughput is negative for fragment {} (initial: {}, end: {})",
                        fragment_id,
                        initial_throughput,
                        end_throughput
                    );
                    continue;
                };

                let initial_pending_ns = initial_stats.total_output_pending_ns;
                let end_pending_ns = end_stats.total_output_pending_ns;
                let Some(delta_pending_ns) = end_pending_ns.checked_sub(initial_pending_ns) else {
                    debug_panic_or_warn!(
                        "delta pending ns is negative for fragment {} (initial: {}, end: {})",
                        fragment_id,
                        initial_pending_ns,
                        end_pending_ns
                    );
                    continue;
                };

                let delta_stats = DispatchMetrics {
                    fragment_id: *fragment_id,
                    epoch: 0,
                    total_output_throughput: delta_throughput,
                    total_output_pending_ns: delta_pending_ns,
                };
                delta_aggregated_stats
                    .dispatch_stats
                    .insert(*fragment_id, delta_stats);
            }

            delta_aggregated_stats
        }
    }

    #[expect(dead_code)]
    #[derive(Debug)]
    pub(super) struct OperatorMetrics {
        pub operator_id: OperatorId,
        pub epoch: u32,
        pub total_output_throughput: u64,
        pub total_output_pending_ns: u64,
    }

    #[derive(Debug)]
    pub(super) struct OperatorStats {
        inner: HashMap<OperatorId, OperatorMetrics>,
    }

    impl OperatorStats {
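        /// Roll executor-level deltas up to operator level: throughput is
        /// summed over an operator's executors, while blocking time is averaged
        /// over them (and over the fragment's parallelism for dispatchers).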
        pub(super) fn aggregate(
            operator_map: HashMap<OperatorId, HashSet<ExecutorId>>,
            executor_stats: &ExecutorStats,
            fragment_parallelisms: &HashMap<FragmentId, usize>,
        ) -> Self {
            let mut operator_stats = HashMap::new();
            'operator_loop: for (operator_id, executor_ids) in operator_map {
                let num_executors = executor_ids.len() as u64;
                let mut total_output_throughput = 0;
                let mut total_output_pending_ns = 0;
                for executor_id in executor_ids {
                    if let Some(stats) = executor_stats.get(&executor_id) {
                        total_output_throughput += stats.total_output_throughput;
                        total_output_pending_ns += stats.total_output_pending_ns;
                    } else {
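                        // No stats were collected for this executor (e.g. its
                        // node type is excluded from profiling); skip the whole
                        // operator rather than report partial numbers.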
                        continue 'operator_loop;
                    }
                }
                let total_output_throughput = total_output_throughput;
                let total_output_pending_ns = total_output_pending_ns / num_executors;

                operator_stats.insert(
                    operator_id,
                    OperatorMetrics {
                        operator_id,
                        epoch: 0,
                        total_output_throughput,
                        total_output_pending_ns,
                    },
                );
            }

            for (fragment_id, dispatch_metrics) in &executor_stats.dispatch_stats {
                let operator_id = operator_id_for_dispatch(*fragment_id);
                let total_output_throughput = dispatch_metrics.total_output_throughput;
                let Some(fragment_parallelism) = fragment_parallelisms.get(fragment_id) else {
                    debug_panic_or_warn!(
                        "missing fragment parallelism for fragment {}",
                        fragment_id
                    );
                    continue;
                };
                let total_output_pending_ns =
                    dispatch_metrics.total_output_pending_ns / *fragment_parallelism as u64;

                operator_stats.insert(
                    operator_id,
                    OperatorMetrics {
                        operator_id,
                        epoch: 0,
                        total_output_throughput,
                        total_output_pending_ns,
                    },
                );
            }

            OperatorStats {
                inner: operator_stats,
            }
        }

        pub fn get(&self, operator_id: &OperatorId) -> Option<&OperatorMetrics> {
            self.inner.get(operator_id)
        }
    }
}

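/// Construction of the job's operator graph and its textual rendering.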
mod graph {
    use std::collections::{HashMap, HashSet};
    use std::fmt::Debug;
    use std::time::Duration;

    use itertools::Itertools;
    use risingwave_common::operator::{
        unique_executor_id_from_unique_operator_id, unique_operator_id,
        unique_operator_id_into_parts,
    };
    use risingwave_pb::id::ActorId;
    use risingwave_pb::meta::list_table_fragments_response::FragmentInfo;
    use risingwave_pb::stream_plan::stream_node::{NodeBody, NodeBodyDiscriminants};
    use risingwave_pb::stream_plan::{MergeNode, StreamNode as PbStreamNode};

    use crate::catalog::FragmentId;
    use crate::handler::explain_analyze_stream_job::ExplainAnalyzeStreamJobOutput;
    use crate::handler::explain_analyze_stream_job::metrics::OperatorStats;
    use crate::handler::explain_analyze_stream_job::utils::operator_id_for_dispatch;

    pub(super) type OperatorId = u64;
    pub(super) type ExecutorId = u64;

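    /// A node of the rendered graph: one operator of a fragment, with edges to
    /// the operators it consumes.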
    pub(super) struct StreamNode {
        operator_id: OperatorId,
        fragment_id: FragmentId,
        identity: NodeBodyDiscriminants,
        actor_ids: HashSet<ActorId>,
        dependencies: Vec<u64>,
    }

    impl Debug for StreamNode {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            let (fragment_id, operator_id) = unique_operator_id_into_parts(self.operator_id);
            let operator_id_str = format!(
                "{}: (fragment_id: {}, operator_id: {})",
                self.operator_id, fragment_id, operator_id
            );
            write!(
                f,
                "StreamNode {{ operator_id: {}, fragment_id: {}, identity: {:?}, actor_ids: {:?}, dependencies: {:?} }}",
                operator_id_str, self.fragment_id, self.identity, self.actor_ids, self.dependencies
            )
        }
    }

    impl StreamNode {
        fn new_for_dispatcher(fragment_id: FragmentId) -> Self {
            StreamNode {
                operator_id: operator_id_for_dispatch(fragment_id),
                fragment_id,
                identity: NodeBodyDiscriminants::Exchange,
                actor_ids: Default::default(),
                dependencies: Default::default(),
            }
        }
    }

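    /// Build the operator graph from the job's fragments. Returns the root
    /// operator id, the ids of fragments that dispatch to merges within this
    /// job, and the adjacency list keyed by operator id.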
    pub(super) fn extract_stream_node_infos(
        fragments: Vec<FragmentInfo>,
    ) -> (
        OperatorId,
        HashSet<FragmentId>,
        HashMap<OperatorId, StreamNode>,
    ) {
        let job_fragment_ids = fragments
            .iter()
            .map(|f| f.id)
            .collect::<HashSet<FragmentId>>();

        fn find_root_nodes(stream_nodes: &HashMap<u64, StreamNode>) -> HashSet<u64> {
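            // A root is a node that no other node lists as a dependency.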
            let mut all_nodes = stream_nodes.keys().copied().collect::<HashSet<_>>();
            for node in stream_nodes.values() {
                for dependency in &node.dependencies {
                    all_nodes.remove(dependency);
                }
            }
            all_nodes
        }

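        /// Recursively index `node` and its inputs into the adjacency list, and
        /// record which merge operator consumes each upstream fragment.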
        fn extract_stream_node_info(
            fragment_id: FragmentId,
            fragment_id_to_merge_operator_id: &mut HashMap<FragmentId, OperatorId>,
            operator_id_to_stream_node: &mut HashMap<OperatorId, StreamNode>,
            node: &PbStreamNode,
            actor_ids: &HashSet<ActorId>,
        ) {
            let identity = node
                .node_body
                .as_ref()
                .expect("should have node body")
                .into();
            let operator_id = unique_operator_id(fragment_id, node.operator_id);
            if let Some(merge_node) = node.node_body.as_ref()
                && let NodeBody::Merge(box MergeNode {
                    upstream_fragment_id,
                    ..
                }) = merge_node
            {
                fragment_id_to_merge_operator_id.insert(*upstream_fragment_id, operator_id);
            }
            let dependencies = &node.input;
            let dependency_ids = dependencies
                .iter()
                .map(|input| unique_operator_id(fragment_id, input.operator_id))
                .collect::<Vec<_>>();
            operator_id_to_stream_node.insert(
                operator_id,
                StreamNode {
                    operator_id,
                    fragment_id,
                    identity,
                    actor_ids: actor_ids.clone(),
                    dependencies: dependency_ids,
                },
            );
            for dependency in dependencies {
                extract_stream_node_info(
                    fragment_id,
                    fragment_id_to_merge_operator_id,
                    operator_id_to_stream_node,
                    dependency,
                    actor_ids,
                );
            }
        }

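        // Index every fragment's plan. All actors of a fragment share the same
        // plan, so inspect an arbitrary actor and tag the node with the full
        // actor set.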
        let mut operator_id_to_stream_node = HashMap::new();
        let mut fragment_id_to_merge_operator_id = HashMap::new();
        for fragment in fragments {
            let actors = fragment.actors;
            assert!(
                !actors.is_empty(),
                "fragment {} should have at least one actor",
                fragment.id
            );
            let actor_ids = actors.iter().map(|actor| actor.id).collect::<HashSet<_>>();
            let node = actors[0].node.as_ref().expect("should have stream node");
            extract_stream_node_info(
                fragment.id,
                &mut fragment_id_to_merge_operator_id,
                &mut operator_id_to_stream_node,
                node,
                &actor_ids,
            );
        }

        let root_or_dispatch_nodes = find_root_nodes(&operator_id_to_stream_node);
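        // A node without dependents is either the job's root or the top of a
        // fragment whose output is dispatched to a merge; stitch the latter to
        // its consumer via a synthetic dispatcher (exchange) node.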
        let mut root_node = None;
        for operator_id in root_or_dispatch_nodes {
            let node = operator_id_to_stream_node.get_mut(&operator_id).unwrap();
            let fragment_id = node.fragment_id;
            if let Some(merge_operator_id) = fragment_id_to_merge_operator_id.get(&fragment_id) {
                let mut dispatcher = StreamNode::new_for_dispatcher(fragment_id);
                let operator_id_for_dispatch = dispatcher.operator_id;
                dispatcher.dependencies.push(operator_id);
                assert!(
                    operator_id_to_stream_node
                        .insert(operator_id_for_dispatch as _, dispatcher)
                        .is_none()
                );
                operator_id_to_stream_node
                    .get_mut(merge_operator_id)
                    .unwrap()
                    .dependencies
                    .push(operator_id_for_dispatch as _);
            } else {
                root_node = Some(operator_id);
            }
        }

        let mut dispatcher_fragment_ids = HashSet::new();
        for dispatcher_fragment_id in fragment_id_to_merge_operator_id.keys() {
            if job_fragment_ids.contains(dispatcher_fragment_id) {
                dispatcher_fragment_ids.insert(*dispatcher_fragment_id);
            }
        }

        (
            root_node.unwrap(),
            dispatcher_fragment_ids,
            operator_id_to_stream_node,
        )
    }

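    /// Derive the set of executor ids to profile and the operator-to-executor
    /// mapping from the adjacency list.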
    pub(super) fn extract_executor_infos(
        adjacency_list: &HashMap<OperatorId, StreamNode>,
    ) -> (HashSet<u64>, HashMap<u64, HashSet<u64>>) {
        let mut executor_ids = HashSet::new();
        let mut operator_to_executor = HashMap::new();
        for (operator_id, node) in adjacency_list {
            assert_eq!(*operator_id, node.operator_id);
            let operator_id = node.operator_id;
            for actor_id in &node.actor_ids {
                let executor_id =
                    unique_executor_id_from_unique_operator_id(*actor_id, operator_id);
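                // BatchPlan, Merge and Project nodes are excluded from stats
                // collection; they still take part in the operator mapping so
                // the graph can be rendered.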
                if node.identity != NodeBodyDiscriminants::BatchPlan
                    && node.identity != NodeBodyDiscriminants::Merge
                    && node.identity != NodeBodyDiscriminants::Project
                {
                    assert!(executor_ids.insert(executor_id));
                }
                assert!(
                    operator_to_executor
                        .entry(operator_id)
                        .or_insert_with(HashSet::new)
                        .insert(executor_id)
                );
            }
        }
        (executor_ids, operator_to_executor)
    }

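    /// Render the graph as `EXPLAIN ANALYZE` output rows, walking the adjacency
    /// list depth-first from the root and prefixing each identity with
    /// box-drawing connectors. Throughput deltas are divided by the profiling
    /// duration to get rows/s; backpressure is the per-actor blocked time as a
    /// fraction of the profiling window.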
    pub(super) fn render_graph_with_metrics(
        adjacency_list: &HashMap<u64, StreamNode>,
        root_node: u64,
        stats: &OperatorStats,
        profiling_duration: &Duration,
    ) -> Vec<ExplainAnalyzeStreamJobOutput> {
        let profiling_duration_secs = profiling_duration.as_secs_f64();
        let mut rows = vec![];
        let mut stack = vec![(String::new(), true, root_node)];
        while let Some((prefix, last_child, node_id)) = stack.pop() {
            let Some(node) = adjacency_list.get(&node_id) else {
                continue;
            };
            let is_root = node_id == root_node;

            let identity_rendered = if is_root {
                node.identity.to_string()
            } else {
                let connector = if last_child { "└─ " } else { "├─ " };
                format!("{}{}{}", prefix, connector, node.identity)
            };

            let child_prefix = if is_root {
                ""
            } else if last_child {
                "   "
            } else {
                "│  "
            };
            let child_prefix = format!("{}{}", prefix, child_prefix);

            let stats = stats.get(&node_id);
            let (output_rows_per_second, downstream_backpressure_ratio) = match stats {
                Some(stats) => (
                    Some(
                        (stats.total_output_throughput as f64 / profiling_duration_secs)
                            .to_string(),
                    ),
                    Some(
                        (Duration::from_nanos(stats.total_output_pending_ns).as_secs_f64()
                            / usize::max(node.actor_ids.len(), 1) as f64
                            / profiling_duration_secs)
                            .to_string(),
                    ),
                ),
                None => (None, None),
            };
            let row = ExplainAnalyzeStreamJobOutput {
                identity: identity_rendered,
                actor_ids: node
                    .actor_ids
                    .iter()
                    .sorted()
                    .map(|id| id.to_string())
                    .collect::<Vec<_>>()
                    .join(","),
                output_rows_per_second,
                downstream_backpressure_ratio,
            };
            rows.push(row);
            for (position, dependency) in node.dependencies.iter().enumerate() {
                stack.push((child_prefix.clone(), position == 0, *dependency));
            }
        }
        rows
    }
}

mod utils {
    use risingwave_common::operator::unique_operator_id;

    use crate::catalog::FragmentId;
    use crate::handler::explain_analyze_stream_job::graph::OperatorId;

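    /// Synthesize an operator id for a fragment's dispatcher, using `u32::MAX`
    /// as a sentinel operator index.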
    pub(super) fn operator_id_for_dispatch(fragment_id: FragmentId) -> OperatorId {
        unique_operator_id(fragment_id, u32::MAX as u64)
    }
}