risingwave_meta/manager/diagnose.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::{Ordering, Reverse};
use std::collections::{BTreeMap, BinaryHeap, HashMap};
use std::fmt::Write;
use std::sync::Arc;

use itertools::Itertools;
use prometheus_http_query::response::Data::Vector;
use risingwave_common::id::ObjectId;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::StackTraceResponseExt;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::HummockSstableId;
use risingwave_hummock_sdk::level::Level;
use risingwave_license::LicenseManager;
use risingwave_meta_model::{JobStatus, StreamingParallelism};
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::EventLog;
use risingwave_pb::meta::event_log::Event;
use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
use risingwave_sqlparser::ast::RedactSqlOptionKeywordsRef;
use risingwave_sqlparser::parser::Parser;
use serde_json::json;
use thiserror_ext::AsReport;

use crate::MetaResult;
use crate::controller::system_param::SystemParamsControllerRef;
use crate::hummock::HummockManagerRef;
use crate::manager::MetadataManager;
use crate::manager::event_log::EventLogManagerRef;
use crate::rpc::await_tree::dump_cluster_await_tree;

pub type DiagnoseCommandRef = Arc<DiagnoseCommand>;

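/// Aggregates cluster-wide diagnostic information into a single plain-text
/// report: license, catalog, worker nodes, streaming and storage metrics from
/// Prometheus, event logs, system params, and await trees.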
pub struct DiagnoseCommand {
    metadata_manager: MetadataManager,
    await_tree_reg: await_tree::Registry,
    hummock_manager: HummockManagerRef,
    event_log_manager: EventLogManagerRef,
    prometheus_client: Option<prometheus_http_query::Client>,
    prometheus_selector: String,
    redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
    system_params_controller: SystemParamsControllerRef,
}

impl DiagnoseCommand {
    pub fn new(
        metadata_manager: MetadataManager,
        await_tree_reg: await_tree::Registry,
        hummock_manager: HummockManagerRef,
        event_log_manager: EventLogManagerRef,
        prometheus_client: Option<prometheus_http_query::Client>,
        prometheus_selector: String,
        redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
        system_params_controller: SystemParamsControllerRef,
    ) -> Self {
        Self {
            metadata_manager,
            await_tree_reg,
            hummock_manager,
            event_log_manager,
            prometheus_client,
            prometheus_selector,
            redact_sql_option_keywords,
            system_params_controller,
        }
    }

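    /// Builds the full diagnose report. Each section writes into a shared
    /// string buffer; sections that fail to load log a warning and are
    /// skipped, so one failure does not abort the whole report.
    ///
    /// A minimal usage sketch (construction of the command is assumed):
    /// ```ignore
    /// let report = diagnose_command.report(actor_traces_format).await;
    /// println!("{report}");
    /// ```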
    pub async fn report(&self, actor_traces_format: ActorTracesFormat) -> String {
        let mut report = String::new();
        let _ = writeln!(
            report,
            "report created at: {}\nversion: {}",
            chrono::DateTime::<chrono::offset::Utc>::from(std::time::SystemTime::now()),
            risingwave_common::current_cluster_version(),
        );
        let _ = writeln!(report);
        self.write_license(&mut report);
        let _ = writeln!(report);
        self.write_catalog(&mut report).await;
        let _ = writeln!(report);
        self.write_worker_nodes(&mut report).await;
        let _ = writeln!(report);
        self.write_streaming_prometheus(&mut report).await;
        let _ = writeln!(report);
        self.write_storage(&mut report).await;
        let _ = writeln!(report);
        self.write_event_logs(&mut report);
        let _ = writeln!(report);
        self.write_params(&mut report).await;
        let _ = writeln!(report);
        self.write_await_tree(&mut report, actor_traces_format)
            .await;

        report
    }

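    /// Writes catalog statistics and database/schema listings, followed by
    /// redacted table definitions when they can be fetched.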
    async fn write_catalog(&self, s: &mut String) {
        self.write_catalog_inner(s).await;
        let _ = self.write_table_definition(s).await.inspect_err(|e| {
            tracing::warn!(
                error = e.to_report_string(),
                "failed to display table definition"
            )
        });
    }

    async fn write_catalog_inner(&self, s: &mut String) {
        let stats = self.metadata_manager.catalog_controller.stats().await;

        let stat = match stats {
            Ok(stat) => stat,
            Err(err) => {
                tracing::warn!(error=?err.as_report(), "failed to get catalog stats");
                return;
            }
        };
        let _ = writeln!(s, "number of databases: {}", stat.database_num);
        let _ = writeln!(s, "number of streaming jobs: {}", stat.streaming_job_num);
        let _ = writeln!(s, "number of actors: {}", stat.actor_num);
        let _ = writeln!(s, "number of sources: {}", stat.source_num);
        let _ = writeln!(s, "number of tables: {}", stat.table_num);
        let _ = writeln!(s, "number of materialized views: {}", stat.mview_num);
        let _ = writeln!(s, "number of sinks: {}", stat.sink_num);
        let _ = writeln!(s, "number of indexes: {}", stat.index_num);
        let _ = writeln!(s, "number of functions: {}", stat.function_num);

        self.write_databases(s).await;
        self.write_schemas(s).await;
    }

    async fn write_databases(&self, s: &mut String) {
        let databases = match self
            .metadata_manager
            .catalog_controller
            .list_databases()
            .await
        {
            Ok(databases) => databases,
            Err(err) => {
                tracing::warn!(error=?err.as_report(), "failed to list databases");
                return;
            }
        };

        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("name".into());
            row.add_cell("resource_group".into());
            row.add_cell("barrier_interval_ms".into());
            row.add_cell("checkpoint_frequency".into());
            row
        });
        for db in databases {
            let mut row = Row::new();
            row.add_cell(db.id.into());
            row.add_cell(db.name.into());
            row.add_cell(db.resource_group.into());
            row.add_cell(
                db.barrier_interval_ms
                    .map(|v| v.to_string())
                    .unwrap_or("default".into())
                    .into(),
            );
            row.add_cell(
                db.checkpoint_frequency
                    .map(|v| v.to_string())
                    .unwrap_or("default".into())
                    .into(),
            );
            table.add_row(row);
        }

        let _ = writeln!(s, "DATABASE");
        let _ = writeln!(s, "{table}");
    }

    async fn write_schemas(&self, s: &mut String) {
        let schemas = match self
            .metadata_manager
            .catalog_controller
            .list_schemas()
            .await
        {
            Ok(schemas) => schemas,
            Err(err) => {
                tracing::warn!(error=?err.as_report(), "failed to list schemas");
                return;
            }
        };

        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("database_id".into());
            row.add_cell("name".into());
            row
        });
        for schema in schemas {
            let mut row = Row::new();
            row.add_cell(schema.id.into());
            row.add_cell(schema.database_id.into());
            row.add_cell(schema.name.into());
            table.add_row(row);
        }

        let _ = writeln!(s, "SCHEMA");
        let _ = writeln!(s, "{table}");
    }

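    /// Writes one row per worker node. Columns that are only meaningful for a
    /// particular worker type (e.g. `is_streaming` for compute nodes, or
    /// `is_iceberg_compactor` for compactors) are left empty otherwise.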
    async fn write_worker_nodes(&self, s: &mut String) {
        let Ok(worker_actor_count) = self.metadata_manager.worker_actor_count() else {
            tracing::warn!("failed to get worker actor count");
            return;
        };

        use comfy_table::{Row, Table};
        let Ok(worker_nodes) = self.metadata_manager.list_worker_node(None, None).await else {
            tracing::warn!("failed to get worker nodes");
            return;
        };
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("host".into());
            row.add_cell("type".into());
            row.add_cell("state".into());
            row.add_cell("parallelism".into());
            row.add_cell("resource_group".into());
            row.add_cell("is_streaming".into());
            row.add_cell("is_serving".into());
            row.add_cell("is_iceberg_compactor".into());
            row.add_cell("rw_version".into());
            row.add_cell("total_memory_bytes".into());
            row.add_cell("total_cpu_cores".into());
            row.add_cell("started_at".into());
            row.add_cell("actor_count".into());
            row
        });
        for worker_node in worker_nodes {
            let mut row = Row::new();
            row.add_cell(worker_node.id.into());
            try_add_cell(
                &mut row,
                worker_node
                    .host
                    .as_ref()
                    .map(|h| format!("{}:{}", h.host, h.port)),
            );
            try_add_cell(
                &mut row,
                worker_node.get_type().ok().map(|t| t.as_str_name()),
            );
            try_add_cell(
                &mut row,
                worker_node.get_state().ok().map(|s| s.as_str_name()),
            );
            try_add_cell(&mut row, worker_node.parallelism());
            try_add_cell(
                &mut row,
                worker_node
                    .property
                    .as_ref()
                    .map(|p| p.resource_group.clone().unwrap_or("".to_owned())),
            );
            // is_streaming and is_serving are only meaningful for ComputeNode
            let (is_streaming, is_serving) = {
                if let Ok(t) = worker_node.get_type()
                    && t == WorkerType::ComputeNode
                {
                    (
                        worker_node.property.as_ref().map(|p| p.is_streaming),
                        worker_node.property.as_ref().map(|p| p.is_serving),
                    )
                } else {
                    (None, None)
                }
            };
            try_add_cell(&mut row, is_streaming);
            try_add_cell(&mut row, is_serving);
            // is_iceberg_compactor is only meaningful for Compactor worker type
            let is_iceberg_compactor = {
                if let Ok(t) = worker_node.get_type()
                    && t == WorkerType::Compactor
                {
                    worker_node
                        .property
                        .as_ref()
                        .map(|p| p.is_iceberg_compactor)
                } else {
                    None
                }
            };
            try_add_cell(&mut row, is_iceberg_compactor);
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.rw_version.clone()),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.total_memory_bytes),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.total_cpu_cores),
            );
            try_add_cell(
                &mut row,
                worker_node
                    .started_at
                    .and_then(|ts| Timestamptz::from_secs(ts as _).map(|t| t.to_string())),
            );
            let actor_count = {
                if let Ok(t) = worker_node.get_type()
                    && t != WorkerType::ComputeNode
                {
                    None
                } else {
                    match worker_actor_count.get(&worker_node.id) {
                        None => Some(0),
                        Some(c) => Some(*c),
                    }
                }
            };
            try_add_cell(&mut row, actor_count);
            table.add_row(row);
        }
        let _ = writeln!(s, "{table}");
    }

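    /// Writes the most recent event logs, newest first, grouped into sections
    /// by event kind with a per-section row limit.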
    fn write_event_logs(&self, s: &mut String) {
        let event_logs = self
            .event_log_manager
            .list_event_logs()
            .into_iter()
            .sorted_by(|a, b| {
                a.timestamp
                    .unwrap_or(0)
                    .cmp(&b.timestamp.unwrap_or(0))
                    .reverse()
            })
            .collect_vec();

        let _ = writeln!(s, "latest barrier completions");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::BarrierComplete(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(10),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest barrier collection failures");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::CollectBarrierFail(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest barrier injection failures");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::InjectBarrierFail(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest worker node panics");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::WorkerNodePanic(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(10),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest create stream job failures");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::CreateStreamJobFail(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest dirty stream job clear-ups");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::DirtyStreamJobClear(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );
    }

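    /// Renders event logs into a `created_at`/`info` table. `get_event_info`
    /// filter-maps an event to its display string; events it rejects do not
    /// count towards `limit`.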
    fn write_event_logs_impl<'a, F>(
        s: &mut String,
        event_logs: impl Iterator<Item = &'a EventLog>,
        get_event_info: F,
        limit: Option<usize>,
    ) where
        F: Fn(&Event) -> Option<String>,
    {
        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("created_at".into());
            row.add_cell("info".into());
            row
        });
        let mut row_count = 0;
        for event_log in event_logs {
            let Some(ref inner) = event_log.event else {
                continue;
            };
            if let Some(limit) = limit
                && row_count >= limit
            {
                break;
            }
            let mut row = Row::new();
            let ts = event_log
                .timestamp
                .and_then(|ts| Timestamptz::from_millis(ts as _).map(|ts| ts.to_string()));
            try_add_cell(&mut row, ts);
            if let Some(info) = get_event_info(inner) {
                row.add_cell(info.into());
                row_count += 1;
            } else {
                continue;
            }
            table.add_row(row);
        }
        let _ = writeln!(s, "{table}");
    }

    async fn write_storage(&self, s: &mut String) {
        let mut sst_num = 0;
        let mut sst_total_file_size = 0;
        let back_pressured_compaction_groups = self
            .hummock_manager
            .write_limits()
            .await
            .into_iter()
            .filter_map(|(k, v)| {
                if v.table_ids.is_empty() {
                    None
                } else {
                    Some(k)
                }
            })
            .join(",");
        if !back_pressured_compaction_groups.is_empty() {
            let _ = writeln!(
                s,
                "back pressured compaction groups: {back_pressured_compaction_groups}"
            );
        }

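        // Track the top-k SSTables by tombstone ratio. The ratio is kept in
        // basis points (stale_key_count * 10000 / total_key_count) so that
        // integer math preserves two decimal places when it is rendered as a
        // percentage below.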
        #[derive(PartialEq, Eq)]
        struct SstableSort {
            compaction_group_id: u64,
            sst_id: HummockSstableId,
            delete_ratio: u64,
        }
        impl PartialOrd for SstableSort {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }
        impl Ord for SstableSort {
            fn cmp(&self, other: &Self) -> Ordering {
                self.delete_ratio.cmp(&other.delete_ratio)
            }
        }
        fn top_k_sstables(
            top_k: usize,
            heap: &mut BinaryHeap<Reverse<SstableSort>>,
            e: SstableSort,
        ) {
            if heap.len() < top_k {
                heap.push(Reverse(e));
            } else if let Some(mut p) = heap.peek_mut()
                && e.delete_ratio > p.0.delete_ratio
            {
                *p = Reverse(e);
            }
        }

        let top_k = 10;
        let mut top_tombstone_delete_sst = BinaryHeap::with_capacity(top_k);
        let compaction_group_num = self
            .hummock_manager
            .on_current_version(|version| {
                for compaction_group in version.levels.values() {
                    let mut visit_level = |level: &Level| {
                        sst_num += level.table_infos.len();
                        sst_total_file_size +=
                            level.table_infos.iter().map(|t| t.sst_size).sum::<u64>();
                        for sst in &level.table_infos {
                            if sst.total_key_count == 0 {
                                continue;
                            }
                            let tombstone_delete_ratio =
                                sst.stale_key_count * 10000 / sst.total_key_count;
                            let e = SstableSort {
                                compaction_group_id: compaction_group.group_id,
                                sst_id: sst.sst_id,
                                delete_ratio: tombstone_delete_ratio,
                            };
                            top_k_sstables(top_k, &mut top_tombstone_delete_sst, e);
                        }
                    };
                    let l0 = &compaction_group.l0;
                    // FIXME: why does chaining the level iterators lead to a segmentation fault?
                    for level in &l0.sub_levels {
                        visit_level(level);
                    }
                    for level in &compaction_group.levels {
                        visit_level(level);
                    }
                }
                version.levels.len()
            })
            .await;

        let _ = writeln!(s, "number of SSTables: {sst_num}");
        let _ = writeln!(s, "total size of SSTables (byte): {sst_total_file_size}");
        let _ = writeln!(s, "number of compaction groups: {compaction_group_num}");
        use comfy_table::{Row, Table};
        fn format_table(heap: BinaryHeap<Reverse<SstableSort>>) -> Table {
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("compaction group id".into());
                row.add_cell("sstable id".into());
                row.add_cell("delete ratio".into());
                row
            });
            for sst in &heap.into_sorted_vec() {
                let mut row = Row::new();
                row.add_cell(sst.0.compaction_group_id.into());
                row.add_cell(sst.0.sst_id.into());
                row.add_cell(format!("{:.2}%", sst.0.delete_ratio as f64 / 100.0).into());
                table.add_row(row);
            }
            table
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "top tombstone delete ratio");
        let _ = writeln!(s, "{}", format_table(top_tombstone_delete_sst));
        let _ = writeln!(s);

        let _ = writeln!(s);
        self.write_storage_prometheus(s).await;
    }

    async fn write_streaming_prometheus(&self, s: &mut String) {
        let _ = writeln!(s, "top sources by throughput (rows/s)");
        let query = format!(
            "topk(3, sum(rate(stream_source_output_rows_counts{{{}}}[10m])) by (source_name))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["source_name"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top materialized views by throughput (rows/s)");
        let query = format!(
            "topk(3, sum(rate(stream_mview_input_row_count{{{}}}[10m])) by (table_id))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top join executors by matched rows");
        let query = format!(
            "topk(3, histogram_quantile(0.9, sum(rate(stream_join_matched_join_keys_bucket{{{}}}[10m])) by (le, fragment_id, table_id)))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id", "fragment_id"])
            .await;
    }

    async fn write_storage_prometheus(&self, s: &mut String) {
        let _ = writeln!(s, "top Hummock Get by duration (second)");
        let query = format!(
            "topk(3, histogram_quantile(0.9, sum(rate(state_store_get_duration_bucket{{{}}}[10m])) by (le, table_id)))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top Hummock Iter Init by duration (second)");
        let query = format!(
            "topk(3, histogram_quantile(0.9, sum(rate(state_store_iter_init_duration_bucket{{{}}}[10m])) by (le, table_id)))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top table commit flush by size (byte)");
        let query = format!(
            "topk(3, sum(rate(storage_commit_write_throughput{{{}}}[10m])) by (table_id))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store read throughput (bytes/s)");
        let query = format!(
            "sum(rate(object_store_read_bytes{{{}}}[10m])) by (job, instance)",
            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
        );
        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store write throughput (bytes/s)");
        let query = format!(
            "sum(rate(object_store_write_bytes{{{}}}[10m])) by (job, instance)",
            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
        );
        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store operation rate");
        let query = format!(
            "sum(rate(object_store_operation_latency_count{{{}}}[10m])) by (le, type, job, instance)",
            merge_prometheus_selector([
                &self.prometheus_selector,
                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read_read_bytes|streaming_read\""
            ])
        );
        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store operation duration (second)");
        let query = format!(
            "histogram_quantile(0.9, sum(rate(object_store_operation_latency_bucket{{{}}}[10m])) by (le, type, job, instance))",
            merge_prometheus_selector([
                &self.prometheus_selector,
                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read\""
            ])
        );
        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
            .await;
    }

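    /// Runs an instant PromQL query and writes one `label=value,...: sample`
    /// line per result. A no-op when no Prometheus client is configured or the
    /// query fails.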
    async fn write_instant_vector_impl(&self, s: &mut String, query: &str, labels: Vec<&str>) {
        let Some(ref client) = self.prometheus_client else {
            return;
        };
        if let Ok(Vector(instant_vec)) = client
            .query(query)
            .get()
            .await
            .map(|result| result.into_inner().0)
        {
            for i in instant_vec {
                let l = labels
                    .iter()
                    .map(|label| {
                        format!(
                            "{}={}",
                            *label,
                            i.metric()
                                .get(*label)
                                .map(|s| s.as_str())
                                .unwrap_or_default()
                        )
                    })
                    .join(",");
                let _ = writeln!(s, "{}: {:.3}", l, i.sample().value());
            }
        }
    }

    async fn write_await_tree(&self, s: &mut String, actor_traces_format: ActorTracesFormat) {
        let all = dump_cluster_await_tree(
            &self.metadata_manager,
            &self.await_tree_reg,
            actor_traces_format,
        )
        .await;

        if let Ok(all) = all {
            // Writing to a `String` cannot fail, so the result is discarded for
            // consistency with the rest of this file.
            let _ = write!(s, "{}", all.output());
        } else {
            tracing::warn!("failed to dump await tree");
        }
    }

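    /// Writes the streaming job overview, one definition table per catalog
    /// kind (with SQL redacted), and the actor listing.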
    async fn write_table_definition(&self, s: &mut String) -> MetaResult<()> {
        let sources = self
            .metadata_manager
            .catalog_controller
            .list_sources()
            .await?
            .into_iter()
            .map(|s| {
                (
                    s.id.into(),
                    (
                        s.name,
                        s.database_id,
                        s.schema_id,
                        s.definition,
                        s.created_at_epoch,
                    ),
                )
            })
            .collect::<BTreeMap<_, _>>();
        let mut user_tables = BTreeMap::new();
        let mut mvs = BTreeMap::new();
        let mut indexes = BTreeMap::new();
        let mut internal_tables = BTreeMap::new();
        {
            let grouped = self
                .metadata_manager
                .catalog_controller
                .list_all_state_tables()
                .await?
                .into_iter()
                .chunk_by(|t| t.table_type());
            for (table_type, tables) in &grouped {
                let tables = tables.into_iter().map(|t| {
                    (
                        t.id.into(),
                        (
                            t.name,
                            t.database_id,
                            t.schema_id,
                            t.definition,
                            t.created_at_epoch,
                        ),
                    )
                });
                match table_type {
                    PbTableType::Table => user_tables.extend(tables),
                    PbTableType::MaterializedView => mvs.extend(tables),
                    PbTableType::Index | PbTableType::VectorIndex => indexes.extend(tables),
                    PbTableType::Internal => internal_tables.extend(tables),
                    PbTableType::Unspecified => {
                        tracing::error!("unspecified table type: {:?}", tables.collect_vec());
                    }
                }
            }
        }
        let sinks = self
            .metadata_manager
            .catalog_controller
            .list_sinks()
            .await?
            .into_iter()
            .map(|s| {
                (
                    s.id.into(),
                    (
                        s.name,
                        s.database_id,
                        s.schema_id,
                        s.definition,
                        s.created_at_epoch,
                    ),
                )
            })
            .collect::<BTreeMap<_, _>>();
        let views = self
            .metadata_manager
            .catalog_controller
            .list_views()
            .await?
            .into_iter()
            .map(|v| {
                (
                    v.id.into(),
                    (v.name, v.database_id, v.schema_id, v.sql, None),
                )
            })
            .collect::<BTreeMap<_, _>>();
        let mut streaming_jobs = self
            .metadata_manager
            .catalog_controller
            .list_streaming_job_infos()
            .await?;
        streaming_jobs.sort_by_key(|info| (info.obj_type as usize, info.job_id));
        {
            use comfy_table::{Row, Table};
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("job_id".into());
                row.add_cell("name".into());
                row.add_cell("obj_type".into());
                row.add_cell("state".into());
                row.add_cell("parallelism".into());
                row.add_cell("max_parallelism".into());
                row.add_cell("resource_group".into());
                row.add_cell("database_id".into());
                row.add_cell("schema_id".into());
                row.add_cell("config_override".into());
                row
            });
            for job in streaming_jobs {
                let mut row = Row::new();
                row.add_cell(job.job_id.into());
                row.add_cell(job.name.into());
                row.add_cell(job.obj_type.as_str().into());
                row.add_cell(format_job_status(job.job_status).into());
                row.add_cell(format_streaming_parallelism(&job.parallelism).into());
                row.add_cell(job.max_parallelism.into());
                row.add_cell(job.resource_group.into());
                row.add_cell(job.database_id.into());
                row.add_cell(job.schema_id.into());
                row.add_cell(job.config_override.into());
                table.add_row(row);
            }
            let _ = writeln!(s);
            let _ = writeln!(s, "STREAMING JOB");
            let _ = writeln!(s, "{table}");
        }
        let catalogs = [
            ("SOURCE", sources),
            ("TABLE", user_tables),
            ("MATERIALIZED VIEW", mvs),
            ("INDEX", indexes),
            ("SINK", sinks),
            ("VIEW", views),
            ("INTERNAL TABLE", internal_tables),
        ];
        let mut obj_id_to_name: HashMap<ObjectId, _> = HashMap::new();
        for (title, items) in catalogs {
            use comfy_table::{Row, Table};
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("id".into());
                row.add_cell("name".into());
                row.add_cell("database_id".into());
                row.add_cell("schema_id".into());
                row.add_cell("created_at".into());
                row.add_cell("definition".into());
                row
            });
            for (id, (name, database_id, schema_id, definition, created_at_epoch)) in items {
                obj_id_to_name.insert(id, name.clone());
                let mut row = Row::new();
                let may_redact = redact_sql(&definition, self.redact_sql_option_keywords.clone())
                    .unwrap_or_else(|| "[REDACTED]".into());
                let created_at = if let Some(created_at_epoch) = created_at_epoch {
                    format!("{}", Epoch::from(created_at_epoch).as_timestamptz())
                } else {
                    "".into()
                };
                row.add_cell(id.into());
                row.add_cell(name.into());
                row.add_cell(database_id.into());
                row.add_cell(schema_id.into());
                row.add_cell(created_at.into());
                row.add_cell(may_redact.into());
                table.add_row(row);
            }
            let _ = writeln!(s);
            let _ = writeln!(s, "{title}");
            let _ = writeln!(s, "{table}");
        }

        let actors = self
            .metadata_manager
            .catalog_controller
            .list_actor_info()
            .await?
            .into_iter()
            .map(|(actor_id, fragment_id, job_id, schema_id, obj_type)| {
                (
                    actor_id,
                    (
                        fragment_id,
                        job_id,
                        schema_id,
                        obj_type,
                        obj_id_to_name.get(&job_id).cloned().unwrap_or_default(),
                    ),
                )
            })
            .collect::<BTreeMap<_, _>>();

        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("fragment_id".into());
            row.add_cell("job_id".into());
            row.add_cell("schema_id".into());
            row.add_cell("type".into());
            row.add_cell("name".into());
            row
        });
        for (actor_id, (fragment_id, job_id, schema_id, ddl_type, name)) in actors {
            let mut row = Row::new();
            row.add_cell(actor_id.into());
            row.add_cell(fragment_id.into());
            row.add_cell(job_id.into());
            row.add_cell(schema_id.into());
            row.add_cell(ddl_type.as_str().into());
            row.add_cell(name.into());
            table.add_row(row);
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "ACTOR");
        let _ = writeln!(s, "{table}");
        Ok(())
    }

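    /// Writes the license as a field/value table: validity, tier, expiry,
    /// resource limits, and the sorted list of available features.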
    fn write_license(&self, s: &mut String) {
        use comfy_table::presets::ASCII_BORDERS_ONLY;
        use comfy_table::{ContentArrangement, Row, Table};

        let mut table = Table::new();
        table.load_preset(ASCII_BORDERS_ONLY);
        table.set_content_arrangement(ContentArrangement::Dynamic);
        table.set_header({
            let mut row = Row::new();
            row.add_cell("field".into());
            row.add_cell("value".into());
            row
        });

        match LicenseManager::get().license() {
            Ok(license) => {
                let fmt_option = |value: Option<u64>| match value {
                    Some(v) => v.to_string(),
                    None => "unlimited".to_owned(),
                };

                let expires_at = if license.exp == u64::MAX {
                    "never".to_owned()
                } else {
                    let exp_i64 = license.exp as i64;
                    chrono::DateTime::<chrono::Utc>::from_timestamp(exp_i64, 0)
                        .map(|ts| ts.to_rfc3339())
                        .unwrap_or_else(|| format!("invalid ({})", license.exp))
                };

                let mut row = Row::new();
                row.add_cell("status".into());
                row.add_cell("valid".into());
                table.add_row(row);

                let mut row = Row::new();
                row.add_cell("tier".into());
                row.add_cell(license.tier.name().into());
                table.add_row(row);

                let mut row = Row::new();
                row.add_cell("expires_at".into());
                row.add_cell(expires_at.into());
                table.add_row(row);

                let mut row = Row::new();
                row.add_cell("rwu_limit".into());
                row.add_cell(fmt_option(license.rwu_limit.map(|v| v.get())).into());
                table.add_row(row);

                let mut row = Row::new();
                row.add_cell("cpu_core_limit".into());
                row.add_cell(fmt_option(license.cpu_core_limit()).into());
                table.add_row(row);

                let mut row = Row::new();
                row.add_cell("memory_limit_bytes".into());
                row.add_cell(fmt_option(license.memory_limit()).into());
                table.add_row(row);

                let mut features: Vec<_> = license
                    .tier
                    .available_features()
                    .map(|f| f.name())
                    .collect();
                features.sort_unstable();
                let feature_summary = format_features(&features);

                let mut row = Row::new();
                row.add_cell("available_features".into());
                row.add_cell(feature_summary.into());
                table.add_row(row);
            }
            Err(error) => {
                let mut row = Row::new();
                row.add_cell("status".into());
                row.add_cell("invalid".into());
                table.add_row(row);

                let mut row = Row::new();
                row.add_cell("error".into());
                row.add_cell(error.to_report_string().into());
                table.add_row(row);
            }
        }

        let _ = writeln!(s, "LICENSE");
        let _ = writeln!(s, "{table}");
    }

    async fn write_params(&self, s: &mut String) {
        let params = self.system_params_controller.get_params().await;

        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("key".into());
            row.add_cell("value".into());
            row
        });

        let mut row = Row::new();
        row.add_cell("barrier_interval_ms".into());
        row.add_cell(params.barrier_interval_ms().to_string().into());
        table.add_row(row);

        let mut row = Row::new();
        row.add_cell("checkpoint_frequency".into());
        row.add_cell(params.checkpoint_frequency().to_string().into());
        table.add_row(row);

        let mut row = Row::new();
        row.add_cell("state_store".into());
        row.add_cell(params.state_store().to_owned().into());
        table.add_row(row);

        let mut row = Row::new();
        row.add_cell("data_directory".into());
        row.add_cell(params.data_directory().to_owned().into());
        table.add_row(row);

        let mut row = Row::new();
        row.add_cell("max_concurrent_creating_streaming_jobs".into());
        row.add_cell(
            params
                .max_concurrent_creating_streaming_jobs()
                .to_string()
                .into(),
        );
        table.add_row(row);

        let mut row = Row::new();
        row.add_cell("time_travel_retention_ms".into());
        row.add_cell(params.time_travel_retention_ms().to_string().into());
        table.add_row(row);

        let mut row = Row::new();
        row.add_cell("adaptive_parallelism_strategy".into());
        row.add_cell(params.adaptive_parallelism_strategy().to_string().into());
        table.add_row(row);

        let mut row = Row::new();
        row.add_cell("per_database_isolation".into());
        row.add_cell(params.per_database_isolation().to_string().into());
        table.add_row(row);

        let _ = writeln!(s, "SYSTEM PARAMS");
        let _ = writeln!(s, "{table}");
    }
}

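/// Adds `t`'s value to the row if present, or an empty cell otherwise, so that
/// columns stay aligned.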
fn try_add_cell<T: Into<comfy_table::Cell>>(row: &mut comfy_table::Row, t: Option<T>) {
    match t {
        Some(t) => {
            row.add_cell(t.into());
        }
        None => {
            row.add_cell("".into());
        }
    }
}

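/// Joins the non-empty selectors with commas so they can be embedded together
/// in a single PromQL label-matcher list.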
fn merge_prometheus_selector<'a>(selectors: impl IntoIterator<Item = &'a str>) -> String {
    selectors.into_iter().filter(|s| !s.is_empty()).join(",")
}

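/// Redacts option values for the given keywords in `sql`. Returns `None` when
/// the SQL fails to parse; the caller then falls back to a fully redacted
/// placeholder.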
fn redact_sql(sql: &str, keywords: RedactSqlOptionKeywordsRef) -> Option<String> {
    match Parser::parse_sql(sql) {
        Ok(sqls) => Some(
            sqls.into_iter()
                .map(|sql| sql.to_redacted_string(keywords.clone()))
                .join(";"),
        ),
        Err(_) => None,
    }
}

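/// Formats feature names as indented lines of up to `PER_LINE` entries, or
/// `"(none)"` when the list is empty.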
fn format_features(features: &[&'static str]) -> String {
    if features.is_empty() {
        return "(none)".into();
    }

    const PER_LINE: usize = 6;
    features
        .chunks(PER_LINE)
        .map(|chunk| format!("  {}", chunk.join(", ")))
        .collect::<Vec<_>>()
        .join("\n")
}

fn format_job_status(status: JobStatus) -> &'static str {
    match status {
        JobStatus::Initial => "initial",
        JobStatus::Creating => "creating",
        JobStatus::Created => "created",
    }
}

fn format_streaming_parallelism(parallelism: &StreamingParallelism) -> String {
    match parallelism {
        StreamingParallelism::Adaptive => "adaptive".into(),
        StreamingParallelism::Fixed(n) => format!("fixed({n})"),
        StreamingParallelism::Custom => "custom".into(),
    }
}