use std::cmp::{Ordering, Reverse};
use std::collections::{BTreeMap, BinaryHeap, HashMap};
use std::fmt::Write;
use std::sync::Arc;

use itertools::Itertools;
use prometheus_http_query::response::Data::Vector;
use risingwave_common::id::ObjectId;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::StackTraceResponseExt;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::HummockSstableId;
use risingwave_hummock_sdk::level::Level;
use risingwave_license::LicenseManager;
use risingwave_meta_model::{JobStatus, StreamingParallelism};
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::EventLog;
use risingwave_pb::meta::event_log::Event;
use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
use risingwave_sqlparser::ast::RedactSqlOptionKeywordsRef;
use risingwave_sqlparser::parser::Parser;
use serde_json::json;
use thiserror_ext::AsReport;

use crate::MetaResult;
use crate::hummock::HummockManagerRef;
use crate::manager::MetadataManager;
use crate::manager::event_log::EventLogManagerRef;
use crate::rpc::await_tree::dump_cluster_await_tree;

pub type DiagnoseCommandRef = Arc<DiagnoseCommand>;

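/// Collects cluster diagnostic information (license, catalog, worker nodes, Prometheus
/// metrics, storage state, await trees, and event logs) into a human-readable report.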
pub struct DiagnoseCommand {
    metadata_manager: MetadataManager,
    await_tree_reg: await_tree::Registry,
    hummock_manager: HummockManagerRef,
    event_log_manager: EventLogManagerRef,
    prometheus_client: Option<prometheus_http_query::Client>,
    prometheus_selector: String,
    redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
}

impl DiagnoseCommand {
    pub fn new(
        metadata_manager: MetadataManager,
        await_tree_reg: await_tree::Registry,
        hummock_manager: HummockManagerRef,
        event_log_manager: EventLogManagerRef,
        prometheus_client: Option<prometheus_http_query::Client>,
        prometheus_selector: String,
        redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
    ) -> Self {
        Self {
            metadata_manager,
            await_tree_reg,
            hummock_manager,
            event_log_manager,
            prometheus_client,
            prometheus_selector,
            redact_sql_option_keywords,
        }
    }

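    /// Builds the full diagnose report, one section at a time. Write results are ignored
    /// (`let _ = ...`) because writing to a `String` cannot fail.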
    pub async fn report(&self, actor_traces_format: ActorTracesFormat) -> String {
        let mut report = String::new();
        let _ = writeln!(
            report,
            "report created at: {}\nversion: {}",
            chrono::DateTime::<chrono::offset::Utc>::from(std::time::SystemTime::now()),
            risingwave_common::current_cluster_version(),
        );
        let _ = writeln!(report);
        self.write_license(&mut report);
        let _ = writeln!(report);
        self.write_catalog(&mut report).await;
        let _ = writeln!(report);
        self.write_worker_nodes(&mut report).await;
        let _ = writeln!(report);
        self.write_streaming_prometheus(&mut report).await;
        let _ = writeln!(report);
        self.write_storage(&mut report).await;
        let _ = writeln!(report);
        self.write_await_tree(&mut report, actor_traces_format)
            .await;
        let _ = writeln!(report);
        self.write_event_logs(&mut report);
        report
    }

    async fn write_catalog(&self, s: &mut String) {
        self.write_catalog_inner(s).await;
        let _ = self.write_table_definition(s).await.inspect_err(|e| {
            tracing::warn!(
                error = e.to_report_string(),
                "failed to display table definition"
            )
        });
    }

    async fn write_catalog_inner(&self, s: &mut String) {
        let stats = self.metadata_manager.catalog_controller.stats().await;

        let stat = match stats {
            Ok(stat) => stat,
            Err(err) => {
                tracing::warn!(error=?err.as_report(), "failed to get catalog stats");
                return;
            }
        };
        let _ = writeln!(s, "number of streaming jobs: {}", stat.streaming_job_num);
        let _ = writeln!(s, "number of actors: {}", stat.actor_num);
        let _ = writeln!(s, "number of sources: {}", stat.source_num);
        let _ = writeln!(s, "number of tables: {}", stat.table_num);
        let _ = writeln!(s, "number of materialized views: {}", stat.mview_num);
        let _ = writeln!(s, "number of sinks: {}", stat.sink_num);
        let _ = writeln!(s, "number of indexes: {}", stat.index_num);
        let _ = writeln!(s, "number of functions: {}", stat.function_num);
    }

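    /// Renders one row per worker node. Columns that only apply to a specific worker type
    /// (`is_streaming`, `is_serving`, `is_iceberg_compactor`, `actor_count`) are left blank
    /// for other worker types.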
    async fn write_worker_nodes(&self, s: &mut String) {
        let Ok(worker_actor_count) = self.metadata_manager.worker_actor_count() else {
            tracing::warn!("failed to get worker actor count");
            return;
        };

        use comfy_table::{Row, Table};
        let Ok(worker_nodes) = self.metadata_manager.list_worker_node(None, None).await else {
            tracing::warn!("failed to get worker nodes");
            return;
        };
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("host".into());
            row.add_cell("type".into());
            row.add_cell("state".into());
            row.add_cell("parallelism".into());
            row.add_cell("is_streaming".into());
            row.add_cell("is_serving".into());
            row.add_cell("is_iceberg_compactor".into());
            row.add_cell("rw_version".into());
            row.add_cell("total_memory_bytes".into());
            row.add_cell("total_cpu_cores".into());
            row.add_cell("started_at".into());
            row.add_cell("actor_count".into());
            row
        });
        for worker_node in worker_nodes {
            let mut row = Row::new();
            row.add_cell(worker_node.id.into());
            try_add_cell(
                &mut row,
                worker_node
                    .host
                    .as_ref()
                    .map(|h| format!("{}:{}", h.host, h.port)),
            );
            try_add_cell(
                &mut row,
                worker_node.get_type().ok().map(|t| t.as_str_name()),
            );
            try_add_cell(
                &mut row,
                worker_node.get_state().ok().map(|s| s.as_str_name()),
            );
            try_add_cell(&mut row, worker_node.parallelism());
            let (is_streaming, is_serving) = {
                if let Ok(t) = worker_node.get_type()
                    && t == WorkerType::ComputeNode
                {
                    (
                        worker_node.property.as_ref().map(|p| p.is_streaming),
                        worker_node.property.as_ref().map(|p| p.is_serving),
                    )
                } else {
                    (None, None)
                }
            };
            try_add_cell(&mut row, is_streaming);
            try_add_cell(&mut row, is_serving);
            let is_iceberg_compactor = {
                if let Ok(t) = worker_node.get_type()
                    && t == WorkerType::Compactor
                {
                    worker_node
                        .property
                        .as_ref()
                        .map(|p| p.is_iceberg_compactor)
                } else {
                    None
                }
            };
            try_add_cell(&mut row, is_iceberg_compactor);
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.rw_version.clone()),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.total_memory_bytes),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.total_cpu_cores),
            );
            try_add_cell(
                &mut row,
                worker_node
                    .started_at
                    .and_then(|ts| Timestamptz::from_secs(ts as _).map(|t| t.to_string())),
            );
            let actor_count = {
                if let Ok(t) = worker_node.get_type()
                    && t != WorkerType::ComputeNode
                {
                    None
                } else {
                    match worker_actor_count.get(&worker_node.id) {
                        None => Some(0),
                        Some(c) => Some(*c),
                    }
                }
            };
            try_add_cell(&mut row, actor_count);
            table.add_row(row);
        }
        let _ = writeln!(s, "{table}");
    }

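    /// Dumps recent event logs, newest first, grouped by event kind with a per-kind row limit.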
    fn write_event_logs(&self, s: &mut String) {
        let event_logs = self
            .event_log_manager
            .list_event_logs()
            .into_iter()
            .sorted_by(|a, b| {
                a.timestamp
                    .unwrap_or(0)
                    .cmp(&b.timestamp.unwrap_or(0))
                    .reverse()
            })
            .collect_vec();

        let _ = writeln!(s, "latest barrier completions");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::BarrierComplete(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(10),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest barrier collection failures");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::CollectBarrierFail(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest barrier injection failures");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::InjectBarrierFail(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest worker node panics");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::WorkerNodePanic(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(10),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest create stream job failures");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::CreateStreamJobFail(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest dirty stream job clear-ups");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::DirtyStreamJobClear(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );
    }

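    /// Renders a `created_at`/`info` table for the events selected by `get_event_info`,
    /// which both filters events and formats the matching ones. At most `limit` rows are
    /// emitted when a limit is given.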
    fn write_event_logs_impl<'a, F>(
        s: &mut String,
        event_logs: impl Iterator<Item = &'a EventLog>,
        get_event_info: F,
        limit: Option<usize>,
    ) where
        F: Fn(&Event) -> Option<String>,
    {
        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("created_at".into());
            row.add_cell("info".into());
            row
        });
        let mut row_count = 0;
        for event_log in event_logs {
            let Some(ref inner) = event_log.event else {
                continue;
            };
            if let Some(limit) = limit
                && row_count >= limit
            {
                break;
            }
            let mut row = Row::new();
            let ts = event_log
                .timestamp
                .and_then(|ts| Timestamptz::from_millis(ts as _).map(|ts| ts.to_string()));
            try_add_cell(&mut row, ts);
            if let Some(info) = get_event_info(inner) {
                row.add_cell(info.into());
                row_count += 1;
            } else {
                continue;
            }
            table.add_row(row);
        }
        let _ = writeln!(s, "{table}");
    }

    async fn write_storage(&self, s: &mut String) {
        let mut sst_num = 0;
        let mut sst_total_file_size = 0;
        let back_pressured_compaction_groups = self
            .hummock_manager
            .write_limits()
            .await
            .into_iter()
            .filter_map(|(k, v)| {
                if v.table_ids.is_empty() {
                    None
                } else {
                    Some(k)
                }
            })
            .join(",");
        if !back_pressured_compaction_groups.is_empty() {
            let _ = writeln!(
                s,
                "back-pressured compaction groups: {back_pressured_compaction_groups}"
            );
        }

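        // The delete ratio is stored in basis points (1/100 of a percent):
        // stale_key_count * 10000 / total_key_count, rendered later as `{:.2}%`.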
        #[derive(PartialEq, Eq)]
        struct SstableSort {
            compaction_group_id: u64,
            sst_id: HummockSstableId,
            delete_ratio: u64,
        }
        impl PartialOrd for SstableSort {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }
        impl Ord for SstableSort {
            fn cmp(&self, other: &Self) -> Ordering {
                self.delete_ratio.cmp(&other.delete_ratio)
            }
        }
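        // Keep the `top_k` entries with the largest delete ratio using a min-heap:
        // `Reverse` makes `peek_mut` expose the smallest retained element, which is
        // evicted whenever a larger candidate arrives.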
        fn top_k_sstables(
            top_k: usize,
            heap: &mut BinaryHeap<Reverse<SstableSort>>,
            e: SstableSort,
        ) {
            if heap.len() < top_k {
                heap.push(Reverse(e));
            } else if let Some(mut p) = heap.peek_mut()
                && e.delete_ratio > p.0.delete_ratio
            {
                *p = Reverse(e);
            }
        }

        let top_k = 10;
        let mut top_tombstone_delete_sst = BinaryHeap::with_capacity(top_k);
        let compaction_group_num = self
            .hummock_manager
            .on_current_version(|version| {
                for compaction_group in version.levels.values() {
                    let mut visit_level = |level: &Level| {
                        sst_num += level.table_infos.len();
                        sst_total_file_size +=
                            level.table_infos.iter().map(|t| t.sst_size).sum::<u64>();
                        for sst in &level.table_infos {
                            if sst.total_key_count == 0 {
                                continue;
                            }
                            let tombstone_delete_ratio =
                                sst.stale_key_count * 10000 / sst.total_key_count;
                            let e = SstableSort {
                                compaction_group_id: compaction_group.group_id,
                                sst_id: sst.sst_id,
                                delete_ratio: tombstone_delete_ratio,
                            };
                            top_k_sstables(top_k, &mut top_tombstone_delete_sst, e);
                        }
                    };
                    let l0 = &compaction_group.l0;
                    for level in &l0.sub_levels {
                        visit_level(level);
                    }
                    for level in &compaction_group.levels {
                        visit_level(level);
                    }
                }
                version.levels.len()
            })
            .await;

        let _ = writeln!(s, "number of SSTables: {sst_num}");
        let _ = writeln!(s, "total size of SSTables (bytes): {sst_total_file_size}");
        let _ = writeln!(s, "number of compaction groups: {compaction_group_num}");
        use comfy_table::{Row, Table};
        fn format_table(heap: BinaryHeap<Reverse<SstableSort>>) -> Table {
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("compaction group id".into());
                row.add_cell("sstable id".into());
                row.add_cell("delete ratio".into());
                row
            });
            for sst in &heap.into_sorted_vec() {
                let mut row = Row::new();
                row.add_cell(sst.0.compaction_group_id.into());
                row.add_cell(sst.0.sst_id.into());
                row.add_cell(format!("{:.2}%", sst.0.delete_ratio as f64 / 100.0).into());
                table.add_row(row);
            }
            table
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "top tombstone delete ratio");
        let _ = writeln!(s, "{}", format_table(top_tombstone_delete_sst));
        let _ = writeln!(s);
        self.write_storage_prometheus(s).await;
    }

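    /// Streaming-side Prometheus snapshots: top sources, materialized views, and join
    /// executors over the last 10 minutes.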
    async fn write_streaming_prometheus(&self, s: &mut String) {
        let _ = writeln!(s, "top sources by throughput (rows/s)");
        let query = format!(
            "topk(3, sum(rate(stream_source_output_rows_counts{{{}}}[10m])) by (source_name))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["source_name"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top materialized views by throughput (rows/s)");
        let query = format!(
            "topk(3, sum(rate(stream_mview_input_row_count{{{}}}[10m])) by (table_id))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top join executors by matched rows");
        let query = format!(
            "topk(3, histogram_quantile(0.9, sum(rate(stream_join_matched_join_keys_bucket{{{}}}[10m])) by (le, fragment_id, table_id)))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id", "fragment_id"])
            .await;
    }

    async fn write_storage_prometheus(&self, s: &mut String) {
        let _ = writeln!(s, "top Hummock Get by duration (seconds)");
        let query = format!(
            "topk(3, histogram_quantile(0.9, sum(rate(state_store_get_duration_bucket{{{}}}[10m])) by (le, table_id)))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top Hummock Iter Init by duration (seconds)");
        let query = format!(
            "topk(3, histogram_quantile(0.9, sum(rate(state_store_iter_init_duration_bucket{{{}}}[10m])) by (le, table_id)))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top table commit flush by size (bytes)");
        let query = format!(
            "topk(3, sum(rate(storage_commit_write_throughput{{{}}}[10m])) by (table_id))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store read throughput (bytes/s)");
        let query = format!(
            "sum(rate(object_store_read_bytes{{{}}}[10m])) by (job, instance)",
            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
        );
        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store write throughput (bytes/s)");
        let query = format!(
            "sum(rate(object_store_write_bytes{{{}}}[10m])) by (job, instance)",
            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
        );
        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store operation rate");
        let query = format!(
            "sum(rate(object_store_operation_latency_count{{{}}}[10m])) by (le, type, job, instance)",
            merge_prometheus_selector([
                &self.prometheus_selector,
                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read_read_bytes|streaming_read\""
            ])
        );
        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store operation duration (seconds)");
        let query = format!(
            "histogram_quantile(0.9, sum(rate(object_store_operation_latency_bucket{{{}}}[10m])) by (le, type, job, instance))",
            merge_prometheus_selector([
                &self.prometheus_selector,
                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read\""
            ])
        );
        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
            .await;
    }

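    /// Runs one instant-vector PromQL query and prints each sample as
    /// `label1=value1,label2=value2: value`. Does nothing if no Prometheus client is configured.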
    async fn write_instant_vector_impl(&self, s: &mut String, query: &str, labels: Vec<&str>) {
        let Some(ref client) = self.prometheus_client else {
            return;
        };
        if let Ok(Vector(instant_vec)) = client
            .query(query)
            .get()
            .await
            .map(|result| result.into_inner().0)
        {
            for i in instant_vec {
                let l = labels
                    .iter()
                    .map(|label| {
                        format!(
                            "{}={}",
                            *label,
                            i.metric()
                                .get(*label)
                                .map(|s| s.as_str())
                                .unwrap_or_default()
                        )
                    })
                    .join(",");
                let _ = writeln!(s, "{}: {:.3}", l, i.sample().value());
            }
        }
    }

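    /// Collects await-tree dumps from all workers in the cluster and appends them to the report.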
    async fn write_await_tree(&self, s: &mut String, actor_traces_format: ActorTracesFormat) {
        let all = dump_cluster_await_tree(
            &self.metadata_manager,
            &self.await_tree_reg,
            actor_traces_format,
        )
        .await;

        match all {
            Ok(all) => {
                let _ = write!(s, "{}", all.output());
            }
            Err(e) => {
                tracing::warn!(error = ?e.as_report(), "failed to dump await tree");
            }
        }
    }

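    /// Lists streaming jobs plus the definitions of all catalog objects (sources, tables,
    /// materialized views, indexes, sinks, views, internal tables), with SQL redacted, and
    /// finishes with an actor-to-job mapping table.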
    async fn write_table_definition(&self, s: &mut String) -> MetaResult<()> {
        let sources = self
            .metadata_manager
            .catalog_controller
            .list_sources()
            .await?
            .into_iter()
            .map(|s| {
                (
                    s.id.into(),
                    (s.name, s.schema_id, s.definition, s.created_at_epoch),
                )
            })
            .collect::<BTreeMap<_, _>>();
        let mut user_tables = BTreeMap::new();
        let mut mvs = BTreeMap::new();
        let mut indexes = BTreeMap::new();
        let mut internal_tables = BTreeMap::new();
        {
            let grouped = self
                .metadata_manager
                .catalog_controller
                .list_all_state_tables()
                .await?
                .into_iter()
                .chunk_by(|t| t.table_type());
            for (table_type, tables) in &grouped {
                let tables = tables.into_iter().map(|t| {
                    (
                        t.id.into(),
                        (t.name, t.schema_id, t.definition, t.created_at_epoch),
                    )
                });
                match table_type {
                    PbTableType::Table => user_tables.extend(tables),
                    PbTableType::MaterializedView => mvs.extend(tables),
                    PbTableType::Index | PbTableType::VectorIndex => indexes.extend(tables),
                    PbTableType::Internal => internal_tables.extend(tables),
                    PbTableType::Unspecified => {
                        tracing::error!("unspecified table type: {:?}", tables.collect_vec());
                    }
                }
            }
        }
        let sinks = self
            .metadata_manager
            .catalog_controller
            .list_sinks()
            .await?
            .into_iter()
            .map(|s| {
                (
                    s.id.into(),
                    (s.name, s.schema_id, s.definition, s.created_at_epoch),
                )
            })
            .collect::<BTreeMap<_, _>>();
        let views = self
            .metadata_manager
            .catalog_controller
            .list_views()
            .await?
            .into_iter()
            .map(|v| (v.id.into(), (v.name, v.schema_id, v.sql, None)))
            .collect::<BTreeMap<_, _>>();
        let mut streaming_jobs = self
            .metadata_manager
            .catalog_controller
            .list_streaming_job_infos()
            .await?;
        streaming_jobs.sort_by_key(|info| (info.obj_type as usize, info.job_id));
        {
            use comfy_table::{Row, Table};
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("job_id".into());
                row.add_cell("name".into());
                row.add_cell("obj_type".into());
                row.add_cell("state".into());
                row.add_cell("parallelism".into());
                row.add_cell("max_parallelism".into());
                row.add_cell("resource_group".into());
                row.add_cell("database_id".into());
                row.add_cell("schema_id".into());
                row.add_cell("config_override".into());
                row
            });
            for job in streaming_jobs {
                let mut row = Row::new();
                row.add_cell(job.job_id.into());
                row.add_cell(job.name.into());
                row.add_cell(job.obj_type.as_str().into());
                row.add_cell(format_job_status(job.job_status).into());
                row.add_cell(format_streaming_parallelism(&job.parallelism).into());
                row.add_cell(job.max_parallelism.into());
                row.add_cell(job.resource_group.into());
                row.add_cell(job.database_id.into());
                row.add_cell(job.schema_id.into());
                row.add_cell(job.config_override.into());
                table.add_row(row);
            }
            let _ = writeln!(s);
            let _ = writeln!(s, "STREAMING JOB");
            let _ = writeln!(s, "{table}");
        }
        let catalogs = [
            ("SOURCE", sources),
            ("TABLE", user_tables),
            ("MATERIALIZED VIEW", mvs),
            ("INDEX", indexes),
            ("SINK", sinks),
            ("VIEW", views),
            ("INTERNAL TABLE", internal_tables),
        ];
        let mut obj_id_to_name: HashMap<ObjectId, _> = HashMap::new();
        for (title, items) in catalogs {
            use comfy_table::{Row, Table};
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("id".into());
                row.add_cell("name".into());
                row.add_cell("schema_id".into());
                row.add_cell("created_at".into());
                row.add_cell("definition".into());
                row
            });
            for (id, (name, schema_id, definition, created_at_epoch)) in items {
                obj_id_to_name.insert(id, name.clone());
                let mut row = Row::new();
                let may_redact = redact_sql(&definition, self.redact_sql_option_keywords.clone())
                    .unwrap_or_else(|| "[REDACTED]".into());
                let created_at = if let Some(created_at_epoch) = created_at_epoch {
                    format!("{}", Epoch::from(created_at_epoch).as_timestamptz())
                } else {
                    "".into()
                };
                row.add_cell(id.into());
                row.add_cell(name.into());
                row.add_cell(schema_id.into());
                row.add_cell(created_at.into());
                row.add_cell(may_redact.into());
                table.add_row(row);
            }
            let _ = writeln!(s);
            let _ = writeln!(s, "{title}");
            let _ = writeln!(s, "{table}");
        }

        let actors = self
            .metadata_manager
            .catalog_controller
            .list_actor_info()
            .await?
            .into_iter()
            .map(|(actor_id, fragment_id, job_id, schema_id, obj_type)| {
                (
                    actor_id,
                    (
                        fragment_id,
                        job_id,
                        schema_id,
                        obj_type,
                        obj_id_to_name.get(&job_id).cloned().unwrap_or_default(),
                    ),
                )
            })
            .collect::<BTreeMap<_, _>>();

        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("fragment_id".into());
            row.add_cell("job_id".into());
            row.add_cell("schema_id".into());
            row.add_cell("type".into());
            row.add_cell("name".into());
            row
        });
        for (actor_id, (fragment_id, job_id, schema_id, ddl_type, name)) in actors {
            let mut row = Row::new();
            row.add_cell(actor_id.into());
            row.add_cell(fragment_id.into());
            row.add_cell(job_id.into());
            row.add_cell(schema_id.into());
            row.add_cell(ddl_type.as_str().into());
            row.add_cell(name.into());
            table.add_row(row);
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "ACTOR");
        let _ = writeln!(s, "{table}");
        Ok(())
    }

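    /// Prints the license as a two-column table: tier, expiry, resource limits, and available
    /// features when the license is valid, or the validation error otherwise.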
    fn write_license(&self, s: &mut String) {
        use comfy_table::presets::ASCII_BORDERS_ONLY;
        use comfy_table::{ContentArrangement, Row, Table};

        let mut table = Table::new();
        table.load_preset(ASCII_BORDERS_ONLY);
        table.set_content_arrangement(ContentArrangement::Dynamic);
        table.set_header({
            let mut row = Row::new();
            row.add_cell("field".into());
            row.add_cell("value".into());
            row
        });

        match LicenseManager::get().license() {
            Ok(license) => {
                let fmt_option = |value: Option<u64>| match value {
                    Some(v) => v.to_string(),
                    None => "unlimited".to_owned(),
                };

                let expires_at = if license.exp == u64::MAX {
                    "never".to_owned()
                } else {
                    let exp_i64 = license.exp as i64;
                    chrono::DateTime::<chrono::Utc>::from_timestamp(exp_i64, 0)
                        .map(|ts| ts.to_rfc3339())
                        .unwrap_or_else(|| format!("invalid ({})", license.exp))
                };

                let mut row = Row::new();
                row.add_cell("status".into());
                row.add_cell("valid".into());
                table.add_row(row);

                let mut row = Row::new();
                row.add_cell("tier".into());
                row.add_cell(license.tier.name().into());
                table.add_row(row);

                let mut row = Row::new();
                row.add_cell("expires_at".into());
                row.add_cell(expires_at.into());
                table.add_row(row);

                let mut row = Row::new();
                row.add_cell("rwu_limit".into());
                row.add_cell(fmt_option(license.rwu_limit.map(|v| v.get())).into());
                table.add_row(row);

                let mut row = Row::new();
                row.add_cell("cpu_core_limit".into());
                row.add_cell(fmt_option(license.cpu_core_limit()).into());
                table.add_row(row);

                let mut row = Row::new();
                row.add_cell("memory_limit_bytes".into());
                row.add_cell(fmt_option(license.memory_limit()).into());
                table.add_row(row);

                let mut features: Vec<_> = license
                    .tier
                    .available_features()
                    .map(|f| f.name())
                    .collect();
                features.sort_unstable();
                let feature_summary = format_features(&features);

                let mut row = Row::new();
                row.add_cell("available_features".into());
                row.add_cell(feature_summary.into());
                table.add_row(row);
            }
            Err(error) => {
                let mut row = Row::new();
                row.add_cell("status".into());
                row.add_cell("invalid".into());
                table.add_row(row);

                let mut row = Row::new();
                row.add_cell("error".into());
                row.add_cell(error.to_report_string().into());
                table.add_row(row);
            }
        }

        let _ = writeln!(s, "LICENSE");
        let _ = writeln!(s, "{table}");
    }
}

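/// Adds `t` as a cell if present, otherwise an empty cell, keeping the row's column count stable.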
fn try_add_cell<T: Into<comfy_table::Cell>>(row: &mut comfy_table::Row, t: Option<T>) {
    match t {
        Some(t) => {
            row.add_cell(t.into());
        }
        None => {
            row.add_cell("".into());
        }
    }
}

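/// Joins non-empty label matchers into a single PromQL selector body.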
fn merge_prometheus_selector<'a>(selectors: impl IntoIterator<Item = &'a str>) -> String {
    selectors.into_iter().filter(|s| !s.is_empty()).join(",")
}

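/// Re-renders `sql` with sensitive option values redacted. Returns `None` if the SQL fails to
/// parse, in which case callers fall back to `[REDACTED]` rather than emitting the raw definition.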
fn redact_sql(sql: &str, keywords: RedactSqlOptionKeywordsRef) -> Option<String> {
    match Parser::parse_sql(sql) {
        Ok(sqls) => Some(
            sqls.into_iter()
                .map(|sql| sql.to_redacted_string(keywords.clone()))
                .join(";"),
        ),
        Err(_) => None,
    }
}

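/// Formats feature names into lines of at most `PER_LINE` comma-separated entries.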
fn format_features(features: &[&'static str]) -> String {
    if features.is_empty() {
        return "(none)".into();
    }

    const PER_LINE: usize = 6;
    features
        .chunks(PER_LINE)
        .map(|chunk| format!(" {}", chunk.join(", ")))
        .collect::<Vec<_>>()
        .join("\n")
}

fn format_job_status(status: JobStatus) -> &'static str {
    match status {
        JobStatus::Initial => "initial",
        JobStatus::Creating => "creating",
        JobStatus::Created => "created",
    }
}

fn format_streaming_parallelism(parallelism: &StreamingParallelism) -> String {
    match parallelism {
        StreamingParallelism::Adaptive => "adaptive".into(),
        StreamingParallelism::Fixed(n) => format!("fixed({n})"),
        StreamingParallelism::Custom => "custom".into(),
    }
}