use std::collections::HashMap;

use pgwire::pg_response::StatementType;
use risingwave_common::types::Fields;
use risingwave_sqlparser::ast::AnalyzeTarget;
use tokio::time::Duration;

use crate::error::Result;
use crate::handler::explain_analyze_stream_job::graph::{
    extract_executor_infos, extract_stream_node_infos, render_graph_with_metrics,
};
use crate::handler::{HandlerArgs, RwPgResponse, RwPgResponseBuilder, RwPgResponseBuilderExt};

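/// A row of `EXPLAIN ANALYZE` output: the rendered operator identity, the actors running
/// it, and its throughput and backpressure over the profiling window.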
#[derive(Fields)]
struct ExplainAnalyzeStreamJobOutput {
    identity: String,
    actor_ids: String,
    output_rows_per_second: String,
    downstream_backpressure_ratio: String,
}

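/// Handle `EXPLAIN ANALYZE` for a stream job (table, index, sink, materialized view, or a
/// raw job id).
///
/// The flow is:
/// 1. Bind the target relation to its job id.
/// 2. Fetch the job's fragments from meta and build the operator graph.
/// 3. Sample profiling counters on all streaming workers, wait out the profiling window
///    (`duration_secs`, 10s by default), then sample again.
/// 4. Aggregate the per-executor deltas to operator level and render the annotated tree.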
pub async fn handle_explain_analyze_stream_job(
    handler_args: HandlerArgs,
    target: AnalyzeTarget,
    duration_secs: Option<u64>,
) -> Result<RwPgResponse> {
    let profiling_duration = Duration::from_secs(duration_secs.unwrap_or(10));
    let job_id = bind::bind_relation(&target, &handler_args)?;

    let meta_client = handler_args.session.env().meta_client();
    let fragments = net::get_fragments(meta_client, job_id).await?;
    let dispatcher_fragment_ids = fragments.iter().map(|f| f.id).collect::<Vec<_>>();
    let fragment_parallelisms = fragments
        .iter()
        .map(|f| (f.id, f.actors.len()))
        .collect::<HashMap<_, _>>();
    let (root_node, adjacency_list) = extract_stream_node_infos(fragments);
    let (executor_ids, operator_to_executor) = extract_executor_infos(&adjacency_list);

    let worker_nodes = net::list_stream_worker_nodes(handler_args.session.env()).await?;

    let executor_stats = net::get_executor_stats(
        &handler_args,
        &worker_nodes,
        &executor_ids,
        &dispatcher_fragment_ids,
        profiling_duration,
    )
    .await?;
    tracing::debug!(?executor_stats, "collected executor stats");
    let aggregated_stats = metrics::OperatorStats::aggregate(
        operator_to_executor,
        &executor_stats,
        &fragment_parallelisms,
    );
    tracing::debug!(?aggregated_stats, "collected aggregated stats");

    let rows = render_graph_with_metrics(
        &adjacency_list,
        root_node,
        &aggregated_stats,
        &profiling_duration,
    );
    let builder = RwPgResponseBuilder::empty(StatementType::EXPLAIN);
    let builder = builder.rows(rows);
    Ok(builder.into())
}

mod bind {
    use risingwave_sqlparser::ast::AnalyzeTarget;

    use crate::Binder;
    use crate::catalog::root_catalog::SchemaPath;
    use crate::error::Result;
    use crate::handler::HandlerArgs;

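    /// Resolve the `ANALYZE` target to the id of its backing stream job: either the id
    /// given directly in the statement, or a catalog lookup by (possibly qualified) name.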
    pub(super) fn bind_relation(
        target_relation: &AnalyzeTarget,
        handler_args: &HandlerArgs,
    ) -> Result<u32> {
        let job_id = match &target_relation {
            AnalyzeTarget::Id(id) => *id,
            AnalyzeTarget::Index(name)
            | AnalyzeTarget::Table(name)
            | AnalyzeTarget::Sink(name)
            | AnalyzeTarget::MaterializedView(name) => {
                let session = &handler_args.session;
                let db_name = session.database();
                let (schema_name, name) =
                    Binder::resolve_schema_qualified_name(&db_name, name.clone())?;
                let search_path = session.config().search_path();
                let user_name = &session.user_name();
                let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

                let catalog_reader = handler_args.session.env().catalog_reader();
                let catalog = catalog_reader.read_guard();

                match target_relation {
                    AnalyzeTarget::Index(_) => {
                        let (catalog, _schema_name) =
                            catalog.get_index_by_name(&db_name, schema_path, &name)?;
                        catalog.id.index_id
                    }
                    AnalyzeTarget::Table(_) => {
                        let (catalog, _schema_name) =
                            catalog.get_any_table_by_name(&db_name, schema_path, &name)?;
                        catalog.id.table_id
                    }
                    AnalyzeTarget::Sink(_) => {
                        let (catalog, _schema_name) =
                            catalog.get_sink_by_name(&db_name, schema_path, &name)?;
                        catalog.id.sink_id
                    }
                    AnalyzeTarget::MaterializedView(_) => {
                        let (catalog, _schema_name) =
                            catalog.get_any_table_by_name(&db_name, schema_path, &name)?;
                        catalog.id.table_id
                    }
                    AnalyzeTarget::Id(_) => unreachable!(),
                }
            }
        };
        Ok(job_id)
    }
}

mod net {
    use risingwave_pb::common::WorkerNode;
    use risingwave_pb::meta::list_table_fragments_response::FragmentInfo;
    use risingwave_pb::monitor_service::GetProfileStatsRequest;
    use tokio::time::{Duration, sleep};

    use crate::error::Result;
    use crate::handler::HandlerArgs;
    use crate::handler::explain_analyze_stream_job::graph::ExecutorId;
    use crate::handler::explain_analyze_stream_job::metrics::ExecutorStats;
    use crate::meta_client::FrontendMetaClient;
    use crate::session::FrontendEnv;

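    /// List all worker nodes registered with meta and keep only those that host
    /// streaming compute.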
    pub(super) async fn list_stream_worker_nodes(env: &FrontendEnv) -> Result<Vec<WorkerNode>> {
        let worker_nodes = env.meta_client().list_all_nodes().await?;
        let stream_worker_nodes = worker_nodes
            .into_iter()
            .filter(|node| {
                node.property
                    .as_ref()
                    .map(|p| p.is_streaming)
                    .unwrap_or(false)
            })
            .collect::<Vec<_>>();
        Ok(stream_worker_nodes)
    }

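    /// Fetch the fragments of the given stream job from meta. The response is keyed by
    /// job id and must contain exactly the one job we asked for.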
    pub(super) async fn get_fragments(
        meta_client: &dyn FrontendMetaClient,
        job_id: u32,
    ) -> Result<Vec<FragmentInfo>> {
        let mut fragment_map = meta_client.list_table_fragments(&[job_id]).await?;
        assert_eq!(
            fragment_map.len(),
            1,
            "expected fragment info for exactly one job"
        );
        let (fragment_job_id, table_fragment_info) = fragment_map.drain().next().unwrap();
        assert_eq!(fragment_job_id, job_id);
        Ok(table_fragment_info.fragments)
    }

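    /// Sample profiling counters from every streaming worker twice: once as a baseline,
    /// then again after sleeping for the profiling window. The returned stats hold the
    /// per-executor (and per-dispatcher) deltas between the two samples.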
    pub(super) async fn get_executor_stats(
        handler_args: &HandlerArgs,
        worker_nodes: &[WorkerNode],
        executor_ids: &[ExecutorId],
        dispatcher_fragment_ids: &[u32],
        profiling_duration: Duration,
    ) -> Result<ExecutorStats> {
        let mut aggregated_stats = ExecutorStats::new();
        for node in worker_nodes {
            let mut compute_client = handler_args.session.env().client_pool().get(node).await?;
            let stats = compute_client
                .monitor_client
                .get_profile_stats(GetProfileStatsRequest {
                    executor_ids: executor_ids.into(),
                    dispatcher_fragment_ids: dispatcher_fragment_ids.into(),
                })
                .await
                .expect("get profiling stats failed");
            aggregated_stats.start_record(
                executor_ids,
                dispatcher_fragment_ids,
                &stats.into_inner(),
            );
        }

        sleep(profiling_duration).await;

        for node in worker_nodes {
            let mut compute_client = handler_args.session.env().client_pool().get(node).await?;
            let stats = compute_client
                .monitor_client
                .get_profile_stats(GetProfileStatsRequest {
                    executor_ids: executor_ids.into(),
                    dispatcher_fragment_ids: dispatcher_fragment_ids.into(),
                })
                .await
                .expect("get profiling stats failed");
            aggregated_stats.finish_record(
                executor_ids,
                dispatcher_fragment_ids,
                &stats.into_inner(),
            );
        }

        Ok(aggregated_stats)
    }
}

mod metrics {
    use std::collections::HashMap;

    use risingwave_pb::monitor_service::GetProfileStatsResponse;

    use crate::catalog::FragmentId;
    use crate::handler::explain_analyze_stream_job::graph::{ExecutorId, OperatorId};

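    /// Profiling counters for a single executor. After `start_record` these hold the
    /// baseline cumulative values; after `finish_record` they hold the deltas
    /// accumulated over the profiling window.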
    #[derive(Default, Debug)]
    pub(super) struct ExecutorMetrics {
        pub executor_id: ExecutorId,
        pub epoch: u32,
        pub total_output_throughput: u64,
        pub total_output_pending_ns: u64,
    }

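    /// Like `ExecutorMetrics`, but for a fragment's dispatchers, keyed by fragment id.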
    #[derive(Default, Debug)]
    pub(super) struct DispatchMetrics {
        pub fragment_id: FragmentId,
        pub epoch: u32,
        pub total_output_throughput: u64,
        pub total_output_pending_ns: u64,
    }

    #[derive(Debug)]
    pub(super) struct ExecutorStats {
        executor_stats: HashMap<ExecutorId, ExecutorMetrics>,
        dispatch_stats: HashMap<FragmentId, DispatchMetrics>,
    }

    impl ExecutorStats {
        pub(super) fn new() -> Self {
            ExecutorStats {
                executor_stats: HashMap::new(),
                dispatch_stats: HashMap::new(),
            }
        }

        pub fn get(&self, executor_id: &ExecutorId) -> Option<&ExecutorMetrics> {
            self.executor_stats.get(executor_id)
        }

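        /// Record the baseline sample: accumulate the cumulative counters reported by a
        /// worker for each executor and dispatcher. Counters are summed across workers,
        /// since each worker only reports the executors it hosts.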
        pub(super) fn start_record<'a>(
            &mut self,
            executor_ids: &'a [ExecutorId],
            dispatch_fragment_ids: &'a [FragmentId],
            metrics: &'a GetProfileStatsResponse,
        ) {
            for executor_id in executor_ids {
                let stats = self.executor_stats.entry(*executor_id).or_default();
                stats.executor_id = *executor_id;
                stats.epoch = 0;
                stats.total_output_throughput += metrics
                    .stream_node_output_row_count
                    .get(executor_id)
                    .cloned()
                    .unwrap_or(0);
                stats.total_output_pending_ns += metrics
                    .stream_node_output_blocking_duration_ns
                    .get(executor_id)
                    .cloned()
                    .unwrap_or(0);
            }

            for fragment_id in dispatch_fragment_ids {
                let stats = self.dispatch_stats.entry(*fragment_id).or_default();
                stats.fragment_id = *fragment_id;
                stats.epoch = 0;
                stats.total_output_throughput += metrics
                    .dispatch_fragment_output_row_count
                    .get(fragment_id)
                    .cloned()
                    .unwrap_or(0);
                stats.total_output_pending_ns += metrics
                    .dispatch_fragment_output_blocking_duration_ns
                    .get(fragment_id)
                    .cloned()
                    .unwrap_or(0);
            }
        }

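        /// Record the closing sample: replace each baseline with the delta between the
        /// counters reported now and the baseline. Entries that were never seen in
        /// `start_record` are skipped.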
        pub(super) fn finish_record<'a>(
            &mut self,
            executor_ids: &'a [ExecutorId],
            dispatch_fragment_ids: &'a [FragmentId],
            metrics: &'a GetProfileStatsResponse,
        ) {
            for executor_id in executor_ids {
                if let Some(stats) = self.executor_stats.get_mut(executor_id) {
                    // `saturating_sub` keeps the delta at zero if a counter went
                    // backwards, e.g. after a worker restarted mid-window.
                    stats.total_output_throughput = metrics
                        .stream_node_output_row_count
                        .get(executor_id)
                        .cloned()
                        .unwrap_or(0)
                        .saturating_sub(stats.total_output_throughput);
                    stats.total_output_pending_ns = metrics
                        .stream_node_output_blocking_duration_ns
                        .get(executor_id)
                        .cloned()
                        .unwrap_or(0)
                        .saturating_sub(stats.total_output_pending_ns);
                }
            }

            for fragment_id in dispatch_fragment_ids {
                if let Some(stats) = self.dispatch_stats.get_mut(fragment_id) {
                    stats.total_output_throughput = metrics
                        .dispatch_fragment_output_row_count
                        .get(fragment_id)
                        .cloned()
                        .unwrap_or(0)
                        .saturating_sub(stats.total_output_throughput);
                    stats.total_output_pending_ns = metrics
                        .dispatch_fragment_output_blocking_duration_ns
                        .get(fragment_id)
                        .cloned()
                        .unwrap_or(0)
                        .saturating_sub(stats.total_output_pending_ns);
                }
            }
        }
    }

    #[expect(dead_code)]
    #[derive(Debug)]
    pub(super) struct OperatorMetrics {
        pub operator_id: OperatorId,
        pub epoch: u32,
        pub total_output_throughput: u64,
        pub total_output_pending_ns: u64,
    }

    #[derive(Debug)]
    pub(super) struct OperatorStats {
        inner: HashMap<OperatorId, OperatorMetrics>,
    }

    impl OperatorStats {
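        /// Roll executor-level deltas up to operator level: throughput is summed across
        /// an operator's executors, while blocked (pending) time is averaged over them to
        /// keep it comparable to the wall-clock profiling window. Dispatcher stats are
        /// folded in per fragment, averaged over the fragment's parallelism.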
        pub(super) fn aggregate(
            operator_map: HashMap<OperatorId, Vec<ExecutorId>>,
            executor_stats: &ExecutorStats,
            fragment_parallelisms: &HashMap<FragmentId, usize>,
        ) -> Self {
            let mut operator_stats = HashMap::new();
            for (operator_id, executor_ids) in operator_map {
                // Guard against division by zero for operators without executors.
                let num_executors = u64::max(executor_ids.len() as u64, 1);
                let mut total_output_throughput = 0;
                let mut total_output_pending_ns = 0;
                for executor_id in executor_ids {
                    if let Some(stats) = executor_stats.get(&executor_id) {
                        total_output_throughput += stats.total_output_throughput;
                        total_output_pending_ns += stats.total_output_pending_ns;
                    }
                }
                let total_output_pending_ns = total_output_pending_ns / num_executors;

                operator_stats.insert(
                    operator_id,
                    OperatorMetrics {
                        operator_id,
                        epoch: 0,
                        total_output_throughput,
                        total_output_pending_ns,
                    },
                );
            }

            for (fragment_id, dispatch_metrics) in &executor_stats.dispatch_stats {
                let operator_id = *fragment_id as OperatorId;
                let total_output_throughput = dispatch_metrics.total_output_throughput;
                let fragment_parallelism = fragment_parallelisms
                    .get(fragment_id)
                    .copied()
                    .expect("should have fragment parallelism");
                // Same division-by-zero guard as above.
                let total_output_pending_ns = dispatch_metrics.total_output_pending_ns
                    / u64::max(fragment_parallelism as u64, 1);

                operator_stats.insert(
                    operator_id,
                    OperatorMetrics {
                        operator_id,
                        epoch: 0,
                        total_output_throughput,
                        total_output_pending_ns,
                    },
                );
            }

            OperatorStats {
                inner: operator_stats,
            }
        }

        pub fn get(&self, operator_id: &OperatorId) -> Option<&OperatorMetrics> {
            self.inner.get(operator_id)
        }
    }
}

mod graph {
    use std::collections::{HashMap, HashSet};
    use std::time::Duration;

    use risingwave_common::operator::{
        unique_executor_id_from_unique_operator_id, unique_operator_id,
    };
    use risingwave_pb::meta::list_table_fragments_response::FragmentInfo;
    use risingwave_pb::stream_plan::stream_node::NodeBody;
    use risingwave_pb::stream_plan::{MergeNode, StreamNode as PbStreamNode};

    use crate::handler::explain_analyze_stream_job::ExplainAnalyzeStreamJobOutput;
    use crate::handler::explain_analyze_stream_job::metrics::OperatorStats;

    pub(super) type OperatorId = u64;
    pub(super) type ExecutorId = u64;

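    /// A node of the operator graph assembled from the job's fragments. `dependencies`
    /// holds the operator ids of the node's inputs, so the full map forms an adjacency
    /// list rooted at the most downstream operator.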
    #[derive(Debug)]
    pub(super) struct StreamNode {
        operator_id: OperatorId,
        fragment_id: u32,
        identity: String,
        actor_ids: Vec<u32>,
        dependencies: Vec<u64>,
    }

    impl StreamNode {
        fn new_for_dispatcher(fragment_id: u32) -> Self {
            StreamNode {
                operator_id: fragment_id as u64,
                fragment_id,
                identity: "Dispatcher".to_owned(),
                actor_ids: vec![],
                dependencies: vec![],
            }
        }
    }

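    /// Build the operator adjacency list for the job and return it together with the id
    /// of the root operator. A synthetic `Dispatcher` node is inserted between each
    /// fragment's root operator and the `Merge` operator of its downstream fragment, so
    /// dispatcher metrics (keyed by fragment id) get their own rows in the output.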
    pub(super) fn extract_stream_node_infos(
        fragments: Vec<FragmentInfo>,
    ) -> (OperatorId, HashMap<OperatorId, StreamNode>) {
        fn find_root_nodes(stream_nodes: &HashMap<u64, StreamNode>) -> HashSet<u64> {
            let mut all_nodes = stream_nodes.keys().copied().collect::<HashSet<_>>();
            for node in stream_nodes.values() {
                for dependency in &node.dependencies {
                    all_nodes.remove(dependency);
                }
            }
            all_nodes
        }

        fn extract_stream_node_info(
            fragment_id: u32,
            fragment_id_to_merge_operator_id: &mut HashMap<u32, OperatorId>,
            operator_id_to_stream_node: &mut HashMap<OperatorId, StreamNode>,
            node: &PbStreamNode,
            actor_id: u32,
        ) {
            let identity = node
                .identity
                .split_ascii_whitespace()
                .next()
                .unwrap()
                .to_owned();
            let operator_id = unique_operator_id(fragment_id, node.operator_id);
            if let Some(merge_node) = node.node_body.as_ref()
                && let NodeBody::Merge(box MergeNode {
                    upstream_fragment_id,
                    ..
                }) = merge_node
            {
                fragment_id_to_merge_operator_id.insert(*upstream_fragment_id, operator_id);
            }
            let dependencies = &node.input;
            let entry = operator_id_to_stream_node
                .entry(operator_id)
                .or_insert_with(|| {
                    let dependencies = dependencies
                        .iter()
                        .map(|input| unique_operator_id(fragment_id, input.operator_id))
                        .collect();
                    StreamNode {
                        operator_id,
                        fragment_id,
                        identity,
                        actor_ids: vec![],
                        dependencies,
                    }
                });
            entry.actor_ids.push(actor_id);
            for dependency in dependencies {
                extract_stream_node_info(
                    fragment_id,
                    fragment_id_to_merge_operator_id,
                    operator_id_to_stream_node,
                    dependency,
                    actor_id,
                );
            }
        }

        let mut operator_id_to_stream_node = HashMap::new();
        let mut fragment_id_to_merge_operator_id = HashMap::new();
        for fragment in fragments {
            let actors = fragment.actors;
            for actor in actors {
                let actor_id = actor.id;
                let node = actor.node.unwrap();
                extract_stream_node_info(
                    fragment.id,
                    &mut fragment_id_to_merge_operator_id,
                    &mut operator_id_to_stream_node,
                    &node,
                    actor_id,
                );
            }
        }

        let root_or_dispatch_nodes = find_root_nodes(&operator_id_to_stream_node);
        let mut root_node = None;
        for operator_id in root_or_dispatch_nodes {
            let node = operator_id_to_stream_node.get(&operator_id).unwrap();
            let fragment_id = node.fragment_id;
            if let Some(merge_operator_id) = fragment_id_to_merge_operator_id.get(&fragment_id) {
                let mut dispatcher = StreamNode::new_for_dispatcher(fragment_id);
                dispatcher.dependencies.push(operator_id);
                assert!(
                    operator_id_to_stream_node
                        .insert(fragment_id as _, dispatcher)
                        .is_none()
                );
                operator_id_to_stream_node
                    .get_mut(merge_operator_id)
                    .unwrap()
                    .dependencies
                    .push(fragment_id as _);
            } else {
                root_node = Some(operator_id);
            }
        }

        (root_node.unwrap(), operator_id_to_stream_node)
    }

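    /// Derive the set of executor ids to profile from the adjacency list: one executor
    /// per (actor, operator) pair, plus a reverse map from each operator to its
    /// executors.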
    pub(super) fn extract_executor_infos(
        adjacency_list: &HashMap<u64, StreamNode>,
    ) -> (Vec<u64>, HashMap<u64, Vec<u64>>) {
        let mut executor_ids: Vec<_> = Default::default();
        let mut operator_to_executor: HashMap<_, _> = Default::default();
        for node in adjacency_list.values() {
            let operator_id = node.operator_id;
            for actor_id in &node.actor_ids {
                let executor_id =
                    unique_executor_id_from_unique_operator_id(*actor_id, operator_id);
                executor_ids.push(executor_id);
                operator_to_executor
                    .entry(operator_id)
                    .or_insert_with(Vec::new)
                    .push(executor_id);
            }
        }
        (executor_ids, operator_to_executor)
    }

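    /// Render the operator graph as a tree, one output row per operator, with throughput
    /// and backpressure computed over the profiling window. Traversal is an iterative
    /// DFS that carries each node's indentation prefix on the stack. The `identity`
    /// column comes out roughly like this (illustrative sketch, not the output of any
    /// particular query):
    ///
    /// ```text
    /// StreamMaterialize
    /// └─ StreamProject
    ///    └─ StreamHashAgg
    ///       └─ StreamTableScan
    /// ```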
    pub(super) fn render_graph_with_metrics(
        adjacency_list: &HashMap<u64, StreamNode>,
        root_node: u64,
        stats: &OperatorStats,
        profiling_duration: &Duration,
    ) -> Vec<ExplainAnalyzeStreamJobOutput> {
        let profiling_duration_secs = profiling_duration.as_secs_f64();
        let mut rows = vec![];
        let mut stack = vec![(String::new(), true, root_node)];
        while let Some((prefix, last_child, node_id)) = stack.pop() {
            let Some(node) = adjacency_list.get(&node_id) else {
                continue;
            };
            let is_root = node_id == root_node;

            let identity_rendered = if is_root {
                node.identity.clone()
            } else {
                let connector = if last_child { "└─ " } else { "├─ " };
                format!("{}{}{}", prefix, connector, node.identity)
            };

            let child_prefix = if is_root {
                ""
            } else if last_child {
                "   "
            } else {
                "│  "
            };
            let child_prefix = format!("{}{}", prefix, child_prefix);

            let stats = stats.get(&node_id);
            let (output_throughput, output_latency) = stats
                .map(|stats| (stats.total_output_throughput, stats.total_output_pending_ns))
                .unwrap_or((0, 0));
            let row = ExplainAnalyzeStreamJobOutput {
                identity: identity_rendered,
                actor_ids: node
                    .actor_ids
                    .iter()
                    .map(|id| id.to_string())
                    .collect::<Vec<_>>()
                    .join(","),
                output_rows_per_second: (output_throughput as f64 / profiling_duration_secs)
                    .to_string(),
                downstream_backpressure_ratio: (Duration::from_nanos(output_latency).as_secs_f64()
                    / usize::max(node.actor_ids.len(), 1) as f64
                    / profiling_duration_secs)
                    .to_string(),
            };
            rows.push(row);
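            // Children are pushed in order and popped in reverse, so the dependency at
            // position 0 is rendered last among its siblings and takes the `└─ ` branch.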
            for (position, dependency) in node.dependencies.iter().enumerate() {
                stack.push((child_prefix.clone(), position == 0, *dependency));
            }
        }
        rows
    }
}