use std::cmp::{Ordering, Reverse};
use std::collections::{BTreeMap, BinaryHeap, HashMap};
use std::fmt::Write;
use std::sync::Arc;

use itertools::Itertools;
use prometheus_http_query::response::Data::Vector;
use risingwave_common::id::ObjectId;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::StackTraceResponseExt;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::HummockSstableId;
use risingwave_hummock_sdk::level::Level;
use risingwave_license::LicenseManager;
use risingwave_meta_model::{JobStatus, StreamingParallelism};
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::EventLog;
use risingwave_pb::meta::event_log::Event;
use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
use risingwave_sqlparser::ast::RedactSqlOptionKeywordsRef;
use risingwave_sqlparser::parser::Parser;
use serde_json::json;
use thiserror_ext::AsReport;

use crate::MetaResult;
use crate::controller::system_param::SystemParamsControllerRef;
use crate::hummock::HummockManagerRef;
use crate::manager::MetadataManager;
use crate::manager::event_log::EventLogManagerRef;
use crate::rpc::await_tree::dump_cluster_await_tree;

pub type DiagnoseCommandRef = Arc<DiagnoseCommand>;

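/// Collects cluster-wide diagnostic information into a single human-readable
/// report, covering the license, catalog, worker nodes, streaming and storage
/// metrics, event logs, system parameters, and await-tree dumps.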
pub struct DiagnoseCommand {
    metadata_manager: MetadataManager,
    await_tree_reg: await_tree::Registry,
    hummock_manager: HummockManagerRef,
    event_log_manager: EventLogManagerRef,
    prometheus_client: Option<prometheus_http_query::Client>,
    prometheus_selector: String,
    redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
    system_params_controller: SystemParamsControllerRef,
}

impl DiagnoseCommand {
    pub fn new(
        metadata_manager: MetadataManager,
        await_tree_reg: await_tree::Registry,
        hummock_manager: HummockManagerRef,
        event_log_manager: EventLogManagerRef,
        prometheus_client: Option<prometheus_http_query::Client>,
        prometheus_selector: String,
        redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
        system_params_controller: SystemParamsControllerRef,
    ) -> Self {
        Self {
            metadata_manager,
            await_tree_reg,
            hummock_manager,
            event_log_manager,
            prometheus_client,
            prometheus_selector,
            redact_sql_option_keywords,
            system_params_controller,
        }
    }

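    /// Assembles the full diagnose report by writing each section in turn:
    /// header and version, license, catalog, worker nodes, streaming and
    /// storage metrics, event logs, system params, and the await-tree dump.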
    pub async fn report(&self, actor_traces_format: ActorTracesFormat) -> String {
        let mut report = String::new();
        let _ = writeln!(
            report,
            "report created at: {}\nversion: {}",
            chrono::DateTime::<chrono::offset::Utc>::from(std::time::SystemTime::now()),
            risingwave_common::current_cluster_version(),
        );
        let _ = writeln!(report);
        self.write_license(&mut report);
        let _ = writeln!(report);
        self.write_catalog(&mut report).await;
        let _ = writeln!(report);
        self.write_worker_nodes(&mut report).await;
        let _ = writeln!(report);
        self.write_streaming_prometheus(&mut report).await;
        let _ = writeln!(report);
        self.write_storage(&mut report).await;
        let _ = writeln!(report);
        self.write_event_logs(&mut report);
        let _ = writeln!(report);
        self.write_params(&mut report).await;
        let _ = writeln!(report);
        self.write_await_tree(&mut report, actor_traces_format)
            .await;

        report
    }

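    /// Writes catalog statistics and table definitions. A failure to fetch
    /// definitions is logged and does not abort the report.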
    async fn write_catalog(&self, s: &mut String) {
        self.write_catalog_inner(s).await;
        let _ = self.write_table_definition(s).await.inspect_err(|e| {
            tracing::warn!(
                error = e.to_report_string(),
                "failed to display table definition"
            )
        });
    }

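    /// Writes high-level catalog object counts, then the database and schema
    /// tables.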
    async fn write_catalog_inner(&self, s: &mut String) {
        let stats = self.metadata_manager.catalog_controller.stats().await;

        let stat = match stats {
            Ok(stat) => stat,
            Err(err) => {
                tracing::warn!(error=?err.as_report(), "failed to get catalog stats");
                return;
            }
        };
        let _ = writeln!(s, "number of database: {}", stat.database_num);
        let _ = writeln!(s, "number of streaming job: {}", stat.streaming_job_num);
        let _ = writeln!(s, "number of actor: {}", stat.actor_num);
        let _ = writeln!(s, "number of source: {}", stat.source_num);
        let _ = writeln!(s, "number of table: {}", stat.table_num);
        let _ = writeln!(s, "number of materialized view: {}", stat.mview_num);
        let _ = writeln!(s, "number of sink: {}", stat.sink_num);
        let _ = writeln!(s, "number of index: {}", stat.index_num);
        let _ = writeln!(s, "number of function: {}", stat.function_num);

        self.write_databases(s).await;
        self.write_schemas(s).await;
    }

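    /// Writes a table of all databases with their per-database barrier
    /// settings, falling back to "default" where unset.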
    async fn write_databases(&self, s: &mut String) {
        let databases = match self
            .metadata_manager
            .catalog_controller
            .list_databases()
            .await
        {
            Ok(databases) => databases,
            Err(err) => {
                tracing::warn!(error=?err.as_report(), "failed to list databases");
                return;
            }
        };

        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("name".into());
            row.add_cell("resource_group".into());
            row.add_cell("barrier_interval_ms".into());
            row.add_cell("checkpoint_frequency".into());
            row
        });
        for db in databases {
            let mut row = Row::new();
            row.add_cell(db.id.into());
            row.add_cell(db.name.into());
            row.add_cell(db.resource_group.into());
            row.add_cell(
                db.barrier_interval_ms
                    .map(|v| v.to_string())
                    .unwrap_or("default".into())
                    .into(),
            );
            row.add_cell(
                db.checkpoint_frequency
                    .map(|v| v.to_string())
                    .unwrap_or("default".into())
                    .into(),
            );
            table.add_row(row);
        }

        let _ = writeln!(s, "DATABASE");
        let _ = writeln!(s, "{table}");
    }

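    /// Writes a table of all schemas.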
    async fn write_schemas(&self, s: &mut String) {
        let schemas = match self
            .metadata_manager
            .catalog_controller
            .list_schemas()
            .await
        {
            Ok(schemas) => schemas,
            Err(err) => {
                tracing::warn!(error=?err.as_report(), "failed to list schemas");
                return;
            }
        };

        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("database_id".into());
            row.add_cell("name".into());
            row
        });
        for schema in schemas {
            let mut row = Row::new();
            row.add_cell(schema.id.into());
            row.add_cell(schema.database_id.into());
            row.add_cell(schema.name.into());
            table.add_row(row);
        }

        let _ = writeln!(s, "SCHEMA");
        let _ = writeln!(s, "{table}");
    }

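    /// Writes one table row per worker node. Attributes that only apply to
    /// certain worker types (streaming/serving flags, actor count, Iceberg
    /// compactor flag) are left empty for the other types.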
    async fn write_worker_nodes(&self, s: &mut String) {
        let Ok(worker_actor_count) = self.metadata_manager.worker_actor_count() else {
            tracing::warn!("failed to get worker actor count");
            return;
        };

        use comfy_table::{Row, Table};
        let Ok(worker_nodes) = self.metadata_manager.list_worker_node(None, None).await else {
            tracing::warn!("failed to get worker nodes");
            return;
        };
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("host".into());
            row.add_cell("type".into());
            row.add_cell("state".into());
            row.add_cell("parallelism".into());
            row.add_cell("resource_group".into());
            row.add_cell("is_streaming".into());
            row.add_cell("is_serving".into());
            row.add_cell("is_iceberg_compactor".into());
            row.add_cell("rw_version".into());
            row.add_cell("total_memory_bytes".into());
            row.add_cell("total_cpu_cores".into());
            row.add_cell("started_at".into());
            row.add_cell("actor_count".into());
            row
        });
        for worker_node in worker_nodes {
            let mut row = Row::new();
            row.add_cell(worker_node.id.into());
            try_add_cell(
                &mut row,
                worker_node
                    .host
                    .as_ref()
                    .map(|h| format!("{}:{}", h.host, h.port)),
            );
            try_add_cell(
                &mut row,
                worker_node.get_type().ok().map(|t| t.as_str_name()),
            );
            try_add_cell(
                &mut row,
                worker_node.get_state().ok().map(|s| s.as_str_name()),
            );
            try_add_cell(&mut row, worker_node.parallelism());
            try_add_cell(
                &mut row,
                worker_node
                    .property
                    .as_ref()
                    .map(|p| p.resource_group.clone().unwrap_or("".to_owned())),
            );
            // The streaming/serving flags are only meaningful for compute nodes.
            let (is_streaming, is_serving) = {
                if let Ok(t) = worker_node.get_type()
                    && t == WorkerType::ComputeNode
                {
                    (
                        worker_node.property.as_ref().map(|p| p.is_streaming),
                        worker_node.property.as_ref().map(|p| p.is_serving),
                    )
                } else {
                    (None, None)
                }
            };
            try_add_cell(&mut row, is_streaming);
            try_add_cell(&mut row, is_serving);
            let is_iceberg_compactor = {
                if let Ok(t) = worker_node.get_type()
                    && t == WorkerType::Compactor
                {
                    worker_node
                        .property
                        .as_ref()
                        .map(|p| p.is_iceberg_compactor)
                } else {
                    None
                }
            };
            try_add_cell(&mut row, is_iceberg_compactor);
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.rw_version.clone()),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.total_memory_bytes),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.total_cpu_cores),
            );
            try_add_cell(
                &mut row,
                worker_node
                    .started_at
                    .and_then(|ts| Timestamptz::from_secs(ts as _).map(|t| t.to_string())),
            );
            // Actor counts only apply to compute nodes; a compute node missing
            // from the map simply runs zero actors.
            let actor_count = {
                if let Ok(t) = worker_node.get_type()
                    && t != WorkerType::ComputeNode
                {
                    None
                } else {
                    match worker_actor_count.get(&worker_node.id) {
                        None => Some(0),
                        Some(c) => Some(*c),
                    }
                }
            };
            try_add_cell(&mut row, actor_count);
            table.add_row(row);
        }
        let _ = writeln!(s, "{table}");
    }

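    /// Writes the most recent event logs, newest first, grouped by event kind
    /// with a per-kind row limit.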
    fn write_event_logs(&self, s: &mut String) {
        let event_logs = self
            .event_log_manager
            .list_event_logs()
            .into_iter()
            .sorted_by(|a, b| {
                a.timestamp
                    .unwrap_or(0)
                    .cmp(&b.timestamp.unwrap_or(0))
                    .reverse()
            })
            .collect_vec();

        let _ = writeln!(s, "latest barrier completions");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::BarrierComplete(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(10),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest barrier collection failures");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::CollectBarrierFail(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest barrier injection failures");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::InjectBarrierFail(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest worker node panics");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::WorkerNodePanic(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(10),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest create stream job failures");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::CreateStreamJobFail(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest dirty stream job clear-ups");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::DirtyStreamJobClear(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );
    }

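    /// Renders event logs matched by `get_event_info` into a two-column table
    /// (timestamp, info), stopping after `limit` matching rows if given.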
    fn write_event_logs_impl<'a, F>(
        s: &mut String,
        event_logs: impl Iterator<Item = &'a EventLog>,
        get_event_info: F,
        limit: Option<usize>,
    ) where
        F: Fn(&Event) -> Option<String>,
    {
        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("created_at".into());
            row.add_cell("info".into());
            row
        });
        let mut row_count = 0;
        for event_log in event_logs {
            let Some(ref inner) = event_log.event else {
                continue;
            };
            if let Some(limit) = limit
                && row_count >= limit
            {
                break;
            }
            let mut row = Row::new();
            let ts = event_log
                .timestamp
                .and_then(|ts| Timestamptz::from_millis(ts as _).map(|ts| ts.to_string()));
            try_add_cell(&mut row, ts);
            if let Some(info) = get_event_info(inner) {
                row.add_cell(info.into());
                row_count += 1;
            } else {
                continue;
            }
            table.add_row(row);
        }
        let _ = writeln!(s, "{table}");
    }

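    /// Writes storage diagnostics: back-pressured compaction groups, SSTable
    /// counts and sizes, the SSTables with the highest stale-key (tombstone)
    /// ratios, and storage-related Prometheus metrics.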
    async fn write_storage(&self, s: &mut String) {
        let mut sst_num = 0;
        let mut sst_total_file_size = 0;
        let back_pressured_compaction_groups = self
            .hummock_manager
            .write_limits()
            .await
            .into_iter()
            .filter_map(|(k, v)| {
                if v.table_ids.is_empty() {
                    None
                } else {
                    Some(k)
                }
            })
            .join(",");
        if !back_pressured_compaction_groups.is_empty() {
            let _ = writeln!(
                s,
                "back pressured compaction groups: {back_pressured_compaction_groups}"
            );
        }

        #[derive(PartialEq, Eq)]
        struct SstableSort {
            compaction_group_id: u64,
            sst_id: HummockSstableId,
            delete_ratio: u64,
        }
        impl PartialOrd for SstableSort {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }
        impl Ord for SstableSort {
            fn cmp(&self, other: &Self) -> Ordering {
                self.delete_ratio.cmp(&other.delete_ratio)
            }
        }
        // Keep the `top_k` SSTables with the highest delete ratios in a
        // min-heap, evicting the smallest entry when a larger one arrives.
        fn top_k_sstables(
            top_k: usize,
            heap: &mut BinaryHeap<Reverse<SstableSort>>,
            e: SstableSort,
        ) {
            if heap.len() < top_k {
                heap.push(Reverse(e));
            } else if let Some(mut p) = heap.peek_mut()
                && e.delete_ratio > p.0.delete_ratio
            {
                *p = Reverse(e);
            }
        }

        let top_k = 10;
        let mut top_tombstone_delete_sst = BinaryHeap::with_capacity(top_k);
        let compaction_group_num = self
            .hummock_manager
            .on_current_version(|version| {
                for compaction_group in version.levels.values() {
                    let mut visit_level = |level: &Level| {
                        sst_num += level.table_infos.len();
                        sst_total_file_size +=
                            level.table_infos.iter().map(|t| t.sst_size).sum::<u64>();
                        for sst in &level.table_infos {
                            if sst.total_key_count == 0 {
                                continue;
                            }
                            // Ratio in basis points (1/100 of a percent), so the
                            // heap can order by an integer instead of a float.
                            let tombstone_delete_ratio =
                                sst.stale_key_count * 10000 / sst.total_key_count;
                            let e = SstableSort {
                                compaction_group_id: compaction_group.group_id,
                                sst_id: sst.sst_id,
                                delete_ratio: tombstone_delete_ratio,
                            };
                            top_k_sstables(top_k, &mut top_tombstone_delete_sst, e);
                        }
                    };
                    let l0 = &compaction_group.l0;
                    for level in &l0.sub_levels {
                        visit_level(level);
                    }
                    for level in &compaction_group.levels {
                        visit_level(level);
                    }
                }
                version.levels.len()
            })
            .await;

        let _ = writeln!(s, "number of SSTables: {sst_num}");
        let _ = writeln!(s, "total size of SSTables (byte): {sst_total_file_size}");
        let _ = writeln!(s, "number of compaction groups: {compaction_group_num}");
        use comfy_table::{Row, Table};
        fn format_table(heap: BinaryHeap<Reverse<SstableSort>>) -> Table {
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("compaction group id".into());
                row.add_cell("sstable id".into());
                row.add_cell("delete ratio".into());
                row
            });
            for sst in &heap.into_sorted_vec() {
                let mut row = Row::new();
                row.add_cell(sst.0.compaction_group_id.into());
                row.add_cell(sst.0.sst_id.into());
                row.add_cell(format!("{:.2}%", sst.0.delete_ratio as f64 / 100.0).into());
                table.add_row(row);
            }
            table
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "top tombstone delete ratio");
        let _ = writeln!(s, "{}", format_table(top_tombstone_delete_sst));
        let _ = writeln!(s);

        let _ = writeln!(s);
        self.write_storage_prometheus(s).await;
    }

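    /// Queries Prometheus for the top streaming hot spots: source throughput,
    /// materialized view throughput, and join executors by matched rows.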
    async fn write_streaming_prometheus(&self, s: &mut String) {
        let _ = writeln!(s, "top sources by throughput (rows/s)");
        let query = format!(
            "topk(3, sum(rate(stream_source_output_rows_counts{{{}}}[10m])) by (source_name))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["source_name"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top materialized views by throughput (rows/s)");
        let query = format!(
            "topk(3, sum(rate(stream_mview_input_row_count{{{}}}[10m])) by (table_id))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top join executors by matched rows");
        let query = format!(
            "topk(3, histogram_quantile(0.9, sum(rate(stream_join_matched_join_keys_bucket{{{}}}[10m])) by (le, fragment_id, table_id)))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id", "fragment_id"])
            .await;
    }

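    /// Queries Prometheus for storage hot spots: Hummock read latencies,
    /// commit flush sizes, and object store throughput, operation rate, and
    /// operation latency.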
    async fn write_storage_prometheus(&self, s: &mut String) {
        let _ = writeln!(s, "top Hummock Get by duration (second)");
        let query = format!(
            "topk(3, histogram_quantile(0.9, sum(rate(state_store_get_duration_bucket{{{}}}[10m])) by (le, table_id)))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top Hummock Iter Init by duration (second)");
        let query = format!(
            "topk(3, histogram_quantile(0.9, sum(rate(state_store_iter_init_duration_bucket{{{}}}[10m])) by (le, table_id)))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top table commit flush by size (byte)");
        let query = format!(
            "topk(3, sum(rate(storage_commit_write_throughput{{{}}}[10m])) by (table_id))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store read throughput (bytes/s)");
        let query = format!(
            "sum(rate(object_store_read_bytes{{{}}}[10m])) by (job, instance)",
            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
        );
        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store write throughput (bytes/s)");
        let query = format!(
            "sum(rate(object_store_write_bytes{{{}}}[10m])) by (job, instance)",
            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
        );
        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store operation rate");
        let query = format!(
            "sum(rate(object_store_operation_latency_count{{{}}}[10m])) by (le, type, job, instance)",
            merge_prometheus_selector([
                &self.prometheus_selector,
                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read_read_bytes|streaming_read\""
            ])
        );
        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store operation duration (second)");
        let query = format!(
            "histogram_quantile(0.9, sum(rate(object_store_operation_latency_bucket{{{}}}[10m])) by (le, type, job, instance))",
            merge_prometheus_selector([
                &self.prometheus_selector,
                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read\""
            ])
        );
        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
            .await;
    }

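    /// Runs an instant PromQL query and writes one `label=value,...: sample`
    /// line per result. Output is silently skipped if no Prometheus client is
    /// configured or the query fails.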
    async fn write_instant_vector_impl(&self, s: &mut String, query: &str, labels: Vec<&str>) {
        let Some(ref client) = self.prometheus_client else {
            return;
        };
        if let Ok(Vector(instant_vec)) = client
            .query(query)
            .get()
            .await
            .map(|result| result.into_inner().0)
        {
            for i in instant_vec {
                let l = labels
                    .iter()
                    .map(|label| {
                        format!(
                            "{}={}",
                            *label,
                            i.metric()
                                .get(*label)
                                .map(|s| s.as_str())
                                .unwrap_or_default()
                        )
                    })
                    .join(",");
                let _ = writeln!(s, "{}: {:.3}", l, i.sample().value());
            }
        }
    }

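    /// Dumps await-tree stack traces collected across the cluster in the
    /// requested format.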
    async fn write_await_tree(&self, s: &mut String, actor_traces_format: ActorTracesFormat) {
        let all = dump_cluster_await_tree(
            &self.metadata_manager,
            &self.await_tree_reg,
            actor_traces_format,
        )
        .await;

        if let Ok(all) = all {
            write!(s, "{}", all.output()).unwrap();
        } else {
            tracing::warn!("failed to dump await tree");
        }
    }

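    /// Writes streaming job info plus the (redacted) definitions of all
    /// catalog objects, then a table mapping each actor to its fragment, job,
    /// and job name.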
    async fn write_table_definition(&self, s: &mut String) -> MetaResult<()> {
        let sources = self
            .metadata_manager
            .catalog_controller
            .list_sources()
            .await?
            .into_iter()
            .map(|s| {
                (
                    s.id.into(),
                    (
                        s.name,
                        s.database_id,
                        s.schema_id,
                        s.definition,
                        s.created_at_epoch,
                    ),
                )
            })
            .collect::<BTreeMap<_, _>>();
        let mut user_tables = BTreeMap::new();
        let mut mvs = BTreeMap::new();
        let mut indexes = BTreeMap::new();
        let mut internal_tables = BTreeMap::new();
        {
            let grouped = self
                .metadata_manager
                .catalog_controller
                .list_all_state_tables()
                .await?
                .into_iter()
                .chunk_by(|t| t.table_type());
            for (table_type, tables) in &grouped {
                let tables = tables.into_iter().map(|t| {
                    (
                        t.id.into(),
                        (
                            t.name,
                            t.database_id,
                            t.schema_id,
                            t.definition,
                            t.created_at_epoch,
                        ),
                    )
                });
                match table_type {
                    PbTableType::Table => user_tables.extend(tables),
                    PbTableType::MaterializedView => mvs.extend(tables),
                    PbTableType::Index | PbTableType::VectorIndex => indexes.extend(tables),
                    PbTableType::Internal => internal_tables.extend(tables),
                    PbTableType::Unspecified => {
                        tracing::error!("unspecified table type: {:?}", tables.collect_vec());
                    }
                }
            }
        }
        let sinks = self
            .metadata_manager
            .catalog_controller
            .list_sinks()
            .await?
            .into_iter()
            .map(|s| {
                (
                    s.id.into(),
                    (
                        s.name,
                        s.database_id,
                        s.schema_id,
                        s.definition,
                        s.created_at_epoch,
                    ),
                )
            })
            .collect::<BTreeMap<_, _>>();
        let views = self
            .metadata_manager
            .catalog_controller
            .list_views()
            .await?
            .into_iter()
            .map(|v| {
                (
                    v.id.into(),
                    (v.name, v.database_id, v.schema_id, v.sql, None),
                )
            })
            .collect::<BTreeMap<_, _>>();
        let mut streaming_jobs = self
            .metadata_manager
            .catalog_controller
            .list_streaming_job_infos()
            .await?;
        streaming_jobs.sort_by_key(|info| (info.obj_type as usize, info.job_id));
        {
            use comfy_table::{Row, Table};
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("job_id".into());
                row.add_cell("name".into());
                row.add_cell("obj_type".into());
                row.add_cell("state".into());
                row.add_cell("parallelism".into());
                row.add_cell("max_parallelism".into());
                row.add_cell("resource_group".into());
                row.add_cell("database_id".into());
                row.add_cell("schema_id".into());
                row.add_cell("config_override".into());
                row
            });
            for job in streaming_jobs {
                let mut row = Row::new();
                row.add_cell(job.job_id.into());
                row.add_cell(job.name.into());
                row.add_cell(job.obj_type.as_str().into());
                row.add_cell(format_job_status(job.job_status).into());
                row.add_cell(format_streaming_parallelism(&job.parallelism).into());
                row.add_cell(job.max_parallelism.into());
                row.add_cell(job.resource_group.into());
                row.add_cell(job.database_id.into());
                row.add_cell(job.schema_id.into());
                row.add_cell(job.config_override.into());
                table.add_row(row);
            }
            let _ = writeln!(s);
            let _ = writeln!(s, "STREAMING JOB");
            let _ = writeln!(s, "{table}");
        }
        let catalogs = [
            ("SOURCE", sources),
            ("TABLE", user_tables),
            ("MATERIALIZED VIEW", mvs),
            ("INDEX", indexes),
            ("SINK", sinks),
            ("VIEW", views),
            ("INTERNAL TABLE", internal_tables),
        ];
        let mut obj_id_to_name: HashMap<ObjectId, _> = HashMap::new();
        for (title, items) in catalogs {
            use comfy_table::{Row, Table};
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("id".into());
                row.add_cell("name".into());
                row.add_cell("database_id".into());
                row.add_cell("schema_id".into());
                row.add_cell("created_at".into());
                row.add_cell("definition".into());
                row
            });
            for (id, (name, database_id, schema_id, definition, created_at_epoch)) in items {
                obj_id_to_name.insert(id, name.clone());
                let mut row = Row::new();
                // Redact sensitive options in the DDL; if the SQL cannot be
                // parsed, hide the whole definition rather than risk leaking it.
                let may_redact = redact_sql(&definition, self.redact_sql_option_keywords.clone())
                    .unwrap_or_else(|| "[REDACTED]".into());
                let created_at = if let Some(created_at_epoch) = created_at_epoch {
                    format!("{}", Epoch::from(created_at_epoch).as_timestamptz())
                } else {
                    "".into()
                };
                row.add_cell(id.into());
                row.add_cell(name.into());
                row.add_cell(database_id.into());
                row.add_cell(schema_id.into());
                row.add_cell(created_at.into());
                row.add_cell(may_redact.into());
                table.add_row(row);
            }
            let _ = writeln!(s);
            let _ = writeln!(s, "{title}");
            let _ = writeln!(s, "{table}");
        }

        let actors = self
            .metadata_manager
            .catalog_controller
            .list_actor_info()
            .await?
            .into_iter()
            .map(|(actor_id, fragment_id, job_id, schema_id, obj_type)| {
                (
                    actor_id,
                    (
                        fragment_id,
                        job_id,
                        schema_id,
                        obj_type,
                        obj_id_to_name.get(&job_id).cloned().unwrap_or_default(),
                    ),
                )
            })
            .collect::<BTreeMap<_, _>>();

        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("fragment_id".into());
            row.add_cell("job_id".into());
            row.add_cell("schema_id".into());
            row.add_cell("type".into());
            row.add_cell("name".into());
            row
        });
        for (actor_id, (fragment_id, job_id, schema_id, ddl_type, name)) in actors {
            let mut row = Row::new();
            row.add_cell(actor_id.into());
            row.add_cell(fragment_id.into());
            row.add_cell(job_id.into());
            row.add_cell(schema_id.into());
            row.add_cell(ddl_type.as_str().into());
            row.add_cell(name.into());
            table.add_row(row);
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "ACTOR");
        let _ = writeln!(s, "{table}");
        Ok(())
    }

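    /// Writes license status, tier, expiry, resource limits, and available
    /// features; for an invalid license, writes the error instead.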
    fn write_license(&self, s: &mut String) {
        use comfy_table::presets::ASCII_BORDERS_ONLY;
        use comfy_table::{ContentArrangement, Row, Table};

        let mut table = Table::new();
        table.load_preset(ASCII_BORDERS_ONLY);
        table.set_content_arrangement(ContentArrangement::Dynamic);
        table.set_header({
            let mut row = Row::new();
            row.add_cell("field".into());
            row.add_cell("value".into());
            row
        });

        match LicenseManager::get().license() {
            Ok(license) => {
                let fmt_option = |value: Option<u64>| match value {
                    Some(v) => v.to_string(),
                    None => "unlimited".to_owned(),
                };

                let expires_at = if license.exp == u64::MAX {
                    "never".to_owned()
                } else {
                    let exp_i64 = license.exp as i64;
                    chrono::DateTime::<chrono::Utc>::from_timestamp(exp_i64, 0)
                        .map(|ts| ts.to_rfc3339())
                        .unwrap_or_else(|| format!("invalid ({})", license.exp))
                };

                let mut row = Row::new();
                row.add_cell("status".into());
                row.add_cell("valid".into());
                table.add_row(row);

                let mut row = Row::new();
                row.add_cell("tier".into());
                row.add_cell(license.tier.name().into());
                table.add_row(row);

                let mut row = Row::new();
                row.add_cell("expires_at".into());
                row.add_cell(expires_at.into());
                table.add_row(row);

                let mut row = Row::new();
                row.add_cell("rwu_limit".into());
                row.add_cell(fmt_option(license.rwu_limit.map(|v| v.get())).into());
                table.add_row(row);

                let mut row = Row::new();
                row.add_cell("cpu_core_limit".into());
                row.add_cell(fmt_option(license.cpu_core_limit()).into());
                table.add_row(row);

                let mut row = Row::new();
                row.add_cell("memory_limit_bytes".into());
                row.add_cell(fmt_option(license.memory_limit()).into());
                table.add_row(row);

                let mut features: Vec<_> = license
                    .tier
                    .available_features()
                    .map(|f| f.name())
                    .collect();
                features.sort_unstable();
                let feature_summary = format_features(&features);

                let mut row = Row::new();
                row.add_cell("available_features".into());
                row.add_cell(feature_summary.into());
                table.add_row(row);
            }
            Err(error) => {
                let mut row = Row::new();
                row.add_cell("status".into());
                row.add_cell("invalid".into());
                table.add_row(row);

                let mut row = Row::new();
                row.add_cell("error".into());
                row.add_cell(error.to_report_string().into());
                table.add_row(row);
            }
        }

        let _ = writeln!(s, "LICENSE");
        let _ = writeln!(s, "{table}");
    }

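    /// Writes a key/value table of selected system parameters.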
    async fn write_params(&self, s: &mut String) {
        let params = self.system_params_controller.get_params().await;

        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("key".into());
            row.add_cell("value".into());
            row
        });

        let mut row = Row::new();
        row.add_cell("barrier_interval_ms".into());
        row.add_cell(params.barrier_interval_ms().to_string().into());
        table.add_row(row);

        let mut row = Row::new();
        row.add_cell("checkpoint_frequency".into());
        row.add_cell(params.checkpoint_frequency().to_string().into());
        table.add_row(row);

        let mut row = Row::new();
        row.add_cell("state_store".into());
        row.add_cell(params.state_store().to_owned().into());
        table.add_row(row);

        let mut row = Row::new();
        row.add_cell("data_directory".into());
        row.add_cell(params.data_directory().to_owned().into());
        table.add_row(row);

        let mut row = Row::new();
        row.add_cell("max_concurrent_creating_streaming_jobs".into());
        row.add_cell(
            params
                .max_concurrent_creating_streaming_jobs()
                .to_string()
                .into(),
        );
        table.add_row(row);

        let mut row = Row::new();
        row.add_cell("time_travel_retention_ms".into());
        row.add_cell(params.time_travel_retention_ms().to_string().into());
        table.add_row(row);

        let mut row = Row::new();
        row.add_cell("adaptive_parallelism_strategy".into());
        row.add_cell(params.adaptive_parallelism_strategy().to_string().into());
        table.add_row(row);

        let mut row = Row::new();
        row.add_cell("per_database_isolation".into());
        row.add_cell(params.per_database_isolation().to_string().into());
        table.add_row(row);

        let _ = writeln!(s, "SYSTEM PARAMS");
        let _ = writeln!(s, "{table}");
    }
}

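/// Adds `t` to the row if present, or an empty cell otherwise, so every row
/// keeps the same number of columns.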
fn try_add_cell<T: Into<comfy_table::Cell>>(row: &mut comfy_table::Row, t: Option<T>) {
    match t {
        Some(t) => {
            row.add_cell(t.into());
        }
        None => {
            row.add_cell("".into());
        }
    }
}

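/// Joins non-empty label selectors with commas for use inside a PromQL
/// `{...}` matcher.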
fn merge_prometheus_selector<'a>(selectors: impl IntoIterator<Item = &'a str>) -> String {
    selectors.into_iter().filter(|s| !s.is_empty()).join(",")
}

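/// Redacts values of the given option keywords in a SQL definition. Returns
/// `None` if the SQL cannot be parsed, signaling the caller to hide the
/// definition entirely.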
fn redact_sql(sql: &str, keywords: RedactSqlOptionKeywordsRef) -> Option<String> {
    match Parser::parse_sql(sql) {
        Ok(sqls) => Some(
            sqls.into_iter()
                .map(|sql| sql.to_redacted_string(keywords.clone()))
                .join(";"),
        ),
        Err(_) => None,
    }
}

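/// Formats the feature list into indented lines of at most `PER_LINE` names,
/// or `(none)` if the list is empty.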
fn format_features(features: &[&'static str]) -> String {
    if features.is_empty() {
        return "(none)".into();
    }

    const PER_LINE: usize = 6;
    features
        .chunks(PER_LINE)
        .map(|chunk| format!(" {}", chunk.join(", ")))
        .collect::<Vec<_>>()
        .join("\n")
}

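/// Renders a `JobStatus` as a lowercase label.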
fn format_job_status(status: JobStatus) -> &'static str {
    match status {
        JobStatus::Initial => "initial",
        JobStatus::Creating => "creating",
        JobStatus::Created => "created",
    }
}

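/// Renders a `StreamingParallelism` as a short lowercase label.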
fn format_streaming_parallelism(parallelism: &StreamingParallelism) -> String {
    match parallelism {
        StreamingParallelism::Adaptive => "adaptive".into(),
        StreamingParallelism::Fixed(n) => format!("fixed({n})"),
        StreamingParallelism::Custom => "custom".into(),
    }
}