1use std::cmp::{Ordering, Reverse};
16use std::collections::{BTreeMap, BinaryHeap, HashMap};
17use std::fmt::Write;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use prometheus_http_query::response::Data::Vector;
22use risingwave_common::id::ObjectId;
23use risingwave_common::system_param::AdaptiveParallelismStrategy;
24use risingwave_common::system_param::adaptive_parallelism_strategy::parse_strategy;
25use risingwave_common::system_param::reader::SystemParamsRead;
26use risingwave_common::types::Timestamptz;
27use risingwave_common::util::StackTraceResponseExt;
28use risingwave_common::util::epoch::Epoch;
29use risingwave_hummock_sdk::level::Level;
30use risingwave_hummock_sdk::{CompactionGroupId, HummockSstableId};
31use risingwave_license::LicenseManager;
32use risingwave_meta_model::{JobStatus, StreamingParallelism};
33use risingwave_pb::catalog::table::PbTableType;
34use risingwave_pb::common::WorkerType;
35use risingwave_pb::meta::EventLog;
36use risingwave_pb::meta::event_log::Event;
37use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
38use risingwave_sqlparser::ast::RedactSqlOptionKeywordsRef;
39use risingwave_sqlparser::parser::Parser;
40use serde_json::json;
41use thiserror_ext::AsReport;
42
43use crate::MetaResult;
44use crate::controller::system_param::SystemParamsControllerRef;
45use crate::hummock::HummockManagerRef;
46use crate::manager::MetadataManager;
47use crate::manager::event_log::EventLogManagerRef;
48use crate::manager::iceberg_compaction::IcebergCompactionManagerRef;
49use crate::rpc::await_tree::dump_cluster_await_tree;
50
51pub type DiagnoseCommandRef = Arc<DiagnoseCommand>;
52
53pub struct DiagnoseCommand {
54 metadata_manager: MetadataManager,
55 await_tree_reg: await_tree::Registry,
56 hummock_manger: HummockManagerRef,
57 iceberg_compaction_manager: IcebergCompactionManagerRef,
58 event_log_manager: EventLogManagerRef,
59 prometheus_client: Option<prometheus_http_query::Client>,
60 prometheus_selector: String,
61 redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
62 system_params_controller: SystemParamsControllerRef,
63}
64
65impl DiagnoseCommand {
66 pub fn new(
67 metadata_manager: MetadataManager,
68 await_tree_reg: await_tree::Registry,
69 hummock_manger: HummockManagerRef,
70 iceberg_compaction_manager: IcebergCompactionManagerRef,
71 event_log_manager: EventLogManagerRef,
72 prometheus_client: Option<prometheus_http_query::Client>,
73 prometheus_selector: String,
74 redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
75 system_params_controller: SystemParamsControllerRef,
76 ) -> Self {
77 Self {
78 metadata_manager,
79 await_tree_reg,
80 hummock_manger,
81 iceberg_compaction_manager,
82 event_log_manager,
83 prometheus_client,
84 prometheus_selector,
85 redact_sql_option_keywords,
86 system_params_controller,
87 }
88 }
89
90 pub async fn report(&self, actor_traces_format: ActorTracesFormat) -> String {
91 let mut report = String::new();
92 let _ = writeln!(
93 report,
94 "report created at: {}\nversion: {}",
95 chrono::DateTime::<chrono::offset::Utc>::from(std::time::SystemTime::now()),
96 risingwave_common::current_cluster_version(),
97 );
98 let _ = writeln!(report);
99 self.write_license(&mut report);
100 let _ = writeln!(report);
101 self.write_catalog(&mut report).await;
102 let _ = writeln!(report);
103 self.write_worker_nodes(&mut report).await;
104 let _ = writeln!(report);
105 self.write_streaming_prometheus(&mut report).await;
106 let _ = writeln!(report);
107 self.write_storage(&mut report).await;
108 let _ = writeln!(report);
109 self.write_iceberg_compaction_schedule(&mut report).await;
110 let _ = writeln!(report);
111 self.write_event_logs(&mut report);
112 let _ = writeln!(report);
113 self.write_params(&mut report).await;
114 let _ = writeln!(report);
115 self.write_await_tree(&mut report, actor_traces_format)
116 .await;
117
118 report
119 }
120
121 async fn write_catalog(&self, s: &mut String) {
122 self.write_catalog_inner(s).await;
123 let _ = self.write_table_definition(s).await.inspect_err(|e| {
124 tracing::warn!(
125 error = e.to_report_string(),
126 "failed to display table definition"
127 )
128 });
129 }
130
131 async fn write_catalog_inner(&self, s: &mut String) {
132 let stats = self.metadata_manager.catalog_controller.stats().await;
133
134 let stat = match stats {
135 Ok(stat) => stat,
136 Err(err) => {
137 tracing::warn!(error=?err.as_report(), "failed to get catalog stats");
138 return;
139 }
140 };
141 let _ = writeln!(s, "number of database: {}", stat.database_num);
142 let _ = writeln!(s, "number of fragment: {}", stat.streaming_job_num);
143 let _ = writeln!(s, "number of actor: {}", stat.actor_num);
144 let _ = writeln!(s, "number of source: {}", stat.source_num);
145 let _ = writeln!(s, "number of table: {}", stat.table_num);
146 let _ = writeln!(s, "number of materialized view: {}", stat.mview_num);
147 let _ = writeln!(s, "number of sink: {}", stat.sink_num);
148 let _ = writeln!(s, "number of index: {}", stat.index_num);
149 let _ = writeln!(s, "number of function: {}", stat.function_num);
150
151 self.write_databases(s).await;
152 self.write_schemas(s).await;
153 }
154
155 async fn write_databases(&self, s: &mut String) {
156 let databases = match self
157 .metadata_manager
158 .catalog_controller
159 .list_databases()
160 .await
161 {
162 Ok(databases) => databases,
163 Err(err) => {
164 tracing::warn!(error=?err.as_report(), "failed to list databases");
165 return;
166 }
167 };
168
169 use comfy_table::{Row, Table};
170 let mut table = Table::new();
171 table.set_header({
172 let mut row = Row::new();
173 row.add_cell("id".into());
174 row.add_cell("name".into());
175 row.add_cell("resource_group".into());
176 row.add_cell("barrier_interval_ms".into());
177 row.add_cell("checkpoint_frequency".into());
178 row
179 });
180 for db in databases {
181 let mut row = Row::new();
182 row.add_cell(db.id.into());
183 row.add_cell(db.name.into());
184 row.add_cell(db.resource_group.into());
185 row.add_cell(
186 db.barrier_interval_ms
187 .map(|v| v.to_string())
188 .unwrap_or("default".into())
189 .into(),
190 );
191 row.add_cell(
192 db.checkpoint_frequency
193 .map(|v| v.to_string())
194 .unwrap_or("default".into())
195 .into(),
196 );
197 table.add_row(row);
198 }
199
200 let _ = writeln!(s, "DATABASE");
201 let _ = writeln!(s, "{table}");
202 }
203
204 async fn write_schemas(&self, s: &mut String) {
205 let schemas = match self
206 .metadata_manager
207 .catalog_controller
208 .list_schemas()
209 .await
210 {
211 Ok(schemas) => schemas,
212 Err(err) => {
213 tracing::warn!(error=?err.as_report(), "failed to list schemas");
214 return;
215 }
216 };
217
218 use comfy_table::{Row, Table};
219 let mut table = Table::new();
220 table.set_header({
221 let mut row = Row::new();
222 row.add_cell("id".into());
223 row.add_cell("database_id".into());
224 row.add_cell("name".into());
225 row
226 });
227 for schema in schemas {
228 let mut row = Row::new();
229 row.add_cell(schema.id.into());
230 row.add_cell(schema.database_id.into());
231 row.add_cell(schema.name.into());
232 table.add_row(row);
233 }
234
235 let _ = writeln!(s, "SCHEMA");
236 let _ = writeln!(s, "{table}");
237 }
238
239 async fn write_worker_nodes(&self, s: &mut String) {
240 let Ok(worker_actor_count) = self.metadata_manager.worker_actor_count() else {
241 tracing::warn!("failed to get worker actor count");
242 return;
243 };
244
245 use comfy_table::{Row, Table};
246 let Ok(worker_nodes) = self.metadata_manager.list_worker_node(None, None).await else {
247 tracing::warn!("failed to get worker nodes");
248 return;
249 };
250 let mut table = Table::new();
251 table.set_header({
252 let mut row = Row::new();
253 row.add_cell("id".into());
254 row.add_cell("host".into());
255 row.add_cell("hostname".into());
256 row.add_cell("type".into());
257 row.add_cell("state".into());
258 row.add_cell("parallelism".into());
259 row.add_cell("resource_group".into());
260 row.add_cell("is_streaming".into());
261 row.add_cell("is_serving".into());
262 row.add_cell("is_iceberg_compactor".into());
263 row.add_cell("rw_version".into());
264 row.add_cell("total_memory_bytes".into());
265 row.add_cell("total_cpu_cores".into());
266 row.add_cell("started_at".into());
267 row.add_cell("actor_count".into());
268 row
269 });
270 for worker_node in worker_nodes {
271 let mut row = Row::new();
272 row.add_cell(worker_node.id.into());
273 try_add_cell(
274 &mut row,
275 worker_node
276 .host
277 .as_ref()
278 .map(|h| format!("{}:{}", h.host, h.port)),
279 );
280 try_add_cell(
281 &mut row,
282 worker_node.resource.as_ref().map(|r| r.hostname.clone()),
283 );
284 try_add_cell(
285 &mut row,
286 worker_node.get_type().ok().map(|t| t.as_str_name()),
287 );
288 try_add_cell(
289 &mut row,
290 worker_node.get_state().ok().map(|s| s.as_str_name()),
291 );
292 try_add_cell(&mut row, worker_node.parallelism());
293 try_add_cell(
294 &mut row,
295 worker_node
296 .property
297 .as_ref()
298 .map(|p| p.resource_group.clone().unwrap_or("".to_owned())),
299 );
300 let (is_streaming, is_serving) = {
302 if let Ok(t) = worker_node.get_type()
303 && t == WorkerType::ComputeNode
304 {
305 (
306 worker_node.property.as_ref().map(|p| p.is_streaming),
307 worker_node.property.as_ref().map(|p| p.is_serving),
308 )
309 } else {
310 (None, None)
311 }
312 };
313 try_add_cell(&mut row, is_streaming);
314 try_add_cell(&mut row, is_serving);
315 let is_iceberg_compactor = {
317 if let Ok(t) = worker_node.get_type()
318 && t == WorkerType::Compactor
319 {
320 worker_node
321 .property
322 .as_ref()
323 .map(|p| p.is_iceberg_compactor)
324 } else {
325 None
326 }
327 };
328 try_add_cell(&mut row, is_iceberg_compactor);
329 try_add_cell(
330 &mut row,
331 worker_node.resource.as_ref().map(|r| r.rw_version.clone()),
332 );
333 try_add_cell(
334 &mut row,
335 worker_node.resource.as_ref().map(|r| r.total_memory_bytes),
336 );
337 try_add_cell(
338 &mut row,
339 worker_node.resource.as_ref().map(|r| r.total_cpu_cores),
340 );
341 try_add_cell(
342 &mut row,
343 worker_node
344 .started_at
345 .and_then(|ts| Timestamptz::from_secs(ts as _).map(|t| t.to_string())),
346 );
347 let actor_count = {
348 if let Ok(t) = worker_node.get_type()
349 && t != WorkerType::ComputeNode
350 {
351 None
352 } else {
353 match worker_actor_count.get(&worker_node.id) {
354 None => Some(0),
355 Some(c) => Some(*c),
356 }
357 }
358 };
359 try_add_cell(&mut row, actor_count);
360 table.add_row(row);
361 }
362 let _ = writeln!(s, "{table}");
363 }
364
365 fn write_event_logs(&self, s: &mut String) {
366 let event_logs = self
367 .event_log_manager
368 .list_event_logs()
369 .into_iter()
370 .sorted_by(|a, b| {
371 a.timestamp
372 .unwrap_or(0)
373 .cmp(&b.timestamp.unwrap_or(0))
374 .reverse()
375 })
376 .collect_vec();
377
378 let _ = writeln!(s, "latest barrier completions");
379 Self::write_event_logs_impl(
380 s,
381 event_logs.iter(),
382 |e| {
383 let Event::BarrierComplete(info) = e else {
384 return None;
385 };
386 Some(json!(info).to_string())
387 },
388 Some(10),
389 );
390
391 let _ = writeln!(s);
392 let _ = writeln!(s, "latest barrier collection failures");
393 Self::write_event_logs_impl(
394 s,
395 event_logs.iter(),
396 |e| {
397 let Event::CollectBarrierFail(info) = e else {
398 return None;
399 };
400 Some(json!(info).to_string())
401 },
402 Some(3),
403 );
404
405 let _ = writeln!(s);
406 let _ = writeln!(s, "latest barrier injection failures");
407 Self::write_event_logs_impl(
408 s,
409 event_logs.iter(),
410 |e| {
411 let Event::InjectBarrierFail(info) = e else {
412 return None;
413 };
414 Some(json!(info).to_string())
415 },
416 Some(3),
417 );
418
419 let _ = writeln!(s);
420 let _ = writeln!(s, "latest worker node panics");
421 Self::write_event_logs_impl(
422 s,
423 event_logs.iter(),
424 |e| {
425 let Event::WorkerNodePanic(info) = e else {
426 return None;
427 };
428 Some(json!(info).to_string())
429 },
430 Some(10),
431 );
432
433 let _ = writeln!(s);
434 let _ = writeln!(s, "latest create stream job failures");
435 Self::write_event_logs_impl(
436 s,
437 event_logs.iter(),
438 |e| {
439 let Event::CreateStreamJobFail(info) = e else {
440 return None;
441 };
442 Some(json!(info).to_string())
443 },
444 Some(3),
445 );
446
447 let _ = writeln!(s);
448 let _ = writeln!(s, "latest dirty stream job clear-ups");
449 Self::write_event_logs_impl(
450 s,
451 event_logs.iter(),
452 |e| {
453 let Event::DirtyStreamJobClear(info) = e else {
454 return None;
455 };
456 Some(json!(info).to_string())
457 },
458 Some(3),
459 );
460 }
461
462 fn write_event_logs_impl<'a, F>(
463 s: &mut String,
464 event_logs: impl Iterator<Item = &'a EventLog>,
465 get_event_info: F,
466 limit: Option<usize>,
467 ) where
468 F: Fn(&Event) -> Option<String>,
469 {
470 use comfy_table::{Row, Table};
471 let mut table = Table::new();
472 table.set_header({
473 let mut row = Row::new();
474 row.add_cell("created_at".into());
475 row.add_cell("info".into());
476 row
477 });
478 let mut row_count = 0;
479 for event_log in event_logs {
480 let Some(ref inner) = event_log.event else {
481 continue;
482 };
483 if let Some(limit) = limit
484 && row_count >= limit
485 {
486 break;
487 }
488 let mut row = Row::new();
489 let ts = event_log
490 .timestamp
491 .and_then(|ts| Timestamptz::from_millis(ts as _).map(|ts| ts.to_string()));
492 try_add_cell(&mut row, ts);
493 if let Some(info) = get_event_info(inner) {
494 row.add_cell(info.into());
495 row_count += 1;
496 } else {
497 continue;
498 }
499 table.add_row(row);
500 }
501 let _ = writeln!(s, "{table}");
502 }
503
504 async fn write_storage(&self, s: &mut String) {
505 let mut sst_num = 0;
506 let mut sst_total_file_size = 0;
507 let back_pressured_compaction_groups = self
508 .hummock_manger
509 .write_limits()
510 .await
511 .into_iter()
512 .filter_map(|(k, v)| {
513 if v.table_ids.is_empty() {
514 None
515 } else {
516 Some(k)
517 }
518 })
519 .join(",");
520 if !back_pressured_compaction_groups.is_empty() {
521 let _ = writeln!(
522 s,
523 "back pressured compaction groups: {back_pressured_compaction_groups}"
524 );
525 }
526
527 #[derive(PartialEq, Eq)]
528 struct SstableSort {
529 compaction_group_id: CompactionGroupId,
530 sst_id: HummockSstableId,
531 delete_ratio: u64,
532 }
533 impl PartialOrd for SstableSort {
534 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
535 Some(self.cmp(other))
536 }
537 }
538 impl Ord for SstableSort {
539 fn cmp(&self, other: &Self) -> Ordering {
540 self.delete_ratio.cmp(&other.delete_ratio)
541 }
542 }
543 fn top_k_sstables(
544 top_k: usize,
545 heap: &mut BinaryHeap<Reverse<SstableSort>>,
546 e: SstableSort,
547 ) {
548 if heap.len() < top_k {
549 heap.push(Reverse(e));
550 } else if let Some(mut p) = heap.peek_mut()
551 && e.delete_ratio > p.0.delete_ratio
552 {
553 *p = Reverse(e);
554 }
555 }
556
557 let top_k = 10;
558 let mut top_tombstone_delete_sst = BinaryHeap::with_capacity(top_k);
559 let compaction_group_num = self
560 .hummock_manger
561 .on_current_version(|version| {
562 for compaction_group in version.levels.values() {
563 let mut visit_level = |level: &Level| {
564 sst_num += level.table_infos.len();
565 sst_total_file_size +=
566 level.table_infos.iter().map(|t| t.sst_size).sum::<u64>();
567 for sst in &level.table_infos {
568 if sst.total_key_count == 0 {
569 continue;
570 }
571 let tombstone_delete_ratio =
572 sst.stale_key_count * 10000 / sst.total_key_count;
573 let e = SstableSort {
574 compaction_group_id: compaction_group.group_id,
575 sst_id: sst.sst_id,
576 delete_ratio: tombstone_delete_ratio,
577 };
578 top_k_sstables(top_k, &mut top_tombstone_delete_sst, e);
579 }
580 };
581 let l0 = &compaction_group.l0;
582 for level in &l0.sub_levels {
584 visit_level(level);
585 }
586 for level in &compaction_group.levels {
587 visit_level(level);
588 }
589 }
590 version.levels.len()
591 })
592 .await;
593
594 let _ = writeln!(s, "number of SSTables: {sst_num}");
595 let _ = writeln!(s, "total size of SSTables (byte): {sst_total_file_size}");
596 let _ = writeln!(s, "number of compaction groups: {compaction_group_num}");
597 use comfy_table::{Row, Table};
598 fn format_table(heap: BinaryHeap<Reverse<SstableSort>>) -> Table {
599 let mut table = Table::new();
600 table.set_header({
601 let mut row = Row::new();
602 row.add_cell("compaction group id".into());
603 row.add_cell("sstable id".into());
604 row.add_cell("delete ratio".into());
605 row
606 });
607 for sst in &heap.into_sorted_vec() {
608 let mut row = Row::new();
609 row.add_cell(sst.0.compaction_group_id.into());
610 row.add_cell(sst.0.sst_id.into());
611 row.add_cell(format!("{:.2}%", sst.0.delete_ratio as f64 / 100.0).into());
612 table.add_row(row);
613 }
614 table
615 }
616 let _ = writeln!(s);
617 let _ = writeln!(s, "top tombstone delete ratio");
618 let _ = writeln!(s, "{}", format_table(top_tombstone_delete_sst));
619 let _ = writeln!(s);
620
621 let _ = writeln!(s);
622 self.write_storage_prometheus(s).await;
623 }
624
625 async fn write_iceberg_compaction_schedule(&self, s: &mut String) {
626 use comfy_table::{Row, Table};
627
628 let sink_names = match self.metadata_manager.catalog_controller.list_sinks().await {
629 Ok(sinks) => sinks
630 .into_iter()
631 .map(|sink| (sink.id.as_raw_id(), sink.name))
632 .collect::<BTreeMap<u32, String>>(),
633 Err(err) => {
634 tracing::warn!(error=?err.as_report(), "failed to list sinks");
635 return;
636 }
637 };
638
639 let statuses = self.iceberg_compaction_manager.list_compaction_statuses();
640
641 let _ = writeln!(s, "ICEBERG COMPACTION SCHEDULE");
642
643 let mut table = Table::new();
644 table.set_header({
645 let mut row = Row::new();
646 row.add_cell("sink_id".into());
647 row.add_cell("sink_name".into());
648 row.add_cell("task_type".into());
649 row.add_cell("schedule_state".into());
650 row.add_cell("trigger_interval_sec".into());
651 row.add_cell("trigger_snapshot_count".into());
652 row.add_cell("pending_snapshot_count".into());
653 row.add_cell("next_compaction_after_sec".into());
654 row.add_cell("is_triggerable".into());
655 row
656 });
657
658 for status in statuses {
659 let mut row = Row::new();
660 row.add_cell(status.sink_id.as_raw_id().into());
661 try_add_cell(
662 &mut row,
663 sink_names.get(&status.sink_id.as_raw_id()).cloned(),
664 );
665 row.add_cell(status.task_type.into());
666 row.add_cell(status.schedule_state.into());
667 row.add_cell(status.trigger_interval_sec.into());
668 row.add_cell((status.trigger_snapshot_count as u64).into());
669 try_add_cell(
670 &mut row,
671 status.pending_snapshot_count.map(|count| count as u64),
672 );
673 try_add_cell(&mut row, status.next_compaction_after_sec);
674 row.add_cell(status.is_triggerable.into());
675 table.add_row(row);
676 }
677
678 let _ = writeln!(s, "{table}");
679 }
680
681 async fn write_streaming_prometheus(&self, s: &mut String) {
682 let _ = writeln!(s, "top sources by throughput (rows/s)");
683 let query = format!(
684 "topk(3, sum(rate(stream_source_output_rows_counts{{{}}}[10m]))by (source_name))",
685 self.prometheus_selector
686 );
687 self.write_instant_vector_impl(s, &query, vec!["source_name"])
688 .await;
689
690 let _ = writeln!(s);
691 let _ = writeln!(s, "top materialized views by throughput (rows/s)");
692 let query = format!(
693 "topk(3, sum(rate(stream_mview_input_row_count{{{}}}[10m]))by (table_id))",
694 self.prometheus_selector
695 );
696 self.write_instant_vector_impl(s, &query, vec!["table_id"])
697 .await;
698
699 let _ = writeln!(s);
700 let _ = writeln!(s, "top join executor by matched rows");
701 let query = format!(
702 "topk(3, histogram_quantile(0.9, sum(rate(stream_join_matched_join_keys_bucket{{{}}}[10m])) by (le, fragment_id, table_id)))",
703 self.prometheus_selector
704 );
705 self.write_instant_vector_impl(s, &query, vec!["table_id", "fragment_id"])
706 .await;
707 }
708
709 async fn write_storage_prometheus(&self, s: &mut String) {
710 let _ = writeln!(s, "top Hummock Get by duration (second)");
711 let query = format!(
712 "topk(3, histogram_quantile(0.9, sum(rate(state_store_get_duration_bucket{{{}}}[10m])) by (le, table_id)))",
713 self.prometheus_selector
714 );
715 self.write_instant_vector_impl(s, &query, vec!["table_id"])
716 .await;
717
718 let _ = writeln!(s);
719 let _ = writeln!(s, "top Hummock Iter Init by duration (second)");
720 let query = format!(
721 "topk(3, histogram_quantile(0.9, sum(rate(state_store_iter_init_duration_bucket{{{}}}[10m])) by (le, table_id)))",
722 self.prometheus_selector
723 );
724 self.write_instant_vector_impl(s, &query, vec!["table_id"])
725 .await;
726
727 let _ = writeln!(s);
728 let _ = writeln!(s, "top table commit flush by size (byte)");
729 let query = format!(
730 "topk(3, sum(rate(storage_commit_write_throughput{{{}}}[10m])) by (table_id))",
731 self.prometheus_selector
732 );
733 self.write_instant_vector_impl(s, &query, vec!["table_id"])
734 .await;
735
736 let _ = writeln!(s);
737 let _ = writeln!(s, "object store read throughput (bytes/s)");
738 let query = format!(
739 "sum(rate(object_store_read_bytes{{{}}}[10m])) by (job, instance)",
740 merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
741 );
742 self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
743 .await;
744
745 let _ = writeln!(s);
746 let _ = writeln!(s, "object store write throughput (bytes/s)");
747 let query = format!(
748 "sum(rate(object_store_write_bytes{{{}}}[10m])) by (job, instance)",
749 merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
750 );
751 self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
752 .await;
753
754 let _ = writeln!(s);
755 let _ = writeln!(s, "object store operation rate");
756 let query = format!(
757 "sum(rate(object_store_operation_latency_count{{{}}}[10m])) by (le, type, job, instance)",
758 merge_prometheus_selector([
759 &self.prometheus_selector,
760 "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read_read_bytes|streaming_read\""
761 ])
762 );
763 self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
764 .await;
765
766 let _ = writeln!(s);
767 let _ = writeln!(s, "object store operation duration (second)");
768 let query = format!(
769 "histogram_quantile(0.9, sum(rate(object_store_operation_latency_bucket{{{}}}[10m])) by (le, type, job, instance))",
770 merge_prometheus_selector([
771 &self.prometheus_selector,
772 "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read\""
773 ])
774 );
775 self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
776 .await;
777 }
778
779 async fn write_instant_vector_impl(&self, s: &mut String, query: &str, labels: Vec<&str>) {
780 let Some(ref client) = self.prometheus_client else {
781 return;
782 };
783 if let Ok(Vector(instant_vec)) = client
784 .query(query)
785 .get()
786 .await
787 .map(|result| result.into_inner().0)
788 {
789 for i in instant_vec {
790 let l = labels
791 .iter()
792 .map(|label| {
793 format!(
794 "{}={}",
795 *label,
796 i.metric()
797 .get(*label)
798 .map(|s| s.as_str())
799 .unwrap_or_default()
800 )
801 })
802 .join(",");
803 let _ = writeln!(s, "{}: {:.3}", l, i.sample().value());
804 }
805 }
806 }
807
808 async fn write_await_tree(&self, s: &mut String, actor_traces_format: ActorTracesFormat) {
809 let all = dump_cluster_await_tree(
810 &self.metadata_manager,
811 &self.await_tree_reg,
812 actor_traces_format,
813 )
814 .await;
815
816 if let Ok(all) = all {
817 write!(s, "{}", all.output()).unwrap();
818 } else {
819 tracing::warn!("failed to dump await tree");
820 }
821 }
822
/// Writes the detailed catalog sections of the report:
///
/// 1. `STREAMING JOB`: every streaming job, sorted by `(obj_type, job_id)`.
/// 2. One table per object kind (`SOURCE`, `TABLE`, `MATERIALIZED VIEW`,
///    `INDEX`, `SINK`, `VIEW`, `INTERNAL TABLE`). Each lists id, name,
///    database/schema ids, creation time and the definition as returned by
///    `redact_sql`, or `[REDACTED]` when that returns `None`.
/// 3. `FRAGMENT`: every fragment with its job id, schema id, object type, job
///    name (looked up among the objects listed in step 2) and sorted actor ids.
///
/// # Errors
///
/// Returns the first error from the catalog controller. Sections already
/// written to `s` before the failing call are left in place.
async fn write_table_definition(&self, s: &mut String) -> MetaResult<()> {
    // Every per-kind map below is keyed by object id and holds
    // (name, database_id, schema_id, definition, created_at_epoch).
    let sources = self
        .metadata_manager
        .catalog_controller
        .list_sources()
        .await?
        .into_iter()
        .map(|s| {
            (
                s.id.into(),
                (
                    s.name,
                    s.database_id,
                    s.schema_id,
                    s.definition,
                    s.created_at_epoch,
                ),
            )
        })
        .collect::<BTreeMap<_, _>>();
    let mut user_tables = BTreeMap::new();
    let mut mvs = BTreeMap::new();
    let mut indexes = BTreeMap::new();
    let mut internal_tables = BTreeMap::new();
    {
        // `chunk_by` groups only consecutive tables of the same type. Each
        // group is merged into its per-type map with `extend`, so the result
        // does not depend on the input being sorted by type.
        let grouped = self
            .metadata_manager
            .catalog_controller
            .list_all_state_tables()
            .await?
            .into_iter()
            .chunk_by(|t| t.table_type());
        for (table_type, tables) in &grouped {
            let tables = tables.into_iter().map(|t| {
                (
                    t.id.into(),
                    (
                        t.name,
                        t.database_id,
                        t.schema_id,
                        t.definition,
                        t.created_at_epoch,
                    ),
                )
            });
            match table_type {
                PbTableType::Table => user_tables.extend(tables),
                PbTableType::MaterializedView => mvs.extend(tables),
                PbTableType::Index | PbTableType::VectorIndex => indexes.extend(tables),
                PbTableType::Internal => internal_tables.extend(tables),
                PbTableType::Unspecified => {
                    // These tables are only logged, not listed in any section.
                    tracing::error!("unspecified table type: {:?}", tables.collect_vec());
                }
            }
        }
    }
    let sinks = self
        .metadata_manager
        .catalog_controller
        .list_sinks()
        .await?
        .into_iter()
        .map(|s| {
            (
                s.id.into(),
                (
                    s.name,
                    s.database_id,
                    s.schema_id,
                    s.definition,
                    s.created_at_epoch,
                ),
            )
        })
        .collect::<BTreeMap<_, _>>();
    let views = self
        .metadata_manager
        .catalog_controller
        .list_views()
        .await?
        .into_iter()
        .map(|v| {
            (
                v.id.into(),
                // No creation epoch is supplied for views, so their
                // `created_at` column is rendered empty.
                (v.name, v.database_id, v.schema_id, v.sql, None),
            )
        })
        .collect::<BTreeMap<_, _>>();
    let mut streaming_jobs = self
        .metadata_manager
        .catalog_controller
        .list_streaming_job_infos()
        .await?;
    streaming_jobs.sort_by_key(|info| (info.obj_type as usize, info.job_id));
    {
        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("job_id".into());
            row.add_cell("name".into());
            row.add_cell("obj_type".into());
            row.add_cell("state".into());
            row.add_cell("parallelism".into());
            row.add_cell("max_parallelism".into());
            row.add_cell("resource_group".into());
            row.add_cell("database_id".into());
            row.add_cell("schema_id".into());
            row.add_cell("config_override".into());
            row
        });
        for job in streaming_jobs {
            let mut row = Row::new();
            row.add_cell(job.job_id.into());
            row.add_cell(job.name.into());
            row.add_cell(job.obj_type.as_str().into());
            row.add_cell(format_job_status(job.job_status).into());
            row.add_cell(
                format_streaming_parallelism(
                    &job.parallelism,
                    job.adaptive_parallelism_strategy.as_deref(),
                )
                .into(),
            );
            row.add_cell(job.max_parallelism.into());
            row.add_cell(job.resource_group.into());
            row.add_cell(job.database_id.into());
            row.add_cell(job.schema_id.into());
            row.add_cell(job.config_override.into());
            table.add_row(row);
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "STREAMING JOB");
        let _ = writeln!(s, "{table}");
    }
    // Section order in the output follows this array.
    let catalogs = [
        ("SOURCE", sources),
        ("TABLE", user_tables),
        ("MATERIALIZED VIEW", mvs),
        ("INDEX", indexes),
        ("SINK", sinks),
        ("VIEW", views),
        ("INTERNAL TABLE", internal_tables),
    ];
    // Filled while rendering the catalogs. Used below to name each fragment's job.
    let mut obj_id_to_name: HashMap<ObjectId, _> = HashMap::new();
    for (title, items) in catalogs {
        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("name".into());
            row.add_cell("database_id".into());
            row.add_cell("schema_id".into());
            row.add_cell("created_at".into());
            row.add_cell("definition".into());
            row
        });
        for (id, (name, database_id, schema_id, definition, created_at_epoch)) in items {
            obj_id_to_name.insert(id, name.clone());
            let mut row = Row::new();
            // `redact_sql` returning `None` hides the whole definition.
            let may_redact = redact_sql(&definition, self.redact_sql_option_keywords.clone())
                .unwrap_or_else(|| "[REDACTED]".into());
            let created_at = if let Some(created_at_epoch) = created_at_epoch {
                format!("{}", Epoch::from(created_at_epoch).as_timestamptz())
            } else {
                "".into()
            };
            row.add_cell(id.into());
            row.add_cell(name.into());
            row.add_cell(database_id.into());
            row.add_cell(schema_id.into());
            row.add_cell(created_at.into());
            row.add_cell(may_redact.into());
            table.add_row(row);
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "{title}");
        let _ = writeln!(s, "{table}");
    }

    // fragment_id -> (job_id, schema_id, obj_type, actor ids). The first actor
    // seen for a fragment determines its job/schema/type.
    let mut fragments = BTreeMap::new();
    for (actor_id, fragment_id, job_id, schema_id, obj_type) in self
        .metadata_manager
        .catalog_controller
        .list_actor_info()
        .await?
    {
        let entry = fragments
            .entry(fragment_id)
            .or_insert_with(|| (job_id, schema_id, obj_type, Vec::new()));
        entry.3.push(actor_id);
    }

    use comfy_table::{Row, Table};
    let mut table = Table::new();
    table.set_header({
        let mut row = Row::new();
        row.add_cell("fragment_id".into());
        row.add_cell("job_id".into());
        row.add_cell("schema_id".into());
        row.add_cell("type".into());
        row.add_cell("name".into());
        row.add_cell("actor_count".into());
        row.add_cell("actors".into());
        row
    });
    for (fragment_id, (job_id, schema_id, obj_type, mut actor_ids)) in fragments {
        actor_ids.sort_unstable();
        let mut row = Row::new();
        row.add_cell(fragment_id.into());
        row.add_cell(job_id.into());
        row.add_cell(schema_id.into());
        row.add_cell(obj_type.as_str().into());
        // Empty name when the job was not among the listed catalog objects.
        row.add_cell(
            obj_id_to_name
                .get(&job_id)
                .cloned()
                .unwrap_or_default()
                .into(),
        );
        row.add_cell(actor_ids.len().into());
        row.add_cell(actor_ids.iter().join(", ").into());
        table.add_row(row);
    }
    let _ = writeln!(s);
    let _ = writeln!(s, "FRAGMENT");
    let _ = writeln!(s, "{table}");
    Ok(())
}
1053
1054 fn write_license(&self, s: &mut String) {
1055 use comfy_table::presets::ASCII_BORDERS_ONLY;
1056 use comfy_table::{ContentArrangement, Row, Table};
1057
1058 let mut table = Table::new();
1059 table.load_preset(ASCII_BORDERS_ONLY);
1060 table.set_content_arrangement(ContentArrangement::Dynamic);
1061 table.set_header({
1062 let mut row = Row::new();
1063 row.add_cell("field".into());
1064 row.add_cell("value".into());
1065 row
1066 });
1067
1068 match LicenseManager::get().license() {
1069 Ok(license) => {
1070 let fmt_option = |value: Option<u64>| match value {
1071 Some(v) => v.to_string(),
1072 None => "unlimited".to_owned(),
1073 };
1074
1075 let expires_at = if license.exp == u64::MAX {
1076 "never".to_owned()
1077 } else {
1078 let exp_i64 = license.exp as i64;
1079 chrono::DateTime::<chrono::Utc>::from_timestamp(exp_i64, 0)
1080 .map(|ts| ts.to_rfc3339())
1081 .unwrap_or_else(|| format!("invalid ({})", license.exp))
1082 };
1083
1084 let mut row = Row::new();
1085 row.add_cell("status".into());
1086 row.add_cell("valid".into());
1087 table.add_row(row);
1088
1089 let mut row = Row::new();
1090 row.add_cell("tier".into());
1091 row.add_cell(license.tier.name().into());
1092 table.add_row(row);
1093
1094 let mut row = Row::new();
1095 row.add_cell("expires_at".into());
1096 row.add_cell(expires_at.into());
1097 table.add_row(row);
1098
1099 let mut row = Row::new();
1100 row.add_cell("rwu_limit".into());
1101 row.add_cell(fmt_option(license.rwu_limit.map(|v| v.get())).into());
1102 table.add_row(row);
1103
1104 let mut row = Row::new();
1105 row.add_cell("cpu_core_limit".into());
1106 row.add_cell(fmt_option(license.cpu_core_limit()).into());
1107 table.add_row(row);
1108
1109 let mut row = Row::new();
1110 row.add_cell("memory_limit_bytes".into());
1111 row.add_cell(fmt_option(license.memory_limit()).into());
1112 table.add_row(row);
1113
1114 let mut features: Vec<_> = license
1115 .tier
1116 .available_features()
1117 .map(|f| f.name())
1118 .collect();
1119 features.sort_unstable();
1120 let feature_summary = format_features(&features);
1121
1122 let mut row = Row::new();
1123 row.add_cell("available_features".into());
1124 row.add_cell(feature_summary.into());
1125 table.add_row(row);
1126 }
1127 Err(error) => {
1128 let mut row = Row::new();
1129 row.add_cell("status".into());
1130 row.add_cell("invalid".into());
1131 table.add_row(row);
1132
1133 let mut row = Row::new();
1134 row.add_cell("error".into());
1135 row.add_cell(error.to_report_string().into());
1136 table.add_row(row);
1137 }
1138 }
1139
1140 let _ = writeln!(s, "LICENSE");
1141 let _ = writeln!(s, "{table}");
1142 }
1143
1144 async fn write_params(&self, s: &mut String) {
1145 let params = self.system_params_controller.get_params().await;
1146
1147 use comfy_table::{Row, Table};
1148 let mut table = Table::new();
1149 table.set_header({
1150 let mut row = Row::new();
1151 row.add_cell("key".into());
1152 row.add_cell("value".into());
1153 row
1154 });
1155
1156 let mut row = Row::new();
1157 row.add_cell("barrier_interval_ms".into());
1158 row.add_cell(params.barrier_interval_ms().to_string().into());
1159 table.add_row(row);
1160
1161 let mut row = Row::new();
1162 row.add_cell("checkpoint_frequency".into());
1163 row.add_cell(params.checkpoint_frequency().to_string().into());
1164 table.add_row(row);
1165
1166 let mut row = Row::new();
1167 row.add_cell("state_store".into());
1168 row.add_cell(params.state_store().to_owned().into());
1169 table.add_row(row);
1170
1171 let mut row = Row::new();
1172 row.add_cell("data_directory".into());
1173 row.add_cell(params.data_directory().to_owned().into());
1174 table.add_row(row);
1175
1176 let mut row = Row::new();
1177 row.add_cell("max_concurrent_creating_streaming_jobs".into());
1178 row.add_cell(
1179 params
1180 .max_concurrent_creating_streaming_jobs()
1181 .to_string()
1182 .into(),
1183 );
1184 table.add_row(row);
1185
1186 let mut row = Row::new();
1187 row.add_cell("time_travel_retention_ms".into());
1188 row.add_cell(params.time_travel_retention_ms().to_string().into());
1189 table.add_row(row);
1190
1191 let mut row = Row::new();
1192 row.add_cell("per_database_isolation".into());
1193 row.add_cell(params.per_database_isolation().to_string().into());
1194 table.add_row(row);
1195
1196 let _ = writeln!(s, "SYSTEM PARAMS");
1197 let _ = writeln!(s, "{table}");
1198 }
1199}
1200
1201fn try_add_cell<T: Into<comfy_table::Cell>>(row: &mut comfy_table::Row, t: Option<T>) {
1202 match t {
1203 Some(t) => {
1204 row.add_cell(t.into());
1205 }
1206 None => {
1207 row.add_cell("".into());
1208 }
1209 }
1210}
1211
/// Joins the non-empty Prometheus label selectors with `,`.
///
/// Empty selectors are skipped entirely, so no leading, trailing or doubled commas are
/// produced. Returns an empty string when every selector is empty.
fn merge_prometheus_selector<'a>(selectors: impl IntoIterator<Item = &'a str>) -> String {
    let mut merged = String::new();
    for selector in selectors {
        if selector.is_empty() {
            continue;
        }
        // Only non-empty selectors are ever pushed, so a non-empty buffer means a previous
        // selector exists and needs a separator.
        if !merged.is_empty() {
            merged.push(',');
        }
        merged.push_str(selector);
    }
    merged
}
1215
1216fn redact_sql(sql: &str, keywords: RedactSqlOptionKeywordsRef) -> Option<String> {
1217 match Parser::parse_sql(sql) {
1218 Ok(sqls) => Some(
1219 sqls.into_iter()
1220 .map(|sql| sql.to_redacted_string(keywords.clone()))
1221 .join(";"),
1222 ),
1223 Err(_) => None,
1224 }
1225}
1226
/// Formats feature names into lines of at most six, comma-separated.
///
/// Each line is prefixed with a single space and lines are separated by `\n`. An empty
/// slice yields `"(none)"`.
fn format_features(features: &[&'static str]) -> String {
    /// Maximum number of feature names rendered on one line.
    const FEATURES_PER_LINE: usize = 6;

    match features {
        [] => "(none)".to_owned(),
        _ => {
            let lines: Vec<String> = features
                .chunks(FEATURES_PER_LINE)
                .map(|group| {
                    let mut line = String::from(" ");
                    line.push_str(&group.join(", "));
                    line
                })
                .collect();
            lines.join("\n")
        }
    }
}
1239
1240fn format_job_status(status: JobStatus) -> &'static str {
1241 match status {
1242 JobStatus::Initial => "initial",
1243 JobStatus::Creating => "creating",
1244 JobStatus::Created => "created",
1245 }
1246}
1247
1248fn format_streaming_parallelism(
1249 parallelism: &StreamingParallelism,
1250 adaptive_parallelism_strategy: Option<&str>,
1251) -> String {
1252 match parallelism {
1253 StreamingParallelism::Adaptive => adaptive_parallelism_strategy
1254 .and_then(format_adaptive_parallelism_strategy)
1255 .unwrap_or_else(|| "adaptive".into()),
1256 StreamingParallelism::Fixed(n) => n.to_string(),
1257 StreamingParallelism::Custom => adaptive_parallelism_strategy
1258 .and_then(format_adaptive_parallelism_strategy)
1259 .unwrap_or_else(|| "custom".into()),
1260 }
1261}
1262
1263fn format_adaptive_parallelism_strategy(strategy: &str) -> Option<String> {
1264 parse_strategy(strategy)
1265 .ok()
1266 .map(|strategy| match strategy {
1267 AdaptiveParallelismStrategy::Auto | AdaptiveParallelismStrategy::Full => {
1268 "adaptive".to_owned()
1269 }
1270 AdaptiveParallelismStrategy::Bounded(n) => format!("bounded({n})"),
1271 AdaptiveParallelismStrategy::Ratio(r) => format!("ratio({r})"),
1272 })
1273}