1use std::cmp::{Ordering, Reverse};
16use std::collections::{BTreeMap, BinaryHeap, HashMap};
17use std::fmt::Write;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use prometheus_http_query::response::Data::Vector;
22use risingwave_common::id::ObjectId;
23use risingwave_common::system_param::reader::SystemParamsRead;
24use risingwave_common::types::Timestamptz;
25use risingwave_common::util::StackTraceResponseExt;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_hummock_sdk::level::Level;
28use risingwave_hummock_sdk::{CompactionGroupId, HummockSstableId};
29use risingwave_license::LicenseManager;
30use risingwave_meta_model::{JobStatus, StreamingParallelism};
31use risingwave_pb::catalog::table::PbTableType;
32use risingwave_pb::common::WorkerType;
33use risingwave_pb::meta::EventLog;
34use risingwave_pb::meta::event_log::Event;
35use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
36use risingwave_sqlparser::ast::RedactSqlOptionKeywordsRef;
37use risingwave_sqlparser::parser::Parser;
38use serde_json::json;
39use thiserror_ext::AsReport;
40
41use crate::MetaResult;
42use crate::controller::system_param::SystemParamsControllerRef;
43use crate::hummock::HummockManagerRef;
44use crate::manager::MetadataManager;
45use crate::manager::event_log::EventLogManagerRef;
46use crate::manager::iceberg_compaction::IcebergCompactionManagerRef;
47use crate::rpc::await_tree::dump_cluster_await_tree;
48
/// Shared, reference-counted handle to a [`DiagnoseCommand`].
pub type DiagnoseCommandRef = Arc<DiagnoseCommand>;
50
/// Aggregates the managers and clients needed to produce the cluster
/// diagnostics report (see [`DiagnoseCommand::report`]).
pub struct DiagnoseCommand {
    metadata_manager: MetadataManager,
    await_tree_reg: await_tree::Registry,
    // NOTE(review): field name is a typo for `hummock_manager`; kept as-is
    // here because renaming would touch every use site in this file.
    hummock_manger: HummockManagerRef,
    iceberg_compaction_manager: IcebergCompactionManagerRef,
    event_log_manager: EventLogManagerRef,
    // `None` when no Prometheus endpoint is configured; the metric sections
    // of the report are then silently skipped.
    prometheus_client: Option<prometheus_http_query::Client>,
    // Label selector interpolated into every Prometheus query.
    prometheus_selector: String,
    redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
    system_params_controller: SystemParamsControllerRef,
}
62
63impl DiagnoseCommand {
    /// Creates a new `DiagnoseCommand` wiring together all dependencies.
    /// A plain field-by-field constructor: no validation, no side effects.
    pub fn new(
        metadata_manager: MetadataManager,
        await_tree_reg: await_tree::Registry,
        hummock_manger: HummockManagerRef,
        iceberg_compaction_manager: IcebergCompactionManagerRef,
        event_log_manager: EventLogManagerRef,
        prometheus_client: Option<prometheus_http_query::Client>,
        prometheus_selector: String,
        redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
        system_params_controller: SystemParamsControllerRef,
    ) -> Self {
        Self {
            metadata_manager,
            await_tree_reg,
            hummock_manger,
            iceberg_compaction_manager,
            event_log_manager,
            prometheus_client,
            prometheus_selector,
            redact_sql_option_keywords,
            system_params_controller,
        }
    }
87
    /// Builds the full diagnostics report as one human-readable string.
    ///
    /// Sections are emitted in a fixed order, separated by blank lines:
    /// header, license, catalog, worker nodes, streaming metrics, storage,
    /// iceberg compaction schedule, event logs, system params, await tree.
    /// Each section handles its own errors so one failing data source does
    /// not abort the whole report.
    pub async fn report(&self, actor_traces_format: ActorTracesFormat) -> String {
        let mut report = String::new();
        // Header: report creation time (UTC) and the cluster version.
        let _ = writeln!(
            report,
            "report created at: {}\nversion: {}",
            chrono::DateTime::<chrono::offset::Utc>::from(std::time::SystemTime::now()),
            risingwave_common::current_cluster_version(),
        );
        let _ = writeln!(report);
        self.write_license(&mut report);
        let _ = writeln!(report);
        self.write_catalog(&mut report).await;
        let _ = writeln!(report);
        self.write_worker_nodes(&mut report).await;
        let _ = writeln!(report);
        self.write_streaming_prometheus(&mut report).await;
        let _ = writeln!(report);
        self.write_storage(&mut report).await;
        let _ = writeln!(report);
        self.write_iceberg_compaction_schedule(&mut report).await;
        let _ = writeln!(report);
        self.write_event_logs(&mut report);
        let _ = writeln!(report);
        self.write_params(&mut report).await;
        let _ = writeln!(report);
        self.write_await_tree(&mut report, actor_traces_format)
            .await;

        report
    }
118
119 async fn write_catalog(&self, s: &mut String) {
120 self.write_catalog_inner(s).await;
121 let _ = self.write_table_definition(s).await.inspect_err(|e| {
122 tracing::warn!(
123 error = e.to_report_string(),
124 "failed to display table definition"
125 )
126 });
127 }
128
129 async fn write_catalog_inner(&self, s: &mut String) {
130 let stats = self.metadata_manager.catalog_controller.stats().await;
131
132 let stat = match stats {
133 Ok(stat) => stat,
134 Err(err) => {
135 tracing::warn!(error=?err.as_report(), "failed to get catalog stats");
136 return;
137 }
138 };
139 let _ = writeln!(s, "number of database: {}", stat.database_num);
140 let _ = writeln!(s, "number of fragment: {}", stat.streaming_job_num);
141 let _ = writeln!(s, "number of actor: {}", stat.actor_num);
142 let _ = writeln!(s, "number of source: {}", stat.source_num);
143 let _ = writeln!(s, "number of table: {}", stat.table_num);
144 let _ = writeln!(s, "number of materialized view: {}", stat.mview_num);
145 let _ = writeln!(s, "number of sink: {}", stat.sink_num);
146 let _ = writeln!(s, "number of index: {}", stat.index_num);
147 let _ = writeln!(s, "number of function: {}", stat.function_num);
148
149 self.write_databases(s).await;
150 self.write_schemas(s).await;
151 }
152
153 async fn write_databases(&self, s: &mut String) {
154 let databases = match self
155 .metadata_manager
156 .catalog_controller
157 .list_databases()
158 .await
159 {
160 Ok(databases) => databases,
161 Err(err) => {
162 tracing::warn!(error=?err.as_report(), "failed to list databases");
163 return;
164 }
165 };
166
167 use comfy_table::{Row, Table};
168 let mut table = Table::new();
169 table.set_header({
170 let mut row = Row::new();
171 row.add_cell("id".into());
172 row.add_cell("name".into());
173 row.add_cell("resource_group".into());
174 row.add_cell("barrier_interval_ms".into());
175 row.add_cell("checkpoint_frequency".into());
176 row
177 });
178 for db in databases {
179 let mut row = Row::new();
180 row.add_cell(db.id.into());
181 row.add_cell(db.name.into());
182 row.add_cell(db.resource_group.into());
183 row.add_cell(
184 db.barrier_interval_ms
185 .map(|v| v.to_string())
186 .unwrap_or("default".into())
187 .into(),
188 );
189 row.add_cell(
190 db.checkpoint_frequency
191 .map(|v| v.to_string())
192 .unwrap_or("default".into())
193 .into(),
194 );
195 table.add_row(row);
196 }
197
198 let _ = writeln!(s, "DATABASE");
199 let _ = writeln!(s, "{table}");
200 }
201
202 async fn write_schemas(&self, s: &mut String) {
203 let schemas = match self
204 .metadata_manager
205 .catalog_controller
206 .list_schemas()
207 .await
208 {
209 Ok(schemas) => schemas,
210 Err(err) => {
211 tracing::warn!(error=?err.as_report(), "failed to list schemas");
212 return;
213 }
214 };
215
216 use comfy_table::{Row, Table};
217 let mut table = Table::new();
218 table.set_header({
219 let mut row = Row::new();
220 row.add_cell("id".into());
221 row.add_cell("database_id".into());
222 row.add_cell("name".into());
223 row
224 });
225 for schema in schemas {
226 let mut row = Row::new();
227 row.add_cell(schema.id.into());
228 row.add_cell(schema.database_id.into());
229 row.add_cell(schema.name.into());
230 table.add_row(row);
231 }
232
233 let _ = writeln!(s, "SCHEMA");
234 let _ = writeln!(s, "{table}");
235 }
236
    /// Renders every worker node (compute, compactor, meta, ...) as a table
    /// with its resources and — for compute nodes only — the actor count.
    async fn write_worker_nodes(&self, s: &mut String) {
        let Ok(worker_actor_count) = self.metadata_manager.worker_actor_count() else {
            tracing::warn!("failed to get worker actor count");
            return;
        };

        use comfy_table::{Row, Table};
        let Ok(worker_nodes) = self.metadata_manager.list_worker_node(None, None).await else {
            tracing::warn!("failed to get worker nodes");
            return;
        };
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("host".into());
            row.add_cell("hostname".into());
            row.add_cell("type".into());
            row.add_cell("state".into());
            row.add_cell("parallelism".into());
            row.add_cell("resource_group".into());
            row.add_cell("is_streaming".into());
            row.add_cell("is_serving".into());
            row.add_cell("is_iceberg_compactor".into());
            row.add_cell("rw_version".into());
            row.add_cell("total_memory_bytes".into());
            row.add_cell("total_cpu_cores".into());
            row.add_cell("started_at".into());
            row.add_cell("actor_count".into());
            row
        });
        for worker_node in worker_nodes {
            let mut row = Row::new();
            row.add_cell(worker_node.id.into());
            // "host:port"; blank cell when the address is unknown.
            try_add_cell(
                &mut row,
                worker_node
                    .host
                    .as_ref()
                    .map(|h| format!("{}:{}", h.host, h.port)),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.hostname.clone()),
            );
            try_add_cell(
                &mut row,
                worker_node.get_type().ok().map(|t| t.as_str_name()),
            );
            try_add_cell(
                &mut row,
                worker_node.get_state().ok().map(|s| s.as_str_name()),
            );
            try_add_cell(&mut row, worker_node.parallelism());
            try_add_cell(
                &mut row,
                worker_node
                    .property
                    .as_ref()
                    .map(|p| p.resource_group.clone().unwrap_or("".to_owned())),
            );
            // Streaming/serving flags only apply to compute nodes; blank
            // cells for every other worker type.
            let (is_streaming, is_serving) = {
                if let Ok(t) = worker_node.get_type()
                    && t == WorkerType::ComputeNode
                {
                    (
                        worker_node.property.as_ref().map(|p| p.is_streaming),
                        worker_node.property.as_ref().map(|p| p.is_serving),
                    )
                } else {
                    (None, None)
                }
            };
            try_add_cell(&mut row, is_streaming);
            try_add_cell(&mut row, is_serving);
            // The iceberg-compactor flag only applies to compactor nodes.
            let is_iceberg_compactor = {
                if let Ok(t) = worker_node.get_type()
                    && t == WorkerType::Compactor
                {
                    worker_node
                        .property
                        .as_ref()
                        .map(|p| p.is_iceberg_compactor)
                } else {
                    None
                }
            };
            try_add_cell(&mut row, is_iceberg_compactor);
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.rw_version.clone()),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.total_memory_bytes),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.total_cpu_cores),
            );
            try_add_cell(
                &mut row,
                worker_node
                    .started_at
                    .and_then(|ts| Timestamptz::from_secs(ts as _).map(|t| t.to_string())),
            );
            // Actor counts are only meaningful for compute nodes; a compute
            // node missing from the map simply runs no actors (count 0).
            let actor_count = {
                if let Ok(t) = worker_node.get_type()
                    && t != WorkerType::ComputeNode
                {
                    None
                } else {
                    match worker_actor_count.get(&worker_node.id) {
                        None => Some(0),
                        Some(c) => Some(*c),
                    }
                }
            };
            try_add_cell(&mut row, actor_count);
            table.add_row(row);
        }
        let _ = writeln!(s, "{table}");
    }
362
363 fn write_event_logs(&self, s: &mut String) {
364 let event_logs = self
365 .event_log_manager
366 .list_event_logs()
367 .into_iter()
368 .sorted_by(|a, b| {
369 a.timestamp
370 .unwrap_or(0)
371 .cmp(&b.timestamp.unwrap_or(0))
372 .reverse()
373 })
374 .collect_vec();
375
376 let _ = writeln!(s, "latest barrier completions");
377 Self::write_event_logs_impl(
378 s,
379 event_logs.iter(),
380 |e| {
381 let Event::BarrierComplete(info) = e else {
382 return None;
383 };
384 Some(json!(info).to_string())
385 },
386 Some(10),
387 );
388
389 let _ = writeln!(s);
390 let _ = writeln!(s, "latest barrier collection failures");
391 Self::write_event_logs_impl(
392 s,
393 event_logs.iter(),
394 |e| {
395 let Event::CollectBarrierFail(info) = e else {
396 return None;
397 };
398 Some(json!(info).to_string())
399 },
400 Some(3),
401 );
402
403 let _ = writeln!(s);
404 let _ = writeln!(s, "latest barrier injection failures");
405 Self::write_event_logs_impl(
406 s,
407 event_logs.iter(),
408 |e| {
409 let Event::InjectBarrierFail(info) = e else {
410 return None;
411 };
412 Some(json!(info).to_string())
413 },
414 Some(3),
415 );
416
417 let _ = writeln!(s);
418 let _ = writeln!(s, "latest worker node panics");
419 Self::write_event_logs_impl(
420 s,
421 event_logs.iter(),
422 |e| {
423 let Event::WorkerNodePanic(info) = e else {
424 return None;
425 };
426 Some(json!(info).to_string())
427 },
428 Some(10),
429 );
430
431 let _ = writeln!(s);
432 let _ = writeln!(s, "latest create stream job failures");
433 Self::write_event_logs_impl(
434 s,
435 event_logs.iter(),
436 |e| {
437 let Event::CreateStreamJobFail(info) = e else {
438 return None;
439 };
440 Some(json!(info).to_string())
441 },
442 Some(3),
443 );
444
445 let _ = writeln!(s);
446 let _ = writeln!(s, "latest dirty stream job clear-ups");
447 Self::write_event_logs_impl(
448 s,
449 event_logs.iter(),
450 |e| {
451 let Event::DirtyStreamJobClear(info) = e else {
452 return None;
453 };
454 Some(json!(info).to_string())
455 },
456 Some(3),
457 );
458 }
459
460 fn write_event_logs_impl<'a, F>(
461 s: &mut String,
462 event_logs: impl Iterator<Item = &'a EventLog>,
463 get_event_info: F,
464 limit: Option<usize>,
465 ) where
466 F: Fn(&Event) -> Option<String>,
467 {
468 use comfy_table::{Row, Table};
469 let mut table = Table::new();
470 table.set_header({
471 let mut row = Row::new();
472 row.add_cell("created_at".into());
473 row.add_cell("info".into());
474 row
475 });
476 let mut row_count = 0;
477 for event_log in event_logs {
478 let Some(ref inner) = event_log.event else {
479 continue;
480 };
481 if let Some(limit) = limit
482 && row_count >= limit
483 {
484 break;
485 }
486 let mut row = Row::new();
487 let ts = event_log
488 .timestamp
489 .and_then(|ts| Timestamptz::from_millis(ts as _).map(|ts| ts.to_string()));
490 try_add_cell(&mut row, ts);
491 if let Some(info) = get_event_info(inner) {
492 row.add_cell(info.into());
493 row_count += 1;
494 } else {
495 continue;
496 }
497 table.add_row(row);
498 }
499 let _ = writeln!(s, "{table}");
500 }
501
    /// Writes the storage (Hummock) section: back-pressured compaction
    /// groups, SSTable totals, the top tombstone-heavy SSTables, and then
    /// the storage Prometheus metrics.
    async fn write_storage(&self, s: &mut String) {
        let mut sst_num = 0;
        let mut sst_total_file_size = 0;
        // A compaction group is "back pressured" when its write limit
        // currently applies to at least one table.
        let back_pressured_compaction_groups = self
            .hummock_manger
            .write_limits()
            .await
            .into_iter()
            .filter_map(|(k, v)| {
                if v.table_ids.is_empty() {
                    None
                } else {
                    Some(k)
                }
            })
            .join(",");
        if !back_pressured_compaction_groups.is_empty() {
            let _ = writeln!(
                s,
                "back pressured compaction groups: {back_pressured_compaction_groups}"
            );
        }

        // Heap entry: ordered solely by tombstone delete ratio.
        #[derive(PartialEq, Eq)]
        struct SstableSort {
            compaction_group_id: CompactionGroupId,
            sst_id: HummockSstableId,
            delete_ratio: u64,
        }
        impl PartialOrd for SstableSort {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }
        impl Ord for SstableSort {
            fn cmp(&self, other: &Self) -> Ordering {
                self.delete_ratio.cmp(&other.delete_ratio)
            }
        }
        // Keeps the `top_k` largest delete ratios using a min-heap (via
        // `Reverse`): once full, replace the current minimum only when a
        // larger ratio arrives.
        fn top_k_sstables(
            top_k: usize,
            heap: &mut BinaryHeap<Reverse<SstableSort>>,
            e: SstableSort,
        ) {
            if heap.len() < top_k {
                heap.push(Reverse(e));
            } else if let Some(mut p) = heap.peek_mut()
                && e.delete_ratio > p.0.delete_ratio
            {
                *p = Reverse(e);
            }
        }

        let top_k = 10;
        let mut top_tombstone_delete_sst = BinaryHeap::with_capacity(top_k);
        let compaction_group_num = self
            .hummock_manger
            .on_current_version(|version| {
                for compaction_group in version.levels.values() {
                    // Accumulates per-level SST counts/sizes and feeds each
                    // SST into the top-k heap. Mutably captures the counters
                    // and heap above.
                    let mut visit_level = |level: &Level| {
                        sst_num += level.table_infos.len();
                        sst_total_file_size +=
                            level.table_infos.iter().map(|t| t.sst_size).sum::<u64>();
                        for sst in &level.table_infos {
                            // Skip empty SSTs to avoid dividing by zero.
                            if sst.total_key_count == 0 {
                                continue;
                            }
                            // Ratio in basis points (1/100 of a percent);
                            // rendered as a percentage in `format_table`.
                            let tombstone_delete_ratio =
                                sst.stale_key_count * 10000 / sst.total_key_count;
                            let e = SstableSort {
                                compaction_group_id: compaction_group.group_id,
                                sst_id: sst.sst_id,
                                delete_ratio: tombstone_delete_ratio,
                            };
                            top_k_sstables(top_k, &mut top_tombstone_delete_sst, e);
                        }
                    };
                    // Visit both the L0 sub-levels and the leveled levels.
                    let l0 = &compaction_group.l0;
                    for level in &l0.sub_levels {
                        visit_level(level);
                    }
                    for level in &compaction_group.levels {
                        visit_level(level);
                    }
                }
                version.levels.len()
            })
            .await;

        let _ = writeln!(s, "number of SSTables: {sst_num}");
        let _ = writeln!(s, "total size of SSTables (byte): {sst_total_file_size}");
        let _ = writeln!(s, "number of compaction groups: {compaction_group_num}");
        use comfy_table::{Row, Table};
        // Renders the heap contents in ascending delete-ratio order.
        fn format_table(heap: BinaryHeap<Reverse<SstableSort>>) -> Table {
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("compaction group id".into());
                row.add_cell("sstable id".into());
                row.add_cell("delete ratio".into());
                row
            });
            for sst in &heap.into_sorted_vec() {
                let mut row = Row::new();
                row.add_cell(sst.0.compaction_group_id.into());
                row.add_cell(sst.0.sst_id.into());
                // Basis points back to percent with two decimals.
                row.add_cell(format!("{:.2}%", sst.0.delete_ratio as f64 / 100.0).into());
                table.add_row(row);
            }
            table
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "top tombstone delete ratio");
        let _ = writeln!(s, "{}", format_table(top_tombstone_delete_sst));
        let _ = writeln!(s);

        let _ = writeln!(s);
        self.write_storage_prometheus(s).await;
    }
622
623 async fn write_iceberg_compaction_schedule(&self, s: &mut String) {
624 use comfy_table::{Row, Table};
625
626 let sink_names = match self.metadata_manager.catalog_controller.list_sinks().await {
627 Ok(sinks) => sinks
628 .into_iter()
629 .map(|sink| (sink.id.as_raw_id(), sink.name))
630 .collect::<BTreeMap<u32, String>>(),
631 Err(err) => {
632 tracing::warn!(error=?err.as_report(), "failed to list sinks");
633 return;
634 }
635 };
636
637 let statuses = self
638 .iceberg_compaction_manager
639 .list_compaction_statuses()
640 .await;
641
642 let _ = writeln!(s, "ICEBERG COMPACTION SCHEDULE");
643
644 let mut table = Table::new();
645 table.set_header({
646 let mut row = Row::new();
647 row.add_cell("sink_id".into());
648 row.add_cell("sink_name".into());
649 row.add_cell("task_type".into());
650 row.add_cell("schedule_state".into());
651 row.add_cell("trigger_interval_sec".into());
652 row.add_cell("trigger_snapshot_count".into());
653 row.add_cell("pending_snapshot_count".into());
654 row.add_cell("next_compaction_after_sec".into());
655 row.add_cell("is_triggerable".into());
656 row
657 });
658
659 for status in statuses {
660 let mut row = Row::new();
661 row.add_cell(status.sink_id.as_raw_id().into());
662 try_add_cell(
663 &mut row,
664 sink_names.get(&status.sink_id.as_raw_id()).cloned(),
665 );
666 row.add_cell(status.task_type.into());
667 row.add_cell(status.schedule_state.into());
668 row.add_cell(status.trigger_interval_sec.into());
669 row.add_cell((status.trigger_snapshot_count as u64).into());
670 try_add_cell(
671 &mut row,
672 status.pending_snapshot_count.map(|count| count as u64),
673 );
674 try_add_cell(&mut row, status.next_compaction_after_sec);
675 row.add_cell(status.is_triggerable.into());
676 table.add_row(row);
677 }
678
679 let _ = writeln!(s, "{table}");
680 }
681
682 async fn write_streaming_prometheus(&self, s: &mut String) {
683 let _ = writeln!(s, "top sources by throughput (rows/s)");
684 let query = format!(
685 "topk(3, sum(rate(stream_source_output_rows_counts{{{}}}[10m]))by (source_name))",
686 self.prometheus_selector
687 );
688 self.write_instant_vector_impl(s, &query, vec!["source_name"])
689 .await;
690
691 let _ = writeln!(s);
692 let _ = writeln!(s, "top materialized views by throughput (rows/s)");
693 let query = format!(
694 "topk(3, sum(rate(stream_mview_input_row_count{{{}}}[10m]))by (table_id))",
695 self.prometheus_selector
696 );
697 self.write_instant_vector_impl(s, &query, vec!["table_id"])
698 .await;
699
700 let _ = writeln!(s);
701 let _ = writeln!(s, "top join executor by matched rows");
702 let query = format!(
703 "topk(3, histogram_quantile(0.9, sum(rate(stream_join_matched_join_keys_bucket{{{}}}[10m])) by (le, fragment_id, table_id)))",
704 self.prometheus_selector
705 );
706 self.write_instant_vector_impl(s, &query, vec!["table_id", "fragment_id"])
707 .await;
708 }
709
710 async fn write_storage_prometheus(&self, s: &mut String) {
711 let _ = writeln!(s, "top Hummock Get by duration (second)");
712 let query = format!(
713 "topk(3, histogram_quantile(0.9, sum(rate(state_store_get_duration_bucket{{{}}}[10m])) by (le, table_id)))",
714 self.prometheus_selector
715 );
716 self.write_instant_vector_impl(s, &query, vec!["table_id"])
717 .await;
718
719 let _ = writeln!(s);
720 let _ = writeln!(s, "top Hummock Iter Init by duration (second)");
721 let query = format!(
722 "topk(3, histogram_quantile(0.9, sum(rate(state_store_iter_init_duration_bucket{{{}}}[10m])) by (le, table_id)))",
723 self.prometheus_selector
724 );
725 self.write_instant_vector_impl(s, &query, vec!["table_id"])
726 .await;
727
728 let _ = writeln!(s);
729 let _ = writeln!(s, "top table commit flush by size (byte)");
730 let query = format!(
731 "topk(3, sum(rate(storage_commit_write_throughput{{{}}}[10m])) by (table_id))",
732 self.prometheus_selector
733 );
734 self.write_instant_vector_impl(s, &query, vec!["table_id"])
735 .await;
736
737 let _ = writeln!(s);
738 let _ = writeln!(s, "object store read throughput (bytes/s)");
739 let query = format!(
740 "sum(rate(object_store_read_bytes{{{}}}[10m])) by (job, instance)",
741 merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
742 );
743 self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
744 .await;
745
746 let _ = writeln!(s);
747 let _ = writeln!(s, "object store write throughput (bytes/s)");
748 let query = format!(
749 "sum(rate(object_store_write_bytes{{{}}}[10m])) by (job, instance)",
750 merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
751 );
752 self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
753 .await;
754
755 let _ = writeln!(s);
756 let _ = writeln!(s, "object store operation rate");
757 let query = format!(
758 "sum(rate(object_store_operation_latency_count{{{}}}[10m])) by (le, type, job, instance)",
759 merge_prometheus_selector([
760 &self.prometheus_selector,
761 "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read_read_bytes|streaming_read\""
762 ])
763 );
764 self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
765 .await;
766
767 let _ = writeln!(s);
768 let _ = writeln!(s, "object store operation duration (second)");
769 let query = format!(
770 "histogram_quantile(0.9, sum(rate(object_store_operation_latency_bucket{{{}}}[10m])) by (le, type, job, instance))",
771 merge_prometheus_selector([
772 &self.prometheus_selector,
773 "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read\""
774 ])
775 );
776 self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
777 .await;
778 }
779
780 async fn write_instant_vector_impl(&self, s: &mut String, query: &str, labels: Vec<&str>) {
781 let Some(ref client) = self.prometheus_client else {
782 return;
783 };
784 if let Ok(Vector(instant_vec)) = client
785 .query(query)
786 .get()
787 .await
788 .map(|result| result.into_inner().0)
789 {
790 for i in instant_vec {
791 let l = labels
792 .iter()
793 .map(|label| {
794 format!(
795 "{}={}",
796 *label,
797 i.metric()
798 .get(*label)
799 .map(|s| s.as_str())
800 .unwrap_or_default()
801 )
802 })
803 .join(",");
804 let _ = writeln!(s, "{}: {:.3}", l, i.sample().value());
805 }
806 }
807 }
808
809 async fn write_await_tree(&self, s: &mut String, actor_traces_format: ActorTracesFormat) {
810 let all = dump_cluster_await_tree(
811 &self.metadata_manager,
812 &self.await_tree_reg,
813 actor_traces_format,
814 )
815 .await;
816
817 if let Ok(all) = all {
818 write!(s, "{}", all.output()).unwrap();
819 } else {
820 tracing::warn!("failed to dump await tree");
821 }
822 }
823
    /// Writes catalog object definitions: the streaming-job table first,
    /// then one table per catalog kind (source, table, mview, index, sink,
    /// view, internal table) with redacted SQL, and finally the actor table.
    ///
    /// Returns an error if any catalog listing fails; the caller logs it and
    /// keeps the rest of the report.
    async fn write_table_definition(&self, s: &mut String) -> MetaResult<()> {
        // Every catalog map below is keyed by object id with the value
        // tuple (name, database_id, schema_id, definition, created_at_epoch).
        let sources = self
            .metadata_manager
            .catalog_controller
            .list_sources()
            .await?
            .into_iter()
            .map(|s| {
                (
                    s.id.into(),
                    (
                        s.name,
                        s.database_id,
                        s.schema_id,
                        s.definition,
                        s.created_at_epoch,
                    ),
                )
            })
            .collect::<BTreeMap<_, _>>();
        let mut user_tables = BTreeMap::new();
        let mut mvs = BTreeMap::new();
        let mut indexes = BTreeMap::new();
        let mut internal_tables = BTreeMap::new();
        {
            // Partition all state tables by their table type.
            // NOTE(review): `chunk_by` only groups *consecutive* items, so
            // this assumes `list_all_state_tables` returns tables ordered by
            // type — confirm with that method's contract.
            let grouped = self
                .metadata_manager
                .catalog_controller
                .list_all_state_tables()
                .await?
                .into_iter()
                .chunk_by(|t| t.table_type());
            for (table_type, tables) in &grouped {
                let tables = tables.into_iter().map(|t| {
                    (
                        t.id.into(),
                        (
                            t.name,
                            t.database_id,
                            t.schema_id,
                            t.definition,
                            t.created_at_epoch,
                        ),
                    )
                });
                match table_type {
                    PbTableType::Table => user_tables.extend(tables),
                    PbTableType::MaterializedView => mvs.extend(tables),
                    PbTableType::Index | PbTableType::VectorIndex => indexes.extend(tables),
                    PbTableType::Internal => internal_tables.extend(tables),
                    PbTableType::Unspecified => {
                        tracing::error!("unspecified table type: {:?}", tables.collect_vec());
                    }
                }
            }
        }
        let sinks = self
            .metadata_manager
            .catalog_controller
            .list_sinks()
            .await?
            .into_iter()
            .map(|s| {
                (
                    s.id.into(),
                    (
                        s.name,
                        s.database_id,
                        s.schema_id,
                        s.definition,
                        s.created_at_epoch,
                    ),
                )
            })
            .collect::<BTreeMap<_, _>>();
        // Views have no creation epoch, hence the trailing `None`.
        let views = self
            .metadata_manager
            .catalog_controller
            .list_views()
            .await?
            .into_iter()
            .map(|v| {
                (
                    v.id.into(),
                    (v.name, v.database_id, v.schema_id, v.sql, None),
                )
            })
            .collect::<BTreeMap<_, _>>();
        let mut streaming_jobs = self
            .metadata_manager
            .catalog_controller
            .list_streaming_job_infos()
            .await?;
        // Stable display order: by object type first, then by job id.
        streaming_jobs.sort_by_key(|info| (info.obj_type as usize, info.job_id));
        {
            // STREAMING JOB table.
            use comfy_table::{Row, Table};
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("job_id".into());
                row.add_cell("name".into());
                row.add_cell("obj_type".into());
                row.add_cell("state".into());
                row.add_cell("parallelism".into());
                row.add_cell("max_parallelism".into());
                row.add_cell("resource_group".into());
                row.add_cell("database_id".into());
                row.add_cell("schema_id".into());
                row.add_cell("config_override".into());
                row
            });
            for job in streaming_jobs {
                let mut row = Row::new();
                row.add_cell(job.job_id.into());
                row.add_cell(job.name.into());
                row.add_cell(job.obj_type.as_str().into());
                row.add_cell(format_job_status(job.job_status).into());
                row.add_cell(format_streaming_parallelism(&job.parallelism).into());
                row.add_cell(job.max_parallelism.into());
                row.add_cell(job.resource_group.into());
                row.add_cell(job.database_id.into());
                row.add_cell(job.schema_id.into());
                row.add_cell(job.config_override.into());
                table.add_row(row);
            }
            let _ = writeln!(s);
            let _ = writeln!(s, "STREAMING JOB");
            let _ = writeln!(s, "{table}");
        }
        // One table per catalog kind, in this fixed display order.
        let catalogs = [
            ("SOURCE", sources),
            ("TABLE", user_tables),
            ("MATERIALIZED VIEW", mvs),
            ("INDEX", indexes),
            ("SINK", sinks),
            ("VIEW", views),
            ("INTERNAL TABLE", internal_tables),
        ];
        // Remember every object's name so the actor table below can show it.
        let mut obj_id_to_name: HashMap<ObjectId, _> = HashMap::new();
        for (title, items) in catalogs {
            use comfy_table::{Row, Table};
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("id".into());
                row.add_cell("name".into());
                row.add_cell("database_id".into());
                row.add_cell("schema_id".into());
                row.add_cell("created_at".into());
                row.add_cell("definition".into());
                row
            });
            for (id, (name, database_id, schema_id, definition, created_at_epoch)) in items {
                obj_id_to_name.insert(id, name.clone());
                let mut row = Row::new();
                // SQL that fails to parse is fully hidden rather than leaked.
                let may_redact = redact_sql(&definition, self.redact_sql_option_keywords.clone())
                    .unwrap_or_else(|| "[REDACTED]".into());
                let created_at = if let Some(created_at_epoch) = created_at_epoch {
                    format!("{}", Epoch::from(created_at_epoch).as_timestamptz())
                } else {
                    "".into()
                };
                row.add_cell(id.into());
                row.add_cell(name.into());
                row.add_cell(database_id.into());
                row.add_cell(schema_id.into());
                row.add_cell(created_at.into());
                row.add_cell(may_redact.into());
                table.add_row(row);
            }
            let _ = writeln!(s);
            let _ = writeln!(s, "{title}");
            let _ = writeln!(s, "{table}");
        }

        // ACTOR table: each actor joined with its owning job's name.
        let actors = self
            .metadata_manager
            .catalog_controller
            .list_actor_info()
            .await?
            .into_iter()
            .map(|(actor_id, fragment_id, job_id, schema_id, obj_type)| {
                (
                    actor_id,
                    (
                        fragment_id,
                        job_id,
                        schema_id,
                        obj_type,
                        obj_id_to_name.get(&job_id).cloned().unwrap_or_default(),
                    ),
                )
            })
            .collect::<BTreeMap<_, _>>();

        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("fragment_id".into());
            row.add_cell("job_id".into());
            row.add_cell("schema_id".into());
            row.add_cell("type".into());
            row.add_cell("name".into());
            row
        });
        for (actor_id, (fragment_id, job_id, schema_id, ddl_type, name)) in actors {
            let mut row = Row::new();
            row.add_cell(actor_id.into());
            row.add_cell(fragment_id.into());
            row.add_cell(job_id.into());
            row.add_cell(schema_id.into());
            row.add_cell(ddl_type.as_str().into());
            row.add_cell(name.into());
            table.add_row(row);
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "ACTOR");
        let _ = writeln!(s, "{table}");
        Ok(())
    }
1046
1047 fn write_license(&self, s: &mut String) {
1048 use comfy_table::presets::ASCII_BORDERS_ONLY;
1049 use comfy_table::{ContentArrangement, Row, Table};
1050
1051 let mut table = Table::new();
1052 table.load_preset(ASCII_BORDERS_ONLY);
1053 table.set_content_arrangement(ContentArrangement::Dynamic);
1054 table.set_header({
1055 let mut row = Row::new();
1056 row.add_cell("field".into());
1057 row.add_cell("value".into());
1058 row
1059 });
1060
1061 match LicenseManager::get().license() {
1062 Ok(license) => {
1063 let fmt_option = |value: Option<u64>| match value {
1064 Some(v) => v.to_string(),
1065 None => "unlimited".to_owned(),
1066 };
1067
1068 let expires_at = if license.exp == u64::MAX {
1069 "never".to_owned()
1070 } else {
1071 let exp_i64 = license.exp as i64;
1072 chrono::DateTime::<chrono::Utc>::from_timestamp(exp_i64, 0)
1073 .map(|ts| ts.to_rfc3339())
1074 .unwrap_or_else(|| format!("invalid ({})", license.exp))
1075 };
1076
1077 let mut row = Row::new();
1078 row.add_cell("status".into());
1079 row.add_cell("valid".into());
1080 table.add_row(row);
1081
1082 let mut row = Row::new();
1083 row.add_cell("tier".into());
1084 row.add_cell(license.tier.name().into());
1085 table.add_row(row);
1086
1087 let mut row = Row::new();
1088 row.add_cell("expires_at".into());
1089 row.add_cell(expires_at.into());
1090 table.add_row(row);
1091
1092 let mut row = Row::new();
1093 row.add_cell("rwu_limit".into());
1094 row.add_cell(fmt_option(license.rwu_limit.map(|v| v.get())).into());
1095 table.add_row(row);
1096
1097 let mut row = Row::new();
1098 row.add_cell("cpu_core_limit".into());
1099 row.add_cell(fmt_option(license.cpu_core_limit()).into());
1100 table.add_row(row);
1101
1102 let mut row = Row::new();
1103 row.add_cell("memory_limit_bytes".into());
1104 row.add_cell(fmt_option(license.memory_limit()).into());
1105 table.add_row(row);
1106
1107 let mut features: Vec<_> = license
1108 .tier
1109 .available_features()
1110 .map(|f| f.name())
1111 .collect();
1112 features.sort_unstable();
1113 let feature_summary = format_features(&features);
1114
1115 let mut row = Row::new();
1116 row.add_cell("available_features".into());
1117 row.add_cell(feature_summary.into());
1118 table.add_row(row);
1119 }
1120 Err(error) => {
1121 let mut row = Row::new();
1122 row.add_cell("status".into());
1123 row.add_cell("invalid".into());
1124 table.add_row(row);
1125
1126 let mut row = Row::new();
1127 row.add_cell("error".into());
1128 row.add_cell(error.to_report_string().into());
1129 table.add_row(row);
1130 }
1131 }
1132
1133 let _ = writeln!(s, "LICENSE");
1134 let _ = writeln!(s, "{table}");
1135 }
1136
1137 async fn write_params(&self, s: &mut String) {
1138 let params = self.system_params_controller.get_params().await;
1139
1140 use comfy_table::{Row, Table};
1141 let mut table = Table::new();
1142 table.set_header({
1143 let mut row = Row::new();
1144 row.add_cell("key".into());
1145 row.add_cell("value".into());
1146 row
1147 });
1148
1149 let mut row = Row::new();
1150 row.add_cell("barrier_interval_ms".into());
1151 row.add_cell(params.barrier_interval_ms().to_string().into());
1152 table.add_row(row);
1153
1154 let mut row = Row::new();
1155 row.add_cell("checkpoint_frequency".into());
1156 row.add_cell(params.checkpoint_frequency().to_string().into());
1157 table.add_row(row);
1158
1159 let mut row = Row::new();
1160 row.add_cell("state_store".into());
1161 row.add_cell(params.state_store().to_owned().into());
1162 table.add_row(row);
1163
1164 let mut row = Row::new();
1165 row.add_cell("data_directory".into());
1166 row.add_cell(params.data_directory().to_owned().into());
1167 table.add_row(row);
1168
1169 let mut row = Row::new();
1170 row.add_cell("max_concurrent_creating_streaming_jobs".into());
1171 row.add_cell(
1172 params
1173 .max_concurrent_creating_streaming_jobs()
1174 .to_string()
1175 .into(),
1176 );
1177 table.add_row(row);
1178
1179 let mut row = Row::new();
1180 row.add_cell("time_travel_retention_ms".into());
1181 row.add_cell(params.time_travel_retention_ms().to_string().into());
1182 table.add_row(row);
1183
1184 let mut row = Row::new();
1185 row.add_cell("adaptive_parallelism_strategy".into());
1186 row.add_cell(params.adaptive_parallelism_strategy().to_string().into());
1187 table.add_row(row);
1188
1189 let mut row = Row::new();
1190 row.add_cell("per_database_isolation".into());
1191 row.add_cell(params.per_database_isolation().to_string().into());
1192 table.add_row(row);
1193
1194 let _ = writeln!(s, "SYSTEM PARAMS");
1195 let _ = writeln!(s, "{table}");
1196 }
1197}
1198
1199fn try_add_cell<T: Into<comfy_table::Cell>>(row: &mut comfy_table::Row, t: Option<T>) {
1200 match t {
1201 Some(t) => {
1202 row.add_cell(t.into());
1203 }
1204 None => {
1205 row.add_cell("".into());
1206 }
1207 }
1208}
1209
/// Joins non-empty selector fragments with commas, so empty selectors do
/// not leave stray commas in the resulting PromQL label matcher.
fn merge_prometheus_selector<'a>(selectors: impl IntoIterator<Item = &'a str>) -> String {
    let parts: Vec<&str> = selectors.into_iter().filter(|s| !s.is_empty()).collect();
    parts.join(",")
}
1213
1214fn redact_sql(sql: &str, keywords: RedactSqlOptionKeywordsRef) -> Option<String> {
1215 match Parser::parse_sql(sql) {
1216 Ok(sqls) => Some(
1217 sqls.into_iter()
1218 .map(|sql| sql.to_redacted_string(keywords.clone()))
1219 .join(";"),
1220 ),
1221 Err(_) => None,
1222 }
1223}
1224
/// Formats feature names for table display: "(none)" when empty, otherwise
/// lines of up to six comma-separated names, each prefixed with one space.
fn format_features(features: &[&'static str]) -> String {
    const PER_LINE: usize = 6;

    if features.is_empty() {
        return "(none)".into();
    }

    let mut lines = Vec::new();
    for chunk in features.chunks(PER_LINE) {
        lines.push(format!(" {}", chunk.join(", ")));
    }
    lines.join("\n")
}
1237
/// Maps a streaming job status to the lowercase label shown in the report.
fn format_job_status(status: JobStatus) -> &'static str {
    match status {
        JobStatus::Initial => "initial",
        JobStatus::Creating => "creating",
        JobStatus::Created => "created",
    }
}
1245
/// Renders a job's parallelism setting for display, e.g. "fixed(4)".
fn format_streaming_parallelism(parallelism: &StreamingParallelism) -> String {
    match parallelism {
        StreamingParallelism::Adaptive => "adaptive".into(),
        StreamingParallelism::Fixed(n) => format!("fixed({n})"),
        StreamingParallelism::Custom => "custom".into(),
    }
}