1use std::cmp::{Ordering, Reverse};
16use std::collections::{BTreeMap, BinaryHeap, HashMap};
17use std::fmt::Write;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use prometheus_http_query::response::Data::Vector;
22use risingwave_common::id::ObjectId;
23use risingwave_common::system_param::reader::SystemParamsRead;
24use risingwave_common::types::Timestamptz;
25use risingwave_common::util::StackTraceResponseExt;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_hummock_sdk::level::Level;
28use risingwave_hummock_sdk::{CompactionGroupId, HummockSstableId};
29use risingwave_license::LicenseManager;
30use risingwave_meta_model::{JobStatus, StreamingParallelism};
31use risingwave_pb::catalog::table::PbTableType;
32use risingwave_pb::common::WorkerType;
33use risingwave_pb::meta::EventLog;
34use risingwave_pb::meta::event_log::Event;
35use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
36use risingwave_sqlparser::ast::RedactSqlOptionKeywordsRef;
37use risingwave_sqlparser::parser::Parser;
38use serde_json::json;
39use thiserror_ext::AsReport;
40
41use crate::MetaResult;
42use crate::controller::system_param::SystemParamsControllerRef;
43use crate::hummock::HummockManagerRef;
44use crate::manager::MetadataManager;
45use crate::manager::event_log::EventLogManagerRef;
46use crate::manager::iceberg_compaction::IcebergCompactionManagerRef;
47use crate::rpc::await_tree::dump_cluster_await_tree;
48
/// Shared, reference-counted handle to a [`DiagnoseCommand`].
pub type DiagnoseCommandRef = Arc<DiagnoseCommand>;
50
/// Aggregates handles to meta-node subsystems and renders a one-shot,
/// human-readable cluster diagnostic report (see [`DiagnoseCommand::report`]).
pub struct DiagnoseCommand {
    /// Access to catalog and cluster metadata.
    metadata_manager: MetadataManager,
    /// Local await-tree registry of this meta node.
    await_tree_reg: await_tree::Registry,
    // NOTE(review): "manger" is a typo for "manager"; kept as-is since renaming
    // would touch every usage in the file.
    hummock_manger: HummockManagerRef,
    iceberg_compaction_manager: IcebergCompactionManagerRef,
    event_log_manager: EventLogManagerRef,
    /// `None` when Prometheus integration is not configured; metric sections of
    /// the report are then silently skipped.
    prometheus_client: Option<prometheus_http_query::Client>,
    /// Extra label selector injected into every Prometheus query.
    prometheus_selector: String,
    /// SQL option keywords whose values must be redacted when printing DDL.
    redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
    system_params_controller: SystemParamsControllerRef,
}
62
63impl DiagnoseCommand {
    /// Creates a `DiagnoseCommand` from handles to all subsystems the
    /// diagnostic report draws from. Pure field assignment; no I/O is
    /// performed here.
    pub fn new(
        metadata_manager: MetadataManager,
        await_tree_reg: await_tree::Registry,
        hummock_manger: HummockManagerRef,
        iceberg_compaction_manager: IcebergCompactionManagerRef,
        event_log_manager: EventLogManagerRef,
        prometheus_client: Option<prometheus_http_query::Client>,
        prometheus_selector: String,
        redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
        system_params_controller: SystemParamsControllerRef,
    ) -> Self {
        Self {
            metadata_manager,
            await_tree_reg,
            hummock_manger,
            iceberg_compaction_manager,
            event_log_manager,
            prometheus_client,
            prometheus_selector,
            redact_sql_option_keywords,
            system_params_controller,
        }
    }
87
    /// Builds the complete diagnostic report as one string.
    ///
    /// Sections (license, catalog, worker nodes, streaming/storage Prometheus
    /// metrics, iceberg compaction schedule, event logs, system params, await
    /// trees) are appended in a fixed order, each separated by a blank line.
    /// Writes into a `String` are infallible, hence the ignored results.
    pub async fn report(&self, actor_traces_format: ActorTracesFormat) -> String {
        let mut report = String::new();
        // Header: creation time (UTC) and cluster version.
        let _ = writeln!(
            report,
            "report created at: {}\nversion: {}",
            chrono::DateTime::<chrono::offset::Utc>::from(std::time::SystemTime::now()),
            risingwave_common::current_cluster_version(),
        );
        let _ = writeln!(report);
        self.write_license(&mut report);
        let _ = writeln!(report);
        self.write_catalog(&mut report).await;
        let _ = writeln!(report);
        self.write_worker_nodes(&mut report).await;
        let _ = writeln!(report);
        self.write_streaming_prometheus(&mut report).await;
        let _ = writeln!(report);
        self.write_storage(&mut report).await;
        let _ = writeln!(report);
        self.write_iceberg_compaction_schedule(&mut report).await;
        let _ = writeln!(report);
        self.write_event_logs(&mut report);
        let _ = writeln!(report);
        self.write_params(&mut report).await;
        let _ = writeln!(report);
        self.write_await_tree(&mut report, actor_traces_format)
            .await;

        report
    }
118
119 async fn write_catalog(&self, s: &mut String) {
120 self.write_catalog_inner(s).await;
121 let _ = self.write_table_definition(s).await.inspect_err(|e| {
122 tracing::warn!(
123 error = e.to_report_string(),
124 "failed to display table definition"
125 )
126 });
127 }
128
129 async fn write_catalog_inner(&self, s: &mut String) {
130 let stats = self.metadata_manager.catalog_controller.stats().await;
131
132 let stat = match stats {
133 Ok(stat) => stat,
134 Err(err) => {
135 tracing::warn!(error=?err.as_report(), "failed to get catalog stats");
136 return;
137 }
138 };
139 let _ = writeln!(s, "number of database: {}", stat.database_num);
140 let _ = writeln!(s, "number of fragment: {}", stat.streaming_job_num);
141 let _ = writeln!(s, "number of actor: {}", stat.actor_num);
142 let _ = writeln!(s, "number of source: {}", stat.source_num);
143 let _ = writeln!(s, "number of table: {}", stat.table_num);
144 let _ = writeln!(s, "number of materialized view: {}", stat.mview_num);
145 let _ = writeln!(s, "number of sink: {}", stat.sink_num);
146 let _ = writeln!(s, "number of index: {}", stat.index_num);
147 let _ = writeln!(s, "number of function: {}", stat.function_num);
148
149 self.write_databases(s).await;
150 self.write_schemas(s).await;
151 }
152
153 async fn write_databases(&self, s: &mut String) {
154 let databases = match self
155 .metadata_manager
156 .catalog_controller
157 .list_databases()
158 .await
159 {
160 Ok(databases) => databases,
161 Err(err) => {
162 tracing::warn!(error=?err.as_report(), "failed to list databases");
163 return;
164 }
165 };
166
167 use comfy_table::{Row, Table};
168 let mut table = Table::new();
169 table.set_header({
170 let mut row = Row::new();
171 row.add_cell("id".into());
172 row.add_cell("name".into());
173 row.add_cell("resource_group".into());
174 row.add_cell("barrier_interval_ms".into());
175 row.add_cell("checkpoint_frequency".into());
176 row
177 });
178 for db in databases {
179 let mut row = Row::new();
180 row.add_cell(db.id.into());
181 row.add_cell(db.name.into());
182 row.add_cell(db.resource_group.into());
183 row.add_cell(
184 db.barrier_interval_ms
185 .map(|v| v.to_string())
186 .unwrap_or("default".into())
187 .into(),
188 );
189 row.add_cell(
190 db.checkpoint_frequency
191 .map(|v| v.to_string())
192 .unwrap_or("default".into())
193 .into(),
194 );
195 table.add_row(row);
196 }
197
198 let _ = writeln!(s, "DATABASE");
199 let _ = writeln!(s, "{table}");
200 }
201
202 async fn write_schemas(&self, s: &mut String) {
203 let schemas = match self
204 .metadata_manager
205 .catalog_controller
206 .list_schemas()
207 .await
208 {
209 Ok(schemas) => schemas,
210 Err(err) => {
211 tracing::warn!(error=?err.as_report(), "failed to list schemas");
212 return;
213 }
214 };
215
216 use comfy_table::{Row, Table};
217 let mut table = Table::new();
218 table.set_header({
219 let mut row = Row::new();
220 row.add_cell("id".into());
221 row.add_cell("database_id".into());
222 row.add_cell("name".into());
223 row
224 });
225 for schema in schemas {
226 let mut row = Row::new();
227 row.add_cell(schema.id.into());
228 row.add_cell(schema.database_id.into());
229 row.add_cell(schema.name.into());
230 table.add_row(row);
231 }
232
233 let _ = writeln!(s, "SCHEMA");
234 let _ = writeln!(s, "{table}");
235 }
236
    /// Writes a table of all worker nodes with their resources and, for compute
    /// nodes, the number of actors scheduled on them.
    async fn write_worker_nodes(&self, s: &mut String) {
        // worker id -> number of actors (only populated for compute workers).
        let Ok(worker_actor_count) = self.metadata_manager.worker_actor_count() else {
            tracing::warn!("failed to get worker actor count");
            return;
        };

        use comfy_table::{Row, Table};
        let Ok(worker_nodes) = self.metadata_manager.list_worker_node(None, None).await else {
            tracing::warn!("failed to get worker nodes");
            return;
        };
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("host".into());
            row.add_cell("hostname".into());
            row.add_cell("type".into());
            row.add_cell("state".into());
            row.add_cell("parallelism".into());
            row.add_cell("resource_group".into());
            row.add_cell("is_streaming".into());
            row.add_cell("is_serving".into());
            row.add_cell("is_iceberg_compactor".into());
            row.add_cell("rw_version".into());
            row.add_cell("total_memory_bytes".into());
            row.add_cell("total_cpu_cores".into());
            row.add_cell("started_at".into());
            row.add_cell("actor_count".into());
            row
        });
        for worker_node in worker_nodes {
            let mut row = Row::new();
            row.add_cell(worker_node.id.into());
            // Advertised "host:port" address; blank cell when unknown.
            try_add_cell(
                &mut row,
                worker_node
                    .host
                    .as_ref()
                    .map(|h| format!("{}:{}", h.host, h.port)),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.hostname.clone()),
            );
            try_add_cell(
                &mut row,
                worker_node.get_type().ok().map(|t| t.as_str_name()),
            );
            try_add_cell(
                &mut row,
                worker_node.get_state().ok().map(|s| s.as_str_name()),
            );
            try_add_cell(&mut row, worker_node.parallelism());
            try_add_cell(
                &mut row,
                worker_node
                    .property
                    .as_ref()
                    .map(|p| p.resource_group.clone().unwrap_or("".to_owned())),
            );
            // Streaming/serving flags are only meaningful for compute nodes;
            // other worker types get blank cells.
            let (is_streaming, is_serving) = {
                if let Ok(t) = worker_node.get_type()
                    && t == WorkerType::ComputeNode
                {
                    (
                        worker_node.property.as_ref().map(|p| p.is_streaming),
                        worker_node.property.as_ref().map(|p| p.is_serving),
                    )
                } else {
                    (None, None)
                }
            };
            try_add_cell(&mut row, is_streaming);
            try_add_cell(&mut row, is_serving);
            // The iceberg-compactor flag is only meaningful for compactor nodes.
            let is_iceberg_compactor = {
                if let Ok(t) = worker_node.get_type()
                    && t == WorkerType::Compactor
                {
                    worker_node
                        .property
                        .as_ref()
                        .map(|p| p.is_iceberg_compactor)
                } else {
                    None
                }
            };
            try_add_cell(&mut row, is_iceberg_compactor);
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.rw_version.clone()),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.total_memory_bytes),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.total_cpu_cores),
            );
            // Start time is seconds-since-epoch; rendered as timestamptz when valid.
            try_add_cell(
                &mut row,
                worker_node
                    .started_at
                    .and_then(|ts| Timestamptz::from_secs(ts as _).map(|t| t.to_string())),
            );
            // Actor count: blank for non-compute nodes, 0 when none recorded.
            let actor_count = {
                if let Ok(t) = worker_node.get_type()
                    && t != WorkerType::ComputeNode
                {
                    None
                } else {
                    match worker_actor_count.get(&worker_node.id) {
                        None => Some(0),
                        Some(c) => Some(*c),
                    }
                }
            };
            try_add_cell(&mut row, actor_count);
            table.add_row(row);
        }
        let _ = writeln!(s, "{table}");
    }
362
363 fn write_event_logs(&self, s: &mut String) {
364 let event_logs = self
365 .event_log_manager
366 .list_event_logs()
367 .into_iter()
368 .sorted_by(|a, b| {
369 a.timestamp
370 .unwrap_or(0)
371 .cmp(&b.timestamp.unwrap_or(0))
372 .reverse()
373 })
374 .collect_vec();
375
376 let _ = writeln!(s, "latest barrier completions");
377 Self::write_event_logs_impl(
378 s,
379 event_logs.iter(),
380 |e| {
381 let Event::BarrierComplete(info) = e else {
382 return None;
383 };
384 Some(json!(info).to_string())
385 },
386 Some(10),
387 );
388
389 let _ = writeln!(s);
390 let _ = writeln!(s, "latest barrier collection failures");
391 Self::write_event_logs_impl(
392 s,
393 event_logs.iter(),
394 |e| {
395 let Event::CollectBarrierFail(info) = e else {
396 return None;
397 };
398 Some(json!(info).to_string())
399 },
400 Some(3),
401 );
402
403 let _ = writeln!(s);
404 let _ = writeln!(s, "latest barrier injection failures");
405 Self::write_event_logs_impl(
406 s,
407 event_logs.iter(),
408 |e| {
409 let Event::InjectBarrierFail(info) = e else {
410 return None;
411 };
412 Some(json!(info).to_string())
413 },
414 Some(3),
415 );
416
417 let _ = writeln!(s);
418 let _ = writeln!(s, "latest worker node panics");
419 Self::write_event_logs_impl(
420 s,
421 event_logs.iter(),
422 |e| {
423 let Event::WorkerNodePanic(info) = e else {
424 return None;
425 };
426 Some(json!(info).to_string())
427 },
428 Some(10),
429 );
430
431 let _ = writeln!(s);
432 let _ = writeln!(s, "latest create stream job failures");
433 Self::write_event_logs_impl(
434 s,
435 event_logs.iter(),
436 |e| {
437 let Event::CreateStreamJobFail(info) = e else {
438 return None;
439 };
440 Some(json!(info).to_string())
441 },
442 Some(3),
443 );
444
445 let _ = writeln!(s);
446 let _ = writeln!(s, "latest dirty stream job clear-ups");
447 Self::write_event_logs_impl(
448 s,
449 event_logs.iter(),
450 |e| {
451 let Event::DirtyStreamJobClear(info) = e else {
452 return None;
453 };
454 Some(json!(info).to_string())
455 },
456 Some(3),
457 );
458 }
459
460 fn write_event_logs_impl<'a, F>(
461 s: &mut String,
462 event_logs: impl Iterator<Item = &'a EventLog>,
463 get_event_info: F,
464 limit: Option<usize>,
465 ) where
466 F: Fn(&Event) -> Option<String>,
467 {
468 use comfy_table::{Row, Table};
469 let mut table = Table::new();
470 table.set_header({
471 let mut row = Row::new();
472 row.add_cell("created_at".into());
473 row.add_cell("info".into());
474 row
475 });
476 let mut row_count = 0;
477 for event_log in event_logs {
478 let Some(ref inner) = event_log.event else {
479 continue;
480 };
481 if let Some(limit) = limit
482 && row_count >= limit
483 {
484 break;
485 }
486 let mut row = Row::new();
487 let ts = event_log
488 .timestamp
489 .and_then(|ts| Timestamptz::from_millis(ts as _).map(|ts| ts.to_string()));
490 try_add_cell(&mut row, ts);
491 if let Some(info) = get_event_info(inner) {
492 row.add_cell(info.into());
493 row_count += 1;
494 } else {
495 continue;
496 }
497 table.add_row(row);
498 }
499 let _ = writeln!(s, "{table}");
500 }
501
    /// Writes storage (Hummock) diagnostics: write-limited compaction groups,
    /// SSTable counts/sizes, the SSTables with the highest tombstone ratio, and
    /// storage-related Prometheus metrics.
    async fn write_storage(&self, s: &mut String) {
        let mut sst_num = 0;
        let mut sst_total_file_size = 0;
        // Compaction groups currently under write limiting (i.e. back pressure).
        let back_pressured_compaction_groups = self
            .hummock_manger
            .write_limits()
            .await
            .into_iter()
            .filter_map(|(k, v)| {
                if v.table_ids.is_empty() {
                    None
                } else {
                    Some(k)
                }
            })
            .join(",");
        if !back_pressured_compaction_groups.is_empty() {
            let _ = writeln!(
                s,
                "back pressured compaction groups: {back_pressured_compaction_groups}"
            );
        }

        // Sort key for ranking SSTables by stale (tombstoned) key ratio.
        #[derive(PartialEq, Eq)]
        struct SstableSort {
            compaction_group_id: CompactionGroupId,
            sst_id: HummockSstableId,
            // Stale-key ratio in basis points (1/100 of a percent), see below.
            delete_ratio: u64,
        }
        impl PartialOrd for SstableSort {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }
        impl Ord for SstableSort {
            // Ordered by delete ratio only; ids do not participate in ordering.
            fn cmp(&self, other: &Self) -> Ordering {
                self.delete_ratio.cmp(&other.delete_ratio)
            }
        }
        // Keeps the `top_k` entries with the highest delete ratio. `Reverse`
        // turns the max-heap into a min-heap so the smallest ratio is evicted.
        fn top_k_sstables(
            top_k: usize,
            heap: &mut BinaryHeap<Reverse<SstableSort>>,
            e: SstableSort,
        ) {
            if heap.len() < top_k {
                heap.push(Reverse(e));
            } else if let Some(mut p) = heap.peek_mut()
                && e.delete_ratio > p.0.delete_ratio
            {
                *p = Reverse(e);
            }
        }

        let top_k = 10;
        let mut top_tombstone_delete_sst = BinaryHeap::with_capacity(top_k);
        // Walk every level of every compaction group in the current version,
        // accumulating counts/sizes and tracking the worst tombstone ratios.
        let compaction_group_num = self
            .hummock_manger
            .on_current_version(|version| {
                for compaction_group in version.levels.values() {
                    let mut visit_level = |level: &Level| {
                        sst_num += level.table_infos.len();
                        sst_total_file_size +=
                            level.table_infos.iter().map(|t| t.sst_size).sum::<u64>();
                        for sst in &level.table_infos {
                            // Skip empty SSTs to avoid division by zero.
                            if sst.total_key_count == 0 {
                                continue;
                            }
                            // Basis points: 10000 = 100%.
                            let tombstone_delete_ratio =
                                sst.stale_key_count * 10000 / sst.total_key_count;
                            let e = SstableSort {
                                compaction_group_id: compaction_group.group_id,
                                sst_id: sst.sst_id,
                                delete_ratio: tombstone_delete_ratio,
                            };
                            top_k_sstables(top_k, &mut top_tombstone_delete_sst, e);
                        }
                    };
                    // L0 sub-levels first, then the leveled part.
                    let l0 = &compaction_group.l0;
                    for level in &l0.sub_levels {
                        visit_level(level);
                    }
                    for level in &compaction_group.levels {
                        visit_level(level);
                    }
                }
                version.levels.len()
            })
            .await;

        let _ = writeln!(s, "number of SSTables: {sst_num}");
        let _ = writeln!(s, "total size of SSTables (byte): {sst_total_file_size}");
        let _ = writeln!(s, "number of compaction groups: {compaction_group_num}");
        use comfy_table::{Row, Table};
        // Renders the heap as a table; `into_sorted_vec` on `Reverse` entries
        // yields the highest delete ratio first.
        fn format_table(heap: BinaryHeap<Reverse<SstableSort>>) -> Table {
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("compaction group id".into());
                row.add_cell("sstable id".into());
                row.add_cell("delete ratio".into());
                row
            });
            for sst in &heap.into_sorted_vec() {
                let mut row = Row::new();
                row.add_cell(sst.0.compaction_group_id.into());
                row.add_cell(sst.0.sst_id.into());
                // Convert basis points back to a percentage for display.
                row.add_cell(format!("{:.2}%", sst.0.delete_ratio as f64 / 100.0).into());
                table.add_row(row);
            }
            table
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "top tombstone delete ratio");
        let _ = writeln!(s, "{}", format_table(top_tombstone_delete_sst));
        let _ = writeln!(s);

        let _ = writeln!(s);
        self.write_storage_prometheus(s).await;
    }
622
623 async fn write_iceberg_compaction_schedule(&self, s: &mut String) {
624 use comfy_table::{Row, Table};
625
626 let sink_names = match self.metadata_manager.catalog_controller.list_sinks().await {
627 Ok(sinks) => sinks
628 .into_iter()
629 .map(|sink| (sink.id.as_raw_id(), sink.name))
630 .collect::<BTreeMap<u32, String>>(),
631 Err(err) => {
632 tracing::warn!(error=?err.as_report(), "failed to list sinks");
633 return;
634 }
635 };
636
637 let statuses = self.iceberg_compaction_manager.list_compaction_statuses();
638
639 let _ = writeln!(s, "ICEBERG COMPACTION SCHEDULE");
640
641 let mut table = Table::new();
642 table.set_header({
643 let mut row = Row::new();
644 row.add_cell("sink_id".into());
645 row.add_cell("sink_name".into());
646 row.add_cell("task_type".into());
647 row.add_cell("schedule_state".into());
648 row.add_cell("trigger_interval_sec".into());
649 row.add_cell("trigger_snapshot_count".into());
650 row.add_cell("pending_snapshot_count".into());
651 row.add_cell("next_compaction_after_sec".into());
652 row.add_cell("is_triggerable".into());
653 row
654 });
655
656 for status in statuses {
657 let mut row = Row::new();
658 row.add_cell(status.sink_id.as_raw_id().into());
659 try_add_cell(
660 &mut row,
661 sink_names.get(&status.sink_id.as_raw_id()).cloned(),
662 );
663 row.add_cell(status.task_type.into());
664 row.add_cell(status.schedule_state.into());
665 row.add_cell(status.trigger_interval_sec.into());
666 row.add_cell((status.trigger_snapshot_count as u64).into());
667 try_add_cell(
668 &mut row,
669 status.pending_snapshot_count.map(|count| count as u64),
670 );
671 try_add_cell(&mut row, status.next_compaction_after_sec);
672 row.add_cell(status.is_triggerable.into());
673 table.add_row(row);
674 }
675
676 let _ = writeln!(s, "{table}");
677 }
678
679 async fn write_streaming_prometheus(&self, s: &mut String) {
680 let _ = writeln!(s, "top sources by throughput (rows/s)");
681 let query = format!(
682 "topk(3, sum(rate(stream_source_output_rows_counts{{{}}}[10m]))by (source_name))",
683 self.prometheus_selector
684 );
685 self.write_instant_vector_impl(s, &query, vec!["source_name"])
686 .await;
687
688 let _ = writeln!(s);
689 let _ = writeln!(s, "top materialized views by throughput (rows/s)");
690 let query = format!(
691 "topk(3, sum(rate(stream_mview_input_row_count{{{}}}[10m]))by (table_id))",
692 self.prometheus_selector
693 );
694 self.write_instant_vector_impl(s, &query, vec!["table_id"])
695 .await;
696
697 let _ = writeln!(s);
698 let _ = writeln!(s, "top join executor by matched rows");
699 let query = format!(
700 "topk(3, histogram_quantile(0.9, sum(rate(stream_join_matched_join_keys_bucket{{{}}}[10m])) by (le, fragment_id, table_id)))",
701 self.prometheus_selector
702 );
703 self.write_instant_vector_impl(s, &query, vec!["table_id", "fragment_id"])
704 .await;
705 }
706
707 async fn write_storage_prometheus(&self, s: &mut String) {
708 let _ = writeln!(s, "top Hummock Get by duration (second)");
709 let query = format!(
710 "topk(3, histogram_quantile(0.9, sum(rate(state_store_get_duration_bucket{{{}}}[10m])) by (le, table_id)))",
711 self.prometheus_selector
712 );
713 self.write_instant_vector_impl(s, &query, vec!["table_id"])
714 .await;
715
716 let _ = writeln!(s);
717 let _ = writeln!(s, "top Hummock Iter Init by duration (second)");
718 let query = format!(
719 "topk(3, histogram_quantile(0.9, sum(rate(state_store_iter_init_duration_bucket{{{}}}[10m])) by (le, table_id)))",
720 self.prometheus_selector
721 );
722 self.write_instant_vector_impl(s, &query, vec!["table_id"])
723 .await;
724
725 let _ = writeln!(s);
726 let _ = writeln!(s, "top table commit flush by size (byte)");
727 let query = format!(
728 "topk(3, sum(rate(storage_commit_write_throughput{{{}}}[10m])) by (table_id))",
729 self.prometheus_selector
730 );
731 self.write_instant_vector_impl(s, &query, vec!["table_id"])
732 .await;
733
734 let _ = writeln!(s);
735 let _ = writeln!(s, "object store read throughput (bytes/s)");
736 let query = format!(
737 "sum(rate(object_store_read_bytes{{{}}}[10m])) by (job, instance)",
738 merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
739 );
740 self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
741 .await;
742
743 let _ = writeln!(s);
744 let _ = writeln!(s, "object store write throughput (bytes/s)");
745 let query = format!(
746 "sum(rate(object_store_write_bytes{{{}}}[10m])) by (job, instance)",
747 merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
748 );
749 self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
750 .await;
751
752 let _ = writeln!(s);
753 let _ = writeln!(s, "object store operation rate");
754 let query = format!(
755 "sum(rate(object_store_operation_latency_count{{{}}}[10m])) by (le, type, job, instance)",
756 merge_prometheus_selector([
757 &self.prometheus_selector,
758 "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read_read_bytes|streaming_read\""
759 ])
760 );
761 self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
762 .await;
763
764 let _ = writeln!(s);
765 let _ = writeln!(s, "object store operation duration (second)");
766 let query = format!(
767 "histogram_quantile(0.9, sum(rate(object_store_operation_latency_bucket{{{}}}[10m])) by (le, type, job, instance))",
768 merge_prometheus_selector([
769 &self.prometheus_selector,
770 "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read\""
771 ])
772 );
773 self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
774 .await;
775 }
776
777 async fn write_instant_vector_impl(&self, s: &mut String, query: &str, labels: Vec<&str>) {
778 let Some(ref client) = self.prometheus_client else {
779 return;
780 };
781 if let Ok(Vector(instant_vec)) = client
782 .query(query)
783 .get()
784 .await
785 .map(|result| result.into_inner().0)
786 {
787 for i in instant_vec {
788 let l = labels
789 .iter()
790 .map(|label| {
791 format!(
792 "{}={}",
793 *label,
794 i.metric()
795 .get(*label)
796 .map(|s| s.as_str())
797 .unwrap_or_default()
798 )
799 })
800 .join(",");
801 let _ = writeln!(s, "{}: {:.3}", l, i.sample().value());
802 }
803 }
804 }
805
806 async fn write_await_tree(&self, s: &mut String, actor_traces_format: ActorTracesFormat) {
807 let all = dump_cluster_await_tree(
808 &self.metadata_manager,
809 &self.await_tree_reg,
810 actor_traces_format,
811 )
812 .await;
813
814 if let Ok(all) = all {
815 write!(s, "{}", all.output()).unwrap();
816 } else {
817 tracing::warn!("failed to dump await tree");
818 }
819 }
820
    /// Writes full catalog definitions: the streaming-job overview table, one
    /// table per catalog kind (sources, tables, MVs, indexes, sinks, views,
    /// internal tables) with redacted SQL, and finally the actor listing.
    ///
    /// Returns an error if any catalog listing call fails; the caller logs it
    /// and keeps the rest of the report.
    async fn write_table_definition(&self, s: &mut String) -> MetaResult<()> {
        // Each catalog map: id -> (name, database_id, schema_id, definition,
        // created_at_epoch). BTreeMap keeps output ordered by id.
        let sources = self
            .metadata_manager
            .catalog_controller
            .list_sources()
            .await?
            .into_iter()
            .map(|s| {
                (
                    s.id.into(),
                    (
                        s.name,
                        s.database_id,
                        s.schema_id,
                        s.definition,
                        s.created_at_epoch,
                    ),
                )
            })
            .collect::<BTreeMap<_, _>>();
        let mut user_tables = BTreeMap::new();
        let mut mvs = BTreeMap::new();
        let mut indexes = BTreeMap::new();
        let mut internal_tables = BTreeMap::new();
        {
            // Bucket all state tables by their table type.
            let grouped = self
                .metadata_manager
                .catalog_controller
                .list_all_state_tables()
                .await?
                .into_iter()
                .chunk_by(|t| t.table_type());
            for (table_type, tables) in &grouped {
                let tables = tables.into_iter().map(|t| {
                    (
                        t.id.into(),
                        (
                            t.name,
                            t.database_id,
                            t.schema_id,
                            t.definition,
                            t.created_at_epoch,
                        ),
                    )
                });
                match table_type {
                    PbTableType::Table => user_tables.extend(tables),
                    PbTableType::MaterializedView => mvs.extend(tables),
                    PbTableType::Index | PbTableType::VectorIndex => indexes.extend(tables),
                    PbTableType::Internal => internal_tables.extend(tables),
                    PbTableType::Unspecified => {
                        tracing::error!("unspecified table type: {:?}", tables.collect_vec());
                    }
                }
            }
        }
        let sinks = self
            .metadata_manager
            .catalog_controller
            .list_sinks()
            .await?
            .into_iter()
            .map(|s| {
                (
                    s.id.into(),
                    (
                        s.name,
                        s.database_id,
                        s.schema_id,
                        s.definition,
                        s.created_at_epoch,
                    ),
                )
            })
            .collect::<BTreeMap<_, _>>();
        // Views use their SQL as the definition and have no created-at epoch.
        let views = self
            .metadata_manager
            .catalog_controller
            .list_views()
            .await?
            .into_iter()
            .map(|v| {
                (
                    v.id.into(),
                    (v.name, v.database_id, v.schema_id, v.sql, None),
                )
            })
            .collect::<BTreeMap<_, _>>();
        let mut streaming_jobs = self
            .metadata_manager
            .catalog_controller
            .list_streaming_job_infos()
            .await?;
        // Stable listing: group by object type first, then by job id.
        streaming_jobs.sort_by_key(|info| (info.obj_type as usize, info.job_id));
        {
            // STREAMING JOB overview table.
            use comfy_table::{Row, Table};
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("job_id".into());
                row.add_cell("name".into());
                row.add_cell("obj_type".into());
                row.add_cell("state".into());
                row.add_cell("parallelism".into());
                row.add_cell("max_parallelism".into());
                row.add_cell("resource_group".into());
                row.add_cell("database_id".into());
                row.add_cell("schema_id".into());
                row.add_cell("config_override".into());
                row
            });
            for job in streaming_jobs {
                let mut row = Row::new();
                row.add_cell(job.job_id.into());
                row.add_cell(job.name.into());
                row.add_cell(job.obj_type.as_str().into());
                row.add_cell(format_job_status(job.job_status).into());
                row.add_cell(format_streaming_parallelism(&job.parallelism).into());
                row.add_cell(job.max_parallelism.into());
                row.add_cell(job.resource_group.into());
                row.add_cell(job.database_id.into());
                row.add_cell(job.schema_id.into());
                row.add_cell(job.config_override.into());
                table.add_row(row);
            }
            let _ = writeln!(s);
            let _ = writeln!(s, "STREAMING JOB");
            let _ = writeln!(s, "{table}");
        }
        // One section per catalog kind, in this fixed order.
        let catalogs = [
            ("SOURCE", sources),
            ("TABLE", user_tables),
            ("MATERIALIZED VIEW", mvs),
            ("INDEX", indexes),
            ("SINK", sinks),
            ("VIEW", views),
            ("INTERNAL TABLE", internal_tables),
        ];
        // Remember object names so the ACTOR table below can show the owning
        // job's name.
        let mut obj_id_to_name: HashMap<ObjectId, _> = HashMap::new();
        for (title, items) in catalogs {
            use comfy_table::{Row, Table};
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("id".into());
                row.add_cell("name".into());
                row.add_cell("database_id".into());
                row.add_cell("schema_id".into());
                row.add_cell("created_at".into());
                row.add_cell("definition".into());
                row
            });
            for (id, (name, database_id, schema_id, definition, created_at_epoch)) in items {
                obj_id_to_name.insert(id, name.clone());
                let mut row = Row::new();
                // Redact sensitive option values; unparsable SQL is fully redacted.
                let may_redact = redact_sql(&definition, self.redact_sql_option_keywords.clone())
                    .unwrap_or_else(|| "[REDACTED]".into());
                let created_at = if let Some(created_at_epoch) = created_at_epoch {
                    format!("{}", Epoch::from(created_at_epoch).as_timestamptz())
                } else {
                    "".into()
                };
                row.add_cell(id.into());
                row.add_cell(name.into());
                row.add_cell(database_id.into());
                row.add_cell(schema_id.into());
                row.add_cell(created_at.into());
                row.add_cell(may_redact.into());
                table.add_row(row);
            }
            let _ = writeln!(s);
            let _ = writeln!(s, "{title}");
            let _ = writeln!(s, "{table}");
        }

        // ACTOR table: actor -> (fragment, job, schema, type, resolved job name).
        let actors = self
            .metadata_manager
            .catalog_controller
            .list_actor_info()
            .await?
            .into_iter()
            .map(|(actor_id, fragment_id, job_id, schema_id, obj_type)| {
                (
                    actor_id,
                    (
                        fragment_id,
                        job_id,
                        schema_id,
                        obj_type,
                        obj_id_to_name.get(&job_id).cloned().unwrap_or_default(),
                    ),
                )
            })
            .collect::<BTreeMap<_, _>>();

        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("fragment_id".into());
            row.add_cell("job_id".into());
            row.add_cell("schema_id".into());
            row.add_cell("type".into());
            row.add_cell("name".into());
            row
        });
        for (actor_id, (fragment_id, job_id, schema_id, ddl_type, name)) in actors {
            let mut row = Row::new();
            row.add_cell(actor_id.into());
            row.add_cell(fragment_id.into());
            row.add_cell(job_id.into());
            row.add_cell(schema_id.into());
            row.add_cell(ddl_type.as_str().into());
            row.add_cell(name.into());
            table.add_row(row);
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "ACTOR");
        let _ = writeln!(s, "{table}");
        Ok(())
    }
1043
1044 fn write_license(&self, s: &mut String) {
1045 use comfy_table::presets::ASCII_BORDERS_ONLY;
1046 use comfy_table::{ContentArrangement, Row, Table};
1047
1048 let mut table = Table::new();
1049 table.load_preset(ASCII_BORDERS_ONLY);
1050 table.set_content_arrangement(ContentArrangement::Dynamic);
1051 table.set_header({
1052 let mut row = Row::new();
1053 row.add_cell("field".into());
1054 row.add_cell("value".into());
1055 row
1056 });
1057
1058 match LicenseManager::get().license() {
1059 Ok(license) => {
1060 let fmt_option = |value: Option<u64>| match value {
1061 Some(v) => v.to_string(),
1062 None => "unlimited".to_owned(),
1063 };
1064
1065 let expires_at = if license.exp == u64::MAX {
1066 "never".to_owned()
1067 } else {
1068 let exp_i64 = license.exp as i64;
1069 chrono::DateTime::<chrono::Utc>::from_timestamp(exp_i64, 0)
1070 .map(|ts| ts.to_rfc3339())
1071 .unwrap_or_else(|| format!("invalid ({})", license.exp))
1072 };
1073
1074 let mut row = Row::new();
1075 row.add_cell("status".into());
1076 row.add_cell("valid".into());
1077 table.add_row(row);
1078
1079 let mut row = Row::new();
1080 row.add_cell("tier".into());
1081 row.add_cell(license.tier.name().into());
1082 table.add_row(row);
1083
1084 let mut row = Row::new();
1085 row.add_cell("expires_at".into());
1086 row.add_cell(expires_at.into());
1087 table.add_row(row);
1088
1089 let mut row = Row::new();
1090 row.add_cell("rwu_limit".into());
1091 row.add_cell(fmt_option(license.rwu_limit.map(|v| v.get())).into());
1092 table.add_row(row);
1093
1094 let mut row = Row::new();
1095 row.add_cell("cpu_core_limit".into());
1096 row.add_cell(fmt_option(license.cpu_core_limit()).into());
1097 table.add_row(row);
1098
1099 let mut row = Row::new();
1100 row.add_cell("memory_limit_bytes".into());
1101 row.add_cell(fmt_option(license.memory_limit()).into());
1102 table.add_row(row);
1103
1104 let mut features: Vec<_> = license
1105 .tier
1106 .available_features()
1107 .map(|f| f.name())
1108 .collect();
1109 features.sort_unstable();
1110 let feature_summary = format_features(&features);
1111
1112 let mut row = Row::new();
1113 row.add_cell("available_features".into());
1114 row.add_cell(feature_summary.into());
1115 table.add_row(row);
1116 }
1117 Err(error) => {
1118 let mut row = Row::new();
1119 row.add_cell("status".into());
1120 row.add_cell("invalid".into());
1121 table.add_row(row);
1122
1123 let mut row = Row::new();
1124 row.add_cell("error".into());
1125 row.add_cell(error.to_report_string().into());
1126 table.add_row(row);
1127 }
1128 }
1129
1130 let _ = writeln!(s, "LICENSE");
1131 let _ = writeln!(s, "{table}");
1132 }
1133
1134 async fn write_params(&self, s: &mut String) {
1135 let params = self.system_params_controller.get_params().await;
1136
1137 use comfy_table::{Row, Table};
1138 let mut table = Table::new();
1139 table.set_header({
1140 let mut row = Row::new();
1141 row.add_cell("key".into());
1142 row.add_cell("value".into());
1143 row
1144 });
1145
1146 let mut row = Row::new();
1147 row.add_cell("barrier_interval_ms".into());
1148 row.add_cell(params.barrier_interval_ms().to_string().into());
1149 table.add_row(row);
1150
1151 let mut row = Row::new();
1152 row.add_cell("checkpoint_frequency".into());
1153 row.add_cell(params.checkpoint_frequency().to_string().into());
1154 table.add_row(row);
1155
1156 let mut row = Row::new();
1157 row.add_cell("state_store".into());
1158 row.add_cell(params.state_store().to_owned().into());
1159 table.add_row(row);
1160
1161 let mut row = Row::new();
1162 row.add_cell("data_directory".into());
1163 row.add_cell(params.data_directory().to_owned().into());
1164 table.add_row(row);
1165
1166 let mut row = Row::new();
1167 row.add_cell("max_concurrent_creating_streaming_jobs".into());
1168 row.add_cell(
1169 params
1170 .max_concurrent_creating_streaming_jobs()
1171 .to_string()
1172 .into(),
1173 );
1174 table.add_row(row);
1175
1176 let mut row = Row::new();
1177 row.add_cell("time_travel_retention_ms".into());
1178 row.add_cell(params.time_travel_retention_ms().to_string().into());
1179 table.add_row(row);
1180
1181 let mut row = Row::new();
1182 row.add_cell("adaptive_parallelism_strategy".into());
1183 row.add_cell(params.adaptive_parallelism_strategy().to_string().into());
1184 table.add_row(row);
1185
1186 let mut row = Row::new();
1187 row.add_cell("per_database_isolation".into());
1188 row.add_cell(params.per_database_isolation().to_string().into());
1189 table.add_row(row);
1190
1191 let _ = writeln!(s, "SYSTEM PARAMS");
1192 let _ = writeln!(s, "{table}");
1193 }
1194}
1195
1196fn try_add_cell<T: Into<comfy_table::Cell>>(row: &mut comfy_table::Row, t: Option<T>) {
1197 match t {
1198 Some(t) => {
1199 row.add_cell(t.into());
1200 }
1201 None => {
1202 row.add_cell("".into());
1203 }
1204 }
1205}
1206
/// Joins non-empty Prometheus label selectors with commas, so an empty base
/// selector does not produce a dangling separator.
fn merge_prometheus_selector<'a>(selectors: impl IntoIterator<Item = &'a str>) -> String {
    let parts: Vec<&str> = selectors.into_iter().filter(|s| !s.is_empty()).collect();
    parts.join(",")
}
1210
1211fn redact_sql(sql: &str, keywords: RedactSqlOptionKeywordsRef) -> Option<String> {
1212 match Parser::parse_sql(sql) {
1213 Ok(sqls) => Some(
1214 sqls.into_iter()
1215 .map(|sql| sql.to_redacted_string(keywords.clone()))
1216 .join(";"),
1217 ),
1218 Err(_) => None,
1219 }
1220}
1221
/// Formats a feature list for table display: "(none)" when empty, otherwise up
/// to six comma-separated names per line, each line indented by one space.
fn format_features(features: &[&'static str]) -> String {
    const PER_LINE: usize = 6;

    if features.is_empty() {
        "(none)".into()
    } else {
        let lines: Vec<String> = features
            .chunks(PER_LINE)
            .map(|group| format!(" {}", group.join(", ")))
            .collect();
        lines.join("\n")
    }
}
1234
/// Renders a [`JobStatus`] as the lowercase string shown in report tables.
fn format_job_status(status: JobStatus) -> &'static str {
    match status {
        JobStatus::Initial => "initial",
        JobStatus::Creating => "creating",
        JobStatus::Created => "created",
    }
}
1242
/// Renders a [`StreamingParallelism`] for report tables: "adaptive", "custom",
/// or "fixed(n)" with the configured degree.
fn format_streaming_parallelism(parallelism: &StreamingParallelism) -> String {
    match parallelism {
        StreamingParallelism::Adaptive => "adaptive".into(),
        StreamingParallelism::Fixed(n) => format!("fixed({n})"),
        StreamingParallelism::Custom => "custom".into(),
    }
}