1use std::cmp::{Ordering, Reverse};
16use std::collections::{BTreeMap, BinaryHeap, HashMap};
17use std::fmt::Write;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use prometheus_http_query::response::Data::Vector;
22use risingwave_common::id::ObjectId;
23use risingwave_common::system_param::reader::SystemParamsRead;
24use risingwave_common::types::Timestamptz;
25use risingwave_common::util::StackTraceResponseExt;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_hummock_sdk::HummockSstableId;
28use risingwave_hummock_sdk::level::Level;
29use risingwave_license::LicenseManager;
30use risingwave_meta_model::{JobStatus, StreamingParallelism};
31use risingwave_pb::catalog::table::PbTableType;
32use risingwave_pb::common::WorkerType;
33use risingwave_pb::meta::EventLog;
34use risingwave_pb::meta::event_log::Event;
35use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
36use risingwave_sqlparser::ast::RedactSqlOptionKeywordsRef;
37use risingwave_sqlparser::parser::Parser;
38use serde_json::json;
39use thiserror_ext::AsReport;
40
41use crate::MetaResult;
42use crate::controller::system_param::SystemParamsControllerRef;
43use crate::hummock::HummockManagerRef;
44use crate::manager::MetadataManager;
45use crate::manager::event_log::EventLogManagerRef;
46use crate::rpc::await_tree::dump_cluster_await_tree;
47
/// Shared, reference-counted handle to a [`DiagnoseCommand`].
pub type DiagnoseCommandRef = Arc<DiagnoseCommand>;
49
/// Aggregates state from the various meta components into a single
/// human-readable diagnose report; see [`DiagnoseCommand::report`].
pub struct DiagnoseCommand {
    // Access to catalog and worker-node metadata.
    metadata_manager: MetadataManager,
    // Local await-tree registry; used together with `metadata_manager` when
    // dumping cluster-wide await trees.
    await_tree_reg: await_tree::Registry,
    // NOTE(review): field name has a typo ("manger" vs "manager"); kept as-is
    // since renaming would touch every call site.
    hummock_manger: HummockManagerRef,
    event_log_manager: EventLogManagerRef,
    // `None` when Prometheus is not configured; metric sections are then
    // silently skipped.
    prometheus_client: Option<prometheus_http_query::Client>,
    // Label selector spliced into every Prometheus query.
    prometheus_selector: String,
    // SQL option keywords whose values must be redacted in printed DDL.
    redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
    system_params_controller: SystemParamsControllerRef,
}
60
61impl DiagnoseCommand {
62 pub fn new(
63 metadata_manager: MetadataManager,
64 await_tree_reg: await_tree::Registry,
65 hummock_manger: HummockManagerRef,
66 event_log_manager: EventLogManagerRef,
67 prometheus_client: Option<prometheus_http_query::Client>,
68 prometheus_selector: String,
69 redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
70 system_params_controller: SystemParamsControllerRef,
71 ) -> Self {
72 Self {
73 metadata_manager,
74 await_tree_reg,
75 hummock_manger,
76 event_log_manager,
77 prometheus_client,
78 prometheus_selector,
79 redact_sql_option_keywords,
80 system_params_controller,
81 }
82 }
83
84 pub async fn report(&self, actor_traces_format: ActorTracesFormat) -> String {
85 let mut report = String::new();
86 let _ = writeln!(
87 report,
88 "report created at: {}\nversion: {}",
89 chrono::DateTime::<chrono::offset::Utc>::from(std::time::SystemTime::now()),
90 risingwave_common::current_cluster_version(),
91 );
92 let _ = writeln!(report);
93 self.write_license(&mut report);
94 let _ = writeln!(report);
95 self.write_catalog(&mut report).await;
96 let _ = writeln!(report);
97 self.write_worker_nodes(&mut report).await;
98 let _ = writeln!(report);
99 self.write_streaming_prometheus(&mut report).await;
100 let _ = writeln!(report);
101 self.write_storage(&mut report).await;
102 let _ = writeln!(report);
103 self.write_event_logs(&mut report);
104 let _ = writeln!(report);
105 self.write_params(&mut report).await;
106 let _ = writeln!(report);
107 self.write_await_tree(&mut report, actor_traces_format)
108 .await;
109
110 report
111 }
112
113 async fn write_catalog(&self, s: &mut String) {
114 self.write_catalog_inner(s).await;
115 let _ = self.write_table_definition(s).await.inspect_err(|e| {
116 tracing::warn!(
117 error = e.to_report_string(),
118 "failed to display table definition"
119 )
120 });
121 }
122
123 async fn write_catalog_inner(&self, s: &mut String) {
124 let stats = self.metadata_manager.catalog_controller.stats().await;
125
126 let stat = match stats {
127 Ok(stat) => stat,
128 Err(err) => {
129 tracing::warn!(error=?err.as_report(), "failed to get catalog stats");
130 return;
131 }
132 };
133 let _ = writeln!(s, "number of database: {}", stat.database_num);
134 let _ = writeln!(s, "number of fragment: {}", stat.streaming_job_num);
135 let _ = writeln!(s, "number of actor: {}", stat.actor_num);
136 let _ = writeln!(s, "number of source: {}", stat.source_num);
137 let _ = writeln!(s, "number of table: {}", stat.table_num);
138 let _ = writeln!(s, "number of materialized view: {}", stat.mview_num);
139 let _ = writeln!(s, "number of sink: {}", stat.sink_num);
140 let _ = writeln!(s, "number of index: {}", stat.index_num);
141 let _ = writeln!(s, "number of function: {}", stat.function_num);
142
143 self.write_databases(s).await;
144 self.write_schemas(s).await;
145 }
146
147 async fn write_databases(&self, s: &mut String) {
148 let databases = match self
149 .metadata_manager
150 .catalog_controller
151 .list_databases()
152 .await
153 {
154 Ok(databases) => databases,
155 Err(err) => {
156 tracing::warn!(error=?err.as_report(), "failed to list databases");
157 return;
158 }
159 };
160
161 use comfy_table::{Row, Table};
162 let mut table = Table::new();
163 table.set_header({
164 let mut row = Row::new();
165 row.add_cell("id".into());
166 row.add_cell("name".into());
167 row.add_cell("resource_group".into());
168 row.add_cell("barrier_interval_ms".into());
169 row.add_cell("checkpoint_frequency".into());
170 row
171 });
172 for db in databases {
173 let mut row = Row::new();
174 row.add_cell(db.id.into());
175 row.add_cell(db.name.into());
176 row.add_cell(db.resource_group.into());
177 row.add_cell(
178 db.barrier_interval_ms
179 .map(|v| v.to_string())
180 .unwrap_or("default".into())
181 .into(),
182 );
183 row.add_cell(
184 db.checkpoint_frequency
185 .map(|v| v.to_string())
186 .unwrap_or("default".into())
187 .into(),
188 );
189 table.add_row(row);
190 }
191
192 let _ = writeln!(s, "DATABASE");
193 let _ = writeln!(s, "{table}");
194 }
195
196 async fn write_schemas(&self, s: &mut String) {
197 let schemas = match self
198 .metadata_manager
199 .catalog_controller
200 .list_schemas()
201 .await
202 {
203 Ok(schemas) => schemas,
204 Err(err) => {
205 tracing::warn!(error=?err.as_report(), "failed to list schemas");
206 return;
207 }
208 };
209
210 use comfy_table::{Row, Table};
211 let mut table = Table::new();
212 table.set_header({
213 let mut row = Row::new();
214 row.add_cell("id".into());
215 row.add_cell("database_id".into());
216 row.add_cell("name".into());
217 row
218 });
219 for schema in schemas {
220 let mut row = Row::new();
221 row.add_cell(schema.id.into());
222 row.add_cell(schema.database_id.into());
223 row.add_cell(schema.name.into());
224 table.add_row(row);
225 }
226
227 let _ = writeln!(s, "SCHEMA");
228 let _ = writeln!(s, "{table}");
229 }
230
    /// Renders one table row per worker node (host, type, state, resources,
    /// actor count, ...). Optional/missing values render as empty cells via
    /// `try_add_cell`.
    async fn write_worker_nodes(&self, s: &mut String) {
        // Per-worker actor counts, consumed by the last column below.
        let Ok(worker_actor_count) = self.metadata_manager.worker_actor_count() else {
            tracing::warn!("failed to get worker actor count");
            return;
        };

        use comfy_table::{Row, Table};
        let Ok(worker_nodes) = self.metadata_manager.list_worker_node(None, None).await else {
            tracing::warn!("failed to get worker nodes");
            return;
        };
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("host".into());
            row.add_cell("hostname".into());
            row.add_cell("type".into());
            row.add_cell("state".into());
            row.add_cell("parallelism".into());
            row.add_cell("resource_group".into());
            row.add_cell("is_streaming".into());
            row.add_cell("is_serving".into());
            row.add_cell("is_iceberg_compactor".into());
            row.add_cell("rw_version".into());
            row.add_cell("total_memory_bytes".into());
            row.add_cell("total_cpu_cores".into());
            row.add_cell("started_at".into());
            row.add_cell("actor_count".into());
            row
        });
        for worker_node in worker_nodes {
            let mut row = Row::new();
            row.add_cell(worker_node.id.into());
            // Rendered as "host:port".
            try_add_cell(
                &mut row,
                worker_node
                    .host
                    .as_ref()
                    .map(|h| format!("{}:{}", h.host, h.port)),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.hostname.clone()),
            );
            try_add_cell(
                &mut row,
                worker_node.get_type().ok().map(|t| t.as_str_name()),
            );
            try_add_cell(
                &mut row,
                worker_node.get_state().ok().map(|s| s.as_str_name()),
            );
            try_add_cell(&mut row, worker_node.parallelism());
            try_add_cell(
                &mut row,
                worker_node
                    .property
                    .as_ref()
                    .map(|p| p.resource_group.clone().unwrap_or("".to_owned())),
            );
            // Streaming/serving flags are only meaningful for compute nodes;
            // other worker types get empty cells.
            let (is_streaming, is_serving) = {
                if let Ok(t) = worker_node.get_type()
                    && t == WorkerType::ComputeNode
                {
                    (
                        worker_node.property.as_ref().map(|p| p.is_streaming),
                        worker_node.property.as_ref().map(|p| p.is_serving),
                    )
                } else {
                    (None, None)
                }
            };
            try_add_cell(&mut row, is_streaming);
            try_add_cell(&mut row, is_serving);
            // Likewise, the iceberg-compactor flag only applies to compactors.
            let is_iceberg_compactor = {
                if let Ok(t) = worker_node.get_type()
                    && t == WorkerType::Compactor
                {
                    worker_node
                        .property
                        .as_ref()
                        .map(|p| p.is_iceberg_compactor)
                } else {
                    None
                }
            };
            try_add_cell(&mut row, is_iceberg_compactor);
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.rw_version.clone()),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.total_memory_bytes),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.total_cpu_cores),
            );
            try_add_cell(
                &mut row,
                worker_node
                    .started_at
                    .and_then(|ts| Timestamptz::from_secs(ts as _).map(|t| t.to_string())),
            );
            // Only compute nodes run actors; a compute node absent from the
            // count map simply has 0 actors.
            let actor_count = {
                if let Ok(t) = worker_node.get_type()
                    && t != WorkerType::ComputeNode
                {
                    None
                } else {
                    match worker_actor_count.get(&worker_node.id) {
                        None => Some(0),
                        Some(c) => Some(*c),
                    }
                }
            };
            try_add_cell(&mut row, actor_count);
            table.add_row(row);
        }
        let _ = writeln!(s, "{table}");
    }
356
357 fn write_event_logs(&self, s: &mut String) {
358 let event_logs = self
359 .event_log_manager
360 .list_event_logs()
361 .into_iter()
362 .sorted_by(|a, b| {
363 a.timestamp
364 .unwrap_or(0)
365 .cmp(&b.timestamp.unwrap_or(0))
366 .reverse()
367 })
368 .collect_vec();
369
370 let _ = writeln!(s, "latest barrier completions");
371 Self::write_event_logs_impl(
372 s,
373 event_logs.iter(),
374 |e| {
375 let Event::BarrierComplete(info) = e else {
376 return None;
377 };
378 Some(json!(info).to_string())
379 },
380 Some(10),
381 );
382
383 let _ = writeln!(s);
384 let _ = writeln!(s, "latest barrier collection failures");
385 Self::write_event_logs_impl(
386 s,
387 event_logs.iter(),
388 |e| {
389 let Event::CollectBarrierFail(info) = e else {
390 return None;
391 };
392 Some(json!(info).to_string())
393 },
394 Some(3),
395 );
396
397 let _ = writeln!(s);
398 let _ = writeln!(s, "latest barrier injection failures");
399 Self::write_event_logs_impl(
400 s,
401 event_logs.iter(),
402 |e| {
403 let Event::InjectBarrierFail(info) = e else {
404 return None;
405 };
406 Some(json!(info).to_string())
407 },
408 Some(3),
409 );
410
411 let _ = writeln!(s);
412 let _ = writeln!(s, "latest worker node panics");
413 Self::write_event_logs_impl(
414 s,
415 event_logs.iter(),
416 |e| {
417 let Event::WorkerNodePanic(info) = e else {
418 return None;
419 };
420 Some(json!(info).to_string())
421 },
422 Some(10),
423 );
424
425 let _ = writeln!(s);
426 let _ = writeln!(s, "latest create stream job failures");
427 Self::write_event_logs_impl(
428 s,
429 event_logs.iter(),
430 |e| {
431 let Event::CreateStreamJobFail(info) = e else {
432 return None;
433 };
434 Some(json!(info).to_string())
435 },
436 Some(3),
437 );
438
439 let _ = writeln!(s);
440 let _ = writeln!(s, "latest dirty stream job clear-ups");
441 Self::write_event_logs_impl(
442 s,
443 event_logs.iter(),
444 |e| {
445 let Event::DirtyStreamJobClear(info) = e else {
446 return None;
447 };
448 Some(json!(info).to_string())
449 },
450 Some(3),
451 );
452 }
453
454 fn write_event_logs_impl<'a, F>(
455 s: &mut String,
456 event_logs: impl Iterator<Item = &'a EventLog>,
457 get_event_info: F,
458 limit: Option<usize>,
459 ) where
460 F: Fn(&Event) -> Option<String>,
461 {
462 use comfy_table::{Row, Table};
463 let mut table = Table::new();
464 table.set_header({
465 let mut row = Row::new();
466 row.add_cell("created_at".into());
467 row.add_cell("info".into());
468 row
469 });
470 let mut row_count = 0;
471 for event_log in event_logs {
472 let Some(ref inner) = event_log.event else {
473 continue;
474 };
475 if let Some(limit) = limit
476 && row_count >= limit
477 {
478 break;
479 }
480 let mut row = Row::new();
481 let ts = event_log
482 .timestamp
483 .and_then(|ts| Timestamptz::from_millis(ts as _).map(|ts| ts.to_string()));
484 try_add_cell(&mut row, ts);
485 if let Some(info) = get_event_info(inner) {
486 row.add_cell(info.into());
487 row_count += 1;
488 } else {
489 continue;
490 }
491 table.add_row(row);
492 }
493 let _ = writeln!(s, "{table}");
494 }
495
    /// Prints Hummock storage statistics: write-limited ("back pressured")
    /// compaction groups, SSTable counts/sizes, the SSTables with the highest
    /// tombstone delete ratio, and storage-related Prometheus metrics.
    async fn write_storage(&self, s: &mut String) {
        let mut sst_num = 0;
        let mut sst_total_file_size = 0;
        // Compaction groups currently under a write limit.
        let back_pressured_compaction_groups = self
            .hummock_manger
            .write_limits()
            .await
            .into_iter()
            .filter_map(|(k, v)| {
                if v.table_ids.is_empty() {
                    None
                } else {
                    Some(k)
                }
            })
            .join(",");
        if !back_pressured_compaction_groups.is_empty() {
            let _ = writeln!(
                s,
                "back pressured compaction groups: {back_pressured_compaction_groups}"
            );
        }

        // Ordering key for the "top tombstone delete ratio" table below;
        // ordered solely by `delete_ratio`.
        #[derive(PartialEq, Eq)]
        struct SstableSort {
            compaction_group_id: u64,
            sst_id: HummockSstableId,
            // Stale keys per 10_000 total keys (ratio in basis points).
            delete_ratio: u64,
        }
        impl PartialOrd for SstableSort {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }
        impl Ord for SstableSort {
            fn cmp(&self, other: &Self) -> Ordering {
                self.delete_ratio.cmp(&other.delete_ratio)
            }
        }
        // Keep the `top_k` largest delete ratios using a min-heap (`Reverse`):
        // once full, replace the current minimum when a larger ratio arrives.
        fn top_k_sstables(
            top_k: usize,
            heap: &mut BinaryHeap<Reverse<SstableSort>>,
            e: SstableSort,
        ) {
            if heap.len() < top_k {
                heap.push(Reverse(e));
            } else if let Some(mut p) = heap.peek_mut()
                && e.delete_ratio > p.0.delete_ratio
            {
                *p = Reverse(e);
            }
        }

        let top_k = 10;
        let mut top_tombstone_delete_sst = BinaryHeap::with_capacity(top_k);
        // Walk every level (L0 sub-levels plus the non-L0 levels) of every
        // compaction group in the current Hummock version, accumulating
        // totals and feeding the top-k heap.
        let compaction_group_num = self
            .hummock_manger
            .on_current_version(|version| {
                for compaction_group in version.levels.values() {
                    let mut visit_level = |level: &Level| {
                        sst_num += level.table_infos.len();
                        sst_total_file_size +=
                            level.table_infos.iter().map(|t| t.sst_size).sum::<u64>();
                        for sst in &level.table_infos {
                            // Skip empty SSTs to avoid division by zero.
                            if sst.total_key_count == 0 {
                                continue;
                            }
                            let tombstone_delete_ratio =
                                sst.stale_key_count * 10000 / sst.total_key_count;
                            let e = SstableSort {
                                compaction_group_id: compaction_group.group_id,
                                sst_id: sst.sst_id,
                                delete_ratio: tombstone_delete_ratio,
                            };
                            top_k_sstables(top_k, &mut top_tombstone_delete_sst, e);
                        }
                    };
                    let l0 = &compaction_group.l0;
                    for level in &l0.sub_levels {
                        visit_level(level);
                    }
                    for level in &compaction_group.levels {
                        visit_level(level);
                    }
                }
                version.levels.len()
            })
            .await;

        let _ = writeln!(s, "number of SSTables: {sst_num}");
        let _ = writeln!(s, "total size of SSTables (byte): {sst_total_file_size}");
        let _ = writeln!(s, "number of compaction groups: {compaction_group_num}");
        use comfy_table::{Row, Table};
        // `into_sorted_vec` on a heap of `Reverse` yields descending delete ratio.
        fn format_table(heap: BinaryHeap<Reverse<SstableSort>>) -> Table {
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("compaction group id".into());
                row.add_cell("sstable id".into());
                row.add_cell("delete ratio".into());
                row
            });
            for sst in &heap.into_sorted_vec() {
                let mut row = Row::new();
                row.add_cell(sst.0.compaction_group_id.into());
                row.add_cell(sst.0.sst_id.into());
                // Ratio is stored in basis points; render as a percentage.
                row.add_cell(format!("{:.2}%", sst.0.delete_ratio as f64 / 100.0).into());
                table.add_row(row);
            }
            table
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "top tombstone delete ratio");
        let _ = writeln!(s, "{}", format_table(top_tombstone_delete_sst));
        let _ = writeln!(s);

        let _ = writeln!(s);
        self.write_storage_prometheus(s).await;
    }
616
617 async fn write_streaming_prometheus(&self, s: &mut String) {
618 let _ = writeln!(s, "top sources by throughput (rows/s)");
619 let query = format!(
620 "topk(3, sum(rate(stream_source_output_rows_counts{{{}}}[10m]))by (source_name))",
621 self.prometheus_selector
622 );
623 self.write_instant_vector_impl(s, &query, vec!["source_name"])
624 .await;
625
626 let _ = writeln!(s);
627 let _ = writeln!(s, "top materialized views by throughput (rows/s)");
628 let query = format!(
629 "topk(3, sum(rate(stream_mview_input_row_count{{{}}}[10m]))by (table_id))",
630 self.prometheus_selector
631 );
632 self.write_instant_vector_impl(s, &query, vec!["table_id"])
633 .await;
634
635 let _ = writeln!(s);
636 let _ = writeln!(s, "top join executor by matched rows");
637 let query = format!(
638 "topk(3, histogram_quantile(0.9, sum(rate(stream_join_matched_join_keys_bucket{{{}}}[10m])) by (le, fragment_id, table_id)))",
639 self.prometheus_selector
640 );
641 self.write_instant_vector_impl(s, &query, vec!["table_id", "fragment_id"])
642 .await;
643 }
644
645 async fn write_storage_prometheus(&self, s: &mut String) {
646 let _ = writeln!(s, "top Hummock Get by duration (second)");
647 let query = format!(
648 "topk(3, histogram_quantile(0.9, sum(rate(state_store_get_duration_bucket{{{}}}[10m])) by (le, table_id)))",
649 self.prometheus_selector
650 );
651 self.write_instant_vector_impl(s, &query, vec!["table_id"])
652 .await;
653
654 let _ = writeln!(s);
655 let _ = writeln!(s, "top Hummock Iter Init by duration (second)");
656 let query = format!(
657 "topk(3, histogram_quantile(0.9, sum(rate(state_store_iter_init_duration_bucket{{{}}}[10m])) by (le, table_id)))",
658 self.prometheus_selector
659 );
660 self.write_instant_vector_impl(s, &query, vec!["table_id"])
661 .await;
662
663 let _ = writeln!(s);
664 let _ = writeln!(s, "top table commit flush by size (byte)");
665 let query = format!(
666 "topk(3, sum(rate(storage_commit_write_throughput{{{}}}[10m])) by (table_id))",
667 self.prometheus_selector
668 );
669 self.write_instant_vector_impl(s, &query, vec!["table_id"])
670 .await;
671
672 let _ = writeln!(s);
673 let _ = writeln!(s, "object store read throughput (bytes/s)");
674 let query = format!(
675 "sum(rate(object_store_read_bytes{{{}}}[10m])) by (job, instance)",
676 merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
677 );
678 self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
679 .await;
680
681 let _ = writeln!(s);
682 let _ = writeln!(s, "object store write throughput (bytes/s)");
683 let query = format!(
684 "sum(rate(object_store_write_bytes{{{}}}[10m])) by (job, instance)",
685 merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
686 );
687 self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
688 .await;
689
690 let _ = writeln!(s);
691 let _ = writeln!(s, "object store operation rate");
692 let query = format!(
693 "sum(rate(object_store_operation_latency_count{{{}}}[10m])) by (le, type, job, instance)",
694 merge_prometheus_selector([
695 &self.prometheus_selector,
696 "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read_read_bytes|streaming_read\""
697 ])
698 );
699 self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
700 .await;
701
702 let _ = writeln!(s);
703 let _ = writeln!(s, "object store operation duration (second)");
704 let query = format!(
705 "histogram_quantile(0.9, sum(rate(object_store_operation_latency_bucket{{{}}}[10m])) by (le, type, job, instance))",
706 merge_prometheus_selector([
707 &self.prometheus_selector,
708 "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read\""
709 ])
710 );
711 self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
712 .await;
713 }
714
715 async fn write_instant_vector_impl(&self, s: &mut String, query: &str, labels: Vec<&str>) {
716 let Some(ref client) = self.prometheus_client else {
717 return;
718 };
719 if let Ok(Vector(instant_vec)) = client
720 .query(query)
721 .get()
722 .await
723 .map(|result| result.into_inner().0)
724 {
725 for i in instant_vec {
726 let l = labels
727 .iter()
728 .map(|label| {
729 format!(
730 "{}={}",
731 *label,
732 i.metric()
733 .get(*label)
734 .map(|s| s.as_str())
735 .unwrap_or_default()
736 )
737 })
738 .join(",");
739 let _ = writeln!(s, "{}: {:.3}", l, i.sample().value());
740 }
741 }
742 }
743
744 async fn write_await_tree(&self, s: &mut String, actor_traces_format: ActorTracesFormat) {
745 let all = dump_cluster_await_tree(
746 &self.metadata_manager,
747 &self.await_tree_reg,
748 actor_traces_format,
749 )
750 .await;
751
752 if let Ok(all) = all {
753 write!(s, "{}", all.output()).unwrap();
754 } else {
755 tracing::warn!("failed to dump await tree");
756 }
757 }
758
    /// Renders the `STREAMING JOB` table, one table per catalog object kind
    /// (sources, user tables, MVs, indexes, sinks, views, internal tables)
    /// with redacted SQL definitions, and finally the `ACTOR` table.
    ///
    /// Returns an error if any catalog listing fails; the caller logs it.
    async fn write_table_definition(&self, s: &mut String) -> MetaResult<()> {
        // Each catalog map: id -> (name, database_id, schema_id, definition,
        // created_at_epoch).
        let sources = self
            .metadata_manager
            .catalog_controller
            .list_sources()
            .await?
            .into_iter()
            .map(|s| {
                (
                    s.id.into(),
                    (
                        s.name,
                        s.database_id,
                        s.schema_id,
                        s.definition,
                        s.created_at_epoch,
                    ),
                )
            })
            .collect::<BTreeMap<_, _>>();
        let mut user_tables = BTreeMap::new();
        let mut mvs = BTreeMap::new();
        let mut indexes = BTreeMap::new();
        let mut internal_tables = BTreeMap::new();
        {
            // Bucket all state tables by their catalog table type.
            let grouped = self
                .metadata_manager
                .catalog_controller
                .list_all_state_tables()
                .await?
                .into_iter()
                .chunk_by(|t| t.table_type());
            for (table_type, tables) in &grouped {
                let tables = tables.into_iter().map(|t| {
                    (
                        t.id.into(),
                        (
                            t.name,
                            t.database_id,
                            t.schema_id,
                            t.definition,
                            t.created_at_epoch,
                        ),
                    )
                });
                match table_type {
                    PbTableType::Table => user_tables.extend(tables),
                    PbTableType::MaterializedView => mvs.extend(tables),
                    PbTableType::Index | PbTableType::VectorIndex => indexes.extend(tables),
                    PbTableType::Internal => internal_tables.extend(tables),
                    PbTableType::Unspecified => {
                        tracing::error!("unspecified table type: {:?}", tables.collect_vec());
                    }
                }
            }
        }
        let sinks = self
            .metadata_manager
            .catalog_controller
            .list_sinks()
            .await?
            .into_iter()
            .map(|s| {
                (
                    s.id.into(),
                    (
                        s.name,
                        s.database_id,
                        s.schema_id,
                        s.definition,
                        s.created_at_epoch,
                    ),
                )
            })
            .collect::<BTreeMap<_, _>>();
        // Views carry no creation epoch; `None` keeps the tuple shape uniform.
        let views = self
            .metadata_manager
            .catalog_controller
            .list_views()
            .await?
            .into_iter()
            .map(|v| {
                (
                    v.id.into(),
                    (v.name, v.database_id, v.schema_id, v.sql, None),
                )
            })
            .collect::<BTreeMap<_, _>>();
        let mut streaming_jobs = self
            .metadata_manager
            .catalog_controller
            .list_streaming_job_infos()
            .await?;
        // Group jobs of the same object type together, then order by job id.
        streaming_jobs.sort_by_key(|info| (info.obj_type as usize, info.job_id));
        {
            use comfy_table::{Row, Table};
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("job_id".into());
                row.add_cell("name".into());
                row.add_cell("obj_type".into());
                row.add_cell("state".into());
                row.add_cell("parallelism".into());
                row.add_cell("max_parallelism".into());
                row.add_cell("resource_group".into());
                row.add_cell("database_id".into());
                row.add_cell("schema_id".into());
                row.add_cell("config_override".into());
                row
            });
            for job in streaming_jobs {
                let mut row = Row::new();
                row.add_cell(job.job_id.into());
                row.add_cell(job.name.into());
                row.add_cell(job.obj_type.as_str().into());
                row.add_cell(format_job_status(job.job_status).into());
                row.add_cell(format_streaming_parallelism(&job.parallelism).into());
                row.add_cell(job.max_parallelism.into());
                row.add_cell(job.resource_group.into());
                row.add_cell(job.database_id.into());
                row.add_cell(job.schema_id.into());
                row.add_cell(job.config_override.into());
                table.add_row(row);
            }
            let _ = writeln!(s);
            let _ = writeln!(s, "STREAMING JOB");
            let _ = writeln!(s, "{table}");
        }
        let catalogs = [
            ("SOURCE", sources),
            ("TABLE", user_tables),
            ("MATERIALIZED VIEW", mvs),
            ("INDEX", indexes),
            ("SINK", sinks),
            ("VIEW", views),
            ("INTERNAL TABLE", internal_tables),
        ];
        // Collect names while printing so the ACTOR table below can resolve
        // job ids to human-readable names.
        let mut obj_id_to_name: HashMap<ObjectId, _> = HashMap::new();
        for (title, items) in catalogs {
            use comfy_table::{Row, Table};
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("id".into());
                row.add_cell("name".into());
                row.add_cell("database_id".into());
                row.add_cell("schema_id".into());
                row.add_cell("created_at".into());
                row.add_cell("definition".into());
                row
            });
            for (id, (name, database_id, schema_id, definition, created_at_epoch)) in items {
                obj_id_to_name.insert(id, name.clone());
                let mut row = Row::new();
                // Redact sensitive SQL options; fall back to a placeholder
                // when the definition cannot be parsed.
                let may_redact = redact_sql(&definition, self.redact_sql_option_keywords.clone())
                    .unwrap_or_else(|| "[REDACTED]".into());
                let created_at = if let Some(created_at_epoch) = created_at_epoch {
                    format!("{}", Epoch::from(created_at_epoch).as_timestamptz())
                } else {
                    "".into()
                };
                row.add_cell(id.into());
                row.add_cell(name.into());
                row.add_cell(database_id.into());
                row.add_cell(schema_id.into());
                row.add_cell(created_at.into());
                row.add_cell(may_redact.into());
                table.add_row(row);
            }
            let _ = writeln!(s);
            let _ = writeln!(s, "{title}");
            let _ = writeln!(s, "{table}");
        }

        // actor id -> (fragment, job, schema, type, resolved job name).
        let actors = self
            .metadata_manager
            .catalog_controller
            .list_actor_info()
            .await?
            .into_iter()
            .map(|(actor_id, fragment_id, job_id, schema_id, obj_type)| {
                (
                    actor_id,
                    (
                        fragment_id,
                        job_id,
                        schema_id,
                        obj_type,
                        obj_id_to_name.get(&job_id).cloned().unwrap_or_default(),
                    ),
                )
            })
            .collect::<BTreeMap<_, _>>();

        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("fragment_id".into());
            row.add_cell("job_id".into());
            row.add_cell("schema_id".into());
            row.add_cell("type".into());
            row.add_cell("name".into());
            row
        });
        for (actor_id, (fragment_id, job_id, schema_id, ddl_type, name)) in actors {
            let mut row = Row::new();
            row.add_cell(actor_id.into());
            row.add_cell(fragment_id.into());
            row.add_cell(job_id.into());
            row.add_cell(schema_id.into());
            row.add_cell(ddl_type.as_str().into());
            row.add_cell(name.into());
            table.add_row(row);
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "ACTOR");
        let _ = writeln!(s, "{table}");
        Ok(())
    }
981
982 fn write_license(&self, s: &mut String) {
983 use comfy_table::presets::ASCII_BORDERS_ONLY;
984 use comfy_table::{ContentArrangement, Row, Table};
985
986 let mut table = Table::new();
987 table.load_preset(ASCII_BORDERS_ONLY);
988 table.set_content_arrangement(ContentArrangement::Dynamic);
989 table.set_header({
990 let mut row = Row::new();
991 row.add_cell("field".into());
992 row.add_cell("value".into());
993 row
994 });
995
996 match LicenseManager::get().license() {
997 Ok(license) => {
998 let fmt_option = |value: Option<u64>| match value {
999 Some(v) => v.to_string(),
1000 None => "unlimited".to_owned(),
1001 };
1002
1003 let expires_at = if license.exp == u64::MAX {
1004 "never".to_owned()
1005 } else {
1006 let exp_i64 = license.exp as i64;
1007 chrono::DateTime::<chrono::Utc>::from_timestamp(exp_i64, 0)
1008 .map(|ts| ts.to_rfc3339())
1009 .unwrap_or_else(|| format!("invalid ({})", license.exp))
1010 };
1011
1012 let mut row = Row::new();
1013 row.add_cell("status".into());
1014 row.add_cell("valid".into());
1015 table.add_row(row);
1016
1017 let mut row = Row::new();
1018 row.add_cell("tier".into());
1019 row.add_cell(license.tier.name().into());
1020 table.add_row(row);
1021
1022 let mut row = Row::new();
1023 row.add_cell("expires_at".into());
1024 row.add_cell(expires_at.into());
1025 table.add_row(row);
1026
1027 let mut row = Row::new();
1028 row.add_cell("rwu_limit".into());
1029 row.add_cell(fmt_option(license.rwu_limit.map(|v| v.get())).into());
1030 table.add_row(row);
1031
1032 let mut row = Row::new();
1033 row.add_cell("cpu_core_limit".into());
1034 row.add_cell(fmt_option(license.cpu_core_limit()).into());
1035 table.add_row(row);
1036
1037 let mut row = Row::new();
1038 row.add_cell("memory_limit_bytes".into());
1039 row.add_cell(fmt_option(license.memory_limit()).into());
1040 table.add_row(row);
1041
1042 let mut features: Vec<_> = license
1043 .tier
1044 .available_features()
1045 .map(|f| f.name())
1046 .collect();
1047 features.sort_unstable();
1048 let feature_summary = format_features(&features);
1049
1050 let mut row = Row::new();
1051 row.add_cell("available_features".into());
1052 row.add_cell(feature_summary.into());
1053 table.add_row(row);
1054 }
1055 Err(error) => {
1056 let mut row = Row::new();
1057 row.add_cell("status".into());
1058 row.add_cell("invalid".into());
1059 table.add_row(row);
1060
1061 let mut row = Row::new();
1062 row.add_cell("error".into());
1063 row.add_cell(error.to_report_string().into());
1064 table.add_row(row);
1065 }
1066 }
1067
1068 let _ = writeln!(s, "LICENSE");
1069 let _ = writeln!(s, "{table}");
1070 }
1071
1072 async fn write_params(&self, s: &mut String) {
1073 let params = self.system_params_controller.get_params().await;
1074
1075 use comfy_table::{Row, Table};
1076 let mut table = Table::new();
1077 table.set_header({
1078 let mut row = Row::new();
1079 row.add_cell("key".into());
1080 row.add_cell("value".into());
1081 row
1082 });
1083
1084 let mut row = Row::new();
1085 row.add_cell("barrier_interval_ms".into());
1086 row.add_cell(params.barrier_interval_ms().to_string().into());
1087 table.add_row(row);
1088
1089 let mut row = Row::new();
1090 row.add_cell("checkpoint_frequency".into());
1091 row.add_cell(params.checkpoint_frequency().to_string().into());
1092 table.add_row(row);
1093
1094 let mut row = Row::new();
1095 row.add_cell("state_store".into());
1096 row.add_cell(params.state_store().to_owned().into());
1097 table.add_row(row);
1098
1099 let mut row = Row::new();
1100 row.add_cell("data_directory".into());
1101 row.add_cell(params.data_directory().to_owned().into());
1102 table.add_row(row);
1103
1104 let mut row = Row::new();
1105 row.add_cell("max_concurrent_creating_streaming_jobs".into());
1106 row.add_cell(
1107 params
1108 .max_concurrent_creating_streaming_jobs()
1109 .to_string()
1110 .into(),
1111 );
1112 table.add_row(row);
1113
1114 let mut row = Row::new();
1115 row.add_cell("time_travel_retention_ms".into());
1116 row.add_cell(params.time_travel_retention_ms().to_string().into());
1117 table.add_row(row);
1118
1119 let mut row = Row::new();
1120 row.add_cell("adaptive_parallelism_strategy".into());
1121 row.add_cell(params.adaptive_parallelism_strategy().to_string().into());
1122 table.add_row(row);
1123
1124 let mut row = Row::new();
1125 row.add_cell("per_database_isolation".into());
1126 row.add_cell(params.per_database_isolation().to_string().into());
1127 table.add_row(row);
1128
1129 let _ = writeln!(s, "SYSTEM PARAMS");
1130 let _ = writeln!(s, "{table}");
1131 }
1132}
1133
1134fn try_add_cell<T: Into<comfy_table::Cell>>(row: &mut comfy_table::Row, t: Option<T>) {
1135 match t {
1136 Some(t) => {
1137 row.add_cell(t.into());
1138 }
1139 None => {
1140 row.add_cell("".into());
1141 }
1142 }
1143}
1144
/// Joins non-empty Prometheus label selectors with commas, so an empty base
/// selector does not produce a leading/trailing comma.
fn merge_prometheus_selector<'a>(selectors: impl IntoIterator<Item = &'a str>) -> String {
    let parts: Vec<&str> = selectors.into_iter().filter(|s| !s.is_empty()).collect();
    parts.join(",")
}
1148
1149fn redact_sql(sql: &str, keywords: RedactSqlOptionKeywordsRef) -> Option<String> {
1150 match Parser::parse_sql(sql) {
1151 Ok(sqls) => Some(
1152 sqls.into_iter()
1153 .map(|sql| sql.to_redacted_string(keywords.clone()))
1154 .join(";"),
1155 ),
1156 Err(_) => None,
1157 }
1158}
1159
/// Formats feature names for the license table: "(none)" when empty,
/// otherwise comma-separated names wrapped at six per line, each line
/// indented by one space.
fn format_features(features: &[&'static str]) -> String {
    const PER_LINE: usize = 6;
    if features.is_empty() {
        "(none)".to_owned()
    } else {
        let lines: Vec<String> = features
            .chunks(PER_LINE)
            .map(|group| format!(" {}", group.join(", ")))
            .collect();
        lines.join("\n")
    }
}
1172
/// Maps a [`JobStatus`] to its lowercase display name for the report.
fn format_job_status(status: JobStatus) -> &'static str {
    match status {
        JobStatus::Initial => "initial",
        JobStatus::Creating => "creating",
        JobStatus::Created => "created",
    }
}
1180
/// Renders a [`StreamingParallelism`] for the report, e.g. `fixed(4)`.
fn format_streaming_parallelism(parallelism: &StreamingParallelism) -> String {
    match parallelism {
        StreamingParallelism::Adaptive => "adaptive".into(),
        StreamingParallelism::Fixed(n) => format!("fixed({n})"),
        StreamingParallelism::Custom => "custom".into(),
    }
}