risingwave_meta/manager/
diagnose.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::{Ordering, Reverse};
use std::collections::{BTreeMap, BinaryHeap, HashMap};
use std::fmt::Write;
use std::sync::Arc;

use itertools::Itertools;
use prometheus_http_query::response::Data::Vector;
use risingwave_common::id::ObjectId;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::StackTraceResponseExt;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::HummockSstableId;
use risingwave_hummock_sdk::level::Level;
use risingwave_license::LicenseManager;
use risingwave_meta_model::{JobStatus, StreamingParallelism};
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::EventLog;
use risingwave_pb::meta::event_log::Event;
use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
use risingwave_sqlparser::ast::RedactSqlOptionKeywordsRef;
use risingwave_sqlparser::parser::Parser;
use serde_json::json;
use thiserror_ext::AsReport;

use crate::MetaResult;
use crate::controller::system_param::SystemParamsControllerRef;
use crate::hummock::HummockManagerRef;
use crate::manager::MetadataManager;
use crate::manager::event_log::EventLogManagerRef;
use crate::rpc::await_tree::dump_cluster_await_tree;

pub type DiagnoseCommandRef = Arc<DiagnoseCommand>;

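/// Aggregates cluster diagnostic information (license, catalog, worker nodes,
/// Prometheus metrics, storage, event logs, system params, and await trees)
/// into a single human-readable report for troubleshooting.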
pub struct DiagnoseCommand {
    metadata_manager: MetadataManager,
    await_tree_reg: await_tree::Registry,
    hummock_manager: HummockManagerRef,
    event_log_manager: EventLogManagerRef,
    prometheus_client: Option<prometheus_http_query::Client>,
    prometheus_selector: String,
    redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
    system_params_controller: SystemParamsControllerRef,
}

impl DiagnoseCommand {
    pub fn new(
        metadata_manager: MetadataManager,
        await_tree_reg: await_tree::Registry,
        hummock_manager: HummockManagerRef,
        event_log_manager: EventLogManagerRef,
        prometheus_client: Option<prometheus_http_query::Client>,
        prometheus_selector: String,
        redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
        system_params_controller: SystemParamsControllerRef,
    ) -> Self {
        Self {
            metadata_manager,
            await_tree_reg,
            hummock_manager,
            event_log_manager,
            prometheus_client,
            prometheus_selector,
            redact_sql_option_keywords,
            system_params_controller,
        }
    }

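    /// Builds the full diagnose report: a header with the creation time and
    /// cluster version, followed by one section per `write_*` helper below.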
    pub async fn report(&self, actor_traces_format: ActorTracesFormat) -> String {
        let mut report = String::new();
        let _ = writeln!(
            report,
            "report created at: {}\nversion: {}",
            chrono::DateTime::<chrono::offset::Utc>::from(std::time::SystemTime::now()),
            risingwave_common::current_cluster_version(),
        );
        let _ = writeln!(report);
        self.write_license(&mut report);
        let _ = writeln!(report);
        self.write_catalog(&mut report).await;
        let _ = writeln!(report);
        self.write_worker_nodes(&mut report).await;
        let _ = writeln!(report);
        self.write_streaming_prometheus(&mut report).await;
        let _ = writeln!(report);
        self.write_storage(&mut report).await;
        let _ = writeln!(report);
        self.write_event_logs(&mut report);
        let _ = writeln!(report);
        self.write_params(&mut report).await;
        let _ = writeln!(report);
        self.write_await_tree(&mut report, actor_traces_format)
            .await;

        report
    }

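    /// Writes catalog statistics, then the detailed table definitions.
    /// A failure in the latter is logged but does not abort the report.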
    async fn write_catalog(&self, s: &mut String) {
        self.write_catalog_inner(s).await;
        let _ = self.write_table_definition(s).await.inspect_err(|e| {
            tracing::warn!(
                error = e.to_report_string(),
                "failed to display table definition"
            )
        });
    }

    async fn write_catalog_inner(&self, s: &mut String) {
        let stats = self.metadata_manager.catalog_controller.stats().await;

        let stat = match stats {
            Ok(stat) => stat,
            Err(err) => {
                tracing::warn!(error=?err.as_report(), "failed to get catalog stats");
                return;
            }
        };
        let _ = writeln!(s, "number of database: {}", stat.database_num);
        let _ = writeln!(s, "number of streaming job: {}", stat.streaming_job_num);
        let _ = writeln!(s, "number of actor: {}", stat.actor_num);
        let _ = writeln!(s, "number of source: {}", stat.source_num);
        let _ = writeln!(s, "number of table: {}", stat.table_num);
        let _ = writeln!(s, "number of materialized view: {}", stat.mview_num);
        let _ = writeln!(s, "number of sink: {}", stat.sink_num);
        let _ = writeln!(s, "number of index: {}", stat.index_num);
        let _ = writeln!(s, "number of function: {}", stat.function_num);

        self.write_databases(s).await;
        self.write_schemas(s).await;
    }

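    /// Renders all databases as a table, showing per-database barrier
    /// settings ("default" when unset) alongside id, name, and resource group.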
    async fn write_databases(&self, s: &mut String) {
        let databases = match self
            .metadata_manager
            .catalog_controller
            .list_databases()
            .await
        {
            Ok(databases) => databases,
            Err(err) => {
                tracing::warn!(error=?err.as_report(), "failed to list databases");
                return;
            }
        };

        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("name".into());
            row.add_cell("resource_group".into());
            row.add_cell("barrier_interval_ms".into());
            row.add_cell("checkpoint_frequency".into());
            row
        });
        for db in databases {
            let mut row = Row::new();
            row.add_cell(db.id.into());
            row.add_cell(db.name.into());
            row.add_cell(db.resource_group.into());
            row.add_cell(
                db.barrier_interval_ms
                    .map(|v| v.to_string())
                    .unwrap_or("default".into())
                    .into(),
            );
            row.add_cell(
                db.checkpoint_frequency
                    .map(|v| v.to_string())
                    .unwrap_or("default".into())
                    .into(),
            );
            table.add_row(row);
        }

        let _ = writeln!(s, "DATABASE");
        let _ = writeln!(s, "{table}");
    }

    async fn write_schemas(&self, s: &mut String) {
        let schemas = match self
            .metadata_manager
            .catalog_controller
            .list_schemas()
            .await
        {
            Ok(schemas) => schemas,
            Err(err) => {
                tracing::warn!(error=?err.as_report(), "failed to list schemas");
                return;
            }
        };

        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("database_id".into());
            row.add_cell("name".into());
            row
        });
        for schema in schemas {
            let mut row = Row::new();
            row.add_cell(schema.id.into());
            row.add_cell(schema.database_id.into());
            row.add_cell(schema.name.into());
            table.add_row(row);
        }

        let _ = writeln!(s, "SCHEMA");
        let _ = writeln!(s, "{table}");
    }

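    /// Renders one row per worker node. Fields that only apply to a given
    /// worker type (e.g. `is_streaming` for compute nodes) are left blank
    /// for other types via `try_add_cell`.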
    async fn write_worker_nodes(&self, s: &mut String) {
        let Ok(worker_actor_count) = self.metadata_manager.worker_actor_count() else {
            tracing::warn!("failed to get worker actor count");
            return;
        };

        use comfy_table::{Row, Table};
        let Ok(worker_nodes) = self.metadata_manager.list_worker_node(None, None).await else {
            tracing::warn!("failed to get worker nodes");
            return;
        };
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("host".into());
            row.add_cell("hostname".into());
            row.add_cell("type".into());
            row.add_cell("state".into());
            row.add_cell("parallelism".into());
            row.add_cell("resource_group".into());
            row.add_cell("is_streaming".into());
            row.add_cell("is_serving".into());
            row.add_cell("is_iceberg_compactor".into());
            row.add_cell("rw_version".into());
            row.add_cell("total_memory_bytes".into());
            row.add_cell("total_cpu_cores".into());
            row.add_cell("started_at".into());
            row.add_cell("actor_count".into());
            row
        });
        for worker_node in worker_nodes {
            let mut row = Row::new();
            row.add_cell(worker_node.id.into());
            try_add_cell(
                &mut row,
                worker_node
                    .host
                    .as_ref()
                    .map(|h| format!("{}:{}", h.host, h.port)),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.hostname.clone()),
            );
            try_add_cell(
                &mut row,
                worker_node.get_type().ok().map(|t| t.as_str_name()),
            );
            try_add_cell(
                &mut row,
                worker_node.get_state().ok().map(|s| s.as_str_name()),
            );
            try_add_cell(&mut row, worker_node.parallelism());
            try_add_cell(
                &mut row,
                worker_node
                    .property
                    .as_ref()
                    .map(|p| p.resource_group.clone().unwrap_or("".to_owned())),
            );
            // is_streaming and is_serving are only meaningful for ComputeNode
            let (is_streaming, is_serving) = {
                if let Ok(t) = worker_node.get_type()
                    && t == WorkerType::ComputeNode
                {
                    (
                        worker_node.property.as_ref().map(|p| p.is_streaming),
                        worker_node.property.as_ref().map(|p| p.is_serving),
                    )
                } else {
                    (None, None)
                }
            };
            try_add_cell(&mut row, is_streaming);
            try_add_cell(&mut row, is_serving);
            // is_iceberg_compactor is only meaningful for Compactor worker type
            let is_iceberg_compactor = {
                if let Ok(t) = worker_node.get_type()
                    && t == WorkerType::Compactor
                {
                    worker_node
                        .property
                        .as_ref()
                        .map(|p| p.is_iceberg_compactor)
                } else {
                    None
                }
            };
            try_add_cell(&mut row, is_iceberg_compactor);
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.rw_version.clone()),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.total_memory_bytes),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.total_cpu_cores),
            );
            try_add_cell(
                &mut row,
                worker_node
                    .started_at
                    .and_then(|ts| Timestamptz::from_secs(ts as _).map(|t| t.to_string())),
            );
            let actor_count = {
                if let Ok(t) = worker_node.get_type()
                    && t != WorkerType::ComputeNode
                {
                    None
                } else {
                    match worker_actor_count.get(&worker_node.id) {
                        None => Some(0),
                        Some(c) => Some(*c),
                    }
                }
            };
            try_add_cell(&mut row, actor_count);
            table.add_row(row);
        }
        let _ = writeln!(s, "{table}");
    }

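    /// Writes the most recent event logs, newest first, grouped by event
    /// kind with a per-kind row limit.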
    fn write_event_logs(&self, s: &mut String) {
        let event_logs = self
            .event_log_manager
            .list_event_logs()
            .into_iter()
            .sorted_by(|a, b| {
                a.timestamp
                    .unwrap_or(0)
                    .cmp(&b.timestamp.unwrap_or(0))
                    .reverse()
            })
            .collect_vec();

        let _ = writeln!(s, "latest barrier completions");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::BarrierComplete(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(10),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest barrier collection failures");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::CollectBarrierFail(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest barrier injection failures");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::InjectBarrierFail(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest worker node panics");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::WorkerNodePanic(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(10),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest create stream job failures");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::CreateStreamJobFail(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest dirty stream job clear-ups");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::DirtyStreamJobClear(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );
    }

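    /// Renders up to `limit` events for which `get_event_info` returns
    /// `Some`, with millisecond timestamps formatted as `Timestamptz`.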
    fn write_event_logs_impl<'a, F>(
        s: &mut String,
        event_logs: impl Iterator<Item = &'a EventLog>,
        get_event_info: F,
        limit: Option<usize>,
    ) where
        F: Fn(&Event) -> Option<String>,
    {
        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("created_at".into());
            row.add_cell("info".into());
            row
        });
        let mut row_count = 0;
        for event_log in event_logs {
            let Some(ref inner) = event_log.event else {
                continue;
            };
            if let Some(limit) = limit
                && row_count >= limit
            {
                break;
            }
            let mut row = Row::new();
            let ts = event_log
                .timestamp
                .and_then(|ts| Timestamptz::from_millis(ts as _).map(|ts| ts.to_string()));
            try_add_cell(&mut row, ts);
            if let Some(info) = get_event_info(inner) {
                row.add_cell(info.into());
                row_count += 1;
            } else {
                continue;
            }
            table.add_row(row);
        }
        let _ = writeln!(s, "{table}");
    }

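    /// Writes Hummock storage statistics: back-pressured compaction groups,
    /// SSTable counts and total size, the top-10 SSTables by tombstone delete
    /// ratio (tracked with a min-heap), and storage-related Prometheus metrics.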
    async fn write_storage(&self, s: &mut String) {
        let mut sst_num = 0;
        let mut sst_total_file_size = 0;
        let back_pressured_compaction_groups = self
            .hummock_manager
            .write_limits()
            .await
            .into_iter()
            .filter_map(|(k, v)| {
                if v.table_ids.is_empty() {
                    None
                } else {
                    Some(k)
                }
            })
            .join(",");
        if !back_pressured_compaction_groups.is_empty() {
            let _ = writeln!(
                s,
                "back pressured compaction groups: {back_pressured_compaction_groups}"
            );
        }

        #[derive(PartialEq, Eq)]
        struct SstableSort {
            compaction_group_id: u64,
            sst_id: HummockSstableId,
            delete_ratio: u64,
        }
        impl PartialOrd for SstableSort {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }
        impl Ord for SstableSort {
            fn cmp(&self, other: &Self) -> Ordering {
                self.delete_ratio.cmp(&other.delete_ratio)
            }
        }
        fn top_k_sstables(
            top_k: usize,
            heap: &mut BinaryHeap<Reverse<SstableSort>>,
            e: SstableSort,
        ) {
            if heap.len() < top_k {
                heap.push(Reverse(e));
            } else if let Some(mut p) = heap.peek_mut()
                && e.delete_ratio > p.0.delete_ratio
            {
                *p = Reverse(e);
            }
        }

        let top_k = 10;
        let mut top_tombstone_delete_sst = BinaryHeap::with_capacity(top_k);
        let compaction_group_num = self
            .hummock_manager
            .on_current_version(|version| {
                for compaction_group in version.levels.values() {
                    let mut visit_level = |level: &Level| {
                        sst_num += level.table_infos.len();
                        sst_total_file_size +=
                            level.table_infos.iter().map(|t| t.sst_size).sum::<u64>();
                        for sst in &level.table_infos {
                            if sst.total_key_count == 0 {
                                continue;
                            }
                            let tombstone_delete_ratio =
                                sst.stale_key_count * 10000 / sst.total_key_count;
                            let e = SstableSort {
                                compaction_group_id: compaction_group.group_id,
                                sst_id: sst.sst_id,
                                delete_ratio: tombstone_delete_ratio,
                            };
                            top_k_sstables(top_k, &mut top_tombstone_delete_sst, e);
                        }
                    };
                    let l0 = &compaction_group.l0;
                    // FIXME: why does chaining the level iterators lead to a segmentation fault?
                    for level in &l0.sub_levels {
                        visit_level(level);
                    }
                    for level in &compaction_group.levels {
                        visit_level(level);
                    }
                }
                version.levels.len()
            })
            .await;

        let _ = writeln!(s, "number of SSTables: {sst_num}");
        let _ = writeln!(s, "total size of SSTables (byte): {sst_total_file_size}");
        let _ = writeln!(s, "number of compaction groups: {compaction_group_num}");
        use comfy_table::{Row, Table};
        fn format_table(heap: BinaryHeap<Reverse<SstableSort>>) -> Table {
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("compaction group id".into());
                row.add_cell("sstable id".into());
                row.add_cell("delete ratio".into());
                row
            });
            for sst in &heap.into_sorted_vec() {
                let mut row = Row::new();
                row.add_cell(sst.0.compaction_group_id.into());
                row.add_cell(sst.0.sst_id.into());
                row.add_cell(format!("{:.2}%", sst.0.delete_ratio as f64 / 100.0).into());
                table.add_row(row);
            }
            table
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "top tombstone delete ratio");
        let _ = writeln!(s, "{}", format_table(top_tombstone_delete_sst));
        let _ = writeln!(s);

        let _ = writeln!(s);
        self.write_storage_prometheus(s).await;
    }

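    /// Queries Prometheus (if configured) for streaming hot spots: top
    /// sources and materialized views by throughput, and top join executors
    /// by matched rows, over a 10-minute rate window.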
    async fn write_streaming_prometheus(&self, s: &mut String) {
        let _ = writeln!(s, "top sources by throughput (rows/s)");
        let query = format!(
            "topk(3, sum(rate(stream_source_output_rows_counts{{{}}}[10m]))by (source_name))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["source_name"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top materialized views by throughput (rows/s)");
        let query = format!(
            "topk(3, sum(rate(stream_mview_input_row_count{{{}}}[10m]))by (table_id))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top join executors by matched rows");
        let query = format!(
            "topk(3, histogram_quantile(0.9, sum(rate(stream_join_matched_join_keys_bucket{{{}}}[10m])) by (le, fragment_id, table_id)))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id", "fragment_id"])
            .await;
    }

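    /// Queries Prometheus (if configured) for storage hot spots: state store
    /// read latencies, commit flush sizes, and object store throughput,
    /// operation rate, and operation duration.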
    async fn write_storage_prometheus(&self, s: &mut String) {
        let _ = writeln!(s, "top Hummock Get by duration (second)");
        let query = format!(
            "topk(3, histogram_quantile(0.9, sum(rate(state_store_get_duration_bucket{{{}}}[10m])) by (le, table_id)))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top Hummock Iter Init by duration (second)");
        let query = format!(
            "topk(3, histogram_quantile(0.9, sum(rate(state_store_iter_init_duration_bucket{{{}}}[10m])) by (le, table_id)))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top table commit flush by size (byte)");
        let query = format!(
            "topk(3, sum(rate(storage_commit_write_throughput{{{}}}[10m])) by (table_id))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store read throughput (bytes/s)");
        let query = format!(
            "sum(rate(object_store_read_bytes{{{}}}[10m])) by (job, instance)",
            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
        );
        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store write throughput (bytes/s)");
        let query = format!(
            "sum(rate(object_store_write_bytes{{{}}}[10m])) by (job, instance)",
            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
        );
        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store operation rate");
        let query = format!(
            "sum(rate(object_store_operation_latency_count{{{}}}[10m])) by (le, type, job, instance)",
            merge_prometheus_selector([
                &self.prometheus_selector,
                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read_read_bytes|streaming_read\""
            ])
        );
        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store operation duration (second)");
        let query = format!(
            "histogram_quantile(0.9, sum(rate(object_store_operation_latency_bucket{{{}}}[10m])) by (le, type, job, instance))",
            merge_prometheus_selector([
                &self.prometheus_selector,
                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read\""
            ])
        );
        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
            .await;
    }

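    /// Runs an instant-vector Prometheus query and writes one line per
    /// sample, rendered as `label=value,...: value`. No-op if no Prometheus
    /// client is configured; query errors are silently ignored.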
    async fn write_instant_vector_impl(&self, s: &mut String, query: &str, labels: Vec<&str>) {
        let Some(ref client) = self.prometheus_client else {
            return;
        };
        if let Ok(Vector(instant_vec)) = client
            .query(query)
            .get()
            .await
            .map(|result| result.into_inner().0)
        {
            for i in instant_vec {
                let l = labels
                    .iter()
                    .map(|label| {
                        format!(
                            "{}={}",
                            *label,
                            i.metric()
                                .get(*label)
                                .map(|s| s.as_str())
                                .unwrap_or_default()
                        )
                    })
                    .join(",");
                let _ = writeln!(s, "{}: {:.3}", l, i.sample().value());
            }
        }
    }

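    /// Dumps await-tree stack traces from all nodes in the cluster.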
    async fn write_await_tree(&self, s: &mut String, actor_traces_format: ActorTracesFormat) {
        let all = dump_cluster_await_tree(
            &self.metadata_manager,
            &self.await_tree_reg,
            actor_traces_format,
        )
        .await;

        if let Ok(all) = all {
            write!(s, "{}", all.output()).unwrap();
        } else {
            tracing::warn!("failed to dump await tree");
        }
    }

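    /// Writes streaming job info plus the definitions of all catalog objects
    /// (sources, tables, MVs, indexes, sinks, views, internal tables) and an
    /// actor-to-job mapping. SQL definitions are redacted; definitions that
    /// fail to parse are replaced with `[REDACTED]` entirely.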
    async fn write_table_definition(&self, s: &mut String) -> MetaResult<()> {
        let sources = self
            .metadata_manager
            .catalog_controller
            .list_sources()
            .await?
            .into_iter()
            .map(|s| {
                (
                    s.id.into(),
                    (
                        s.name,
                        s.database_id,
                        s.schema_id,
                        s.definition,
                        s.created_at_epoch,
                    ),
                )
            })
            .collect::<BTreeMap<_, _>>();
        let mut user_tables = BTreeMap::new();
        let mut mvs = BTreeMap::new();
        let mut indexes = BTreeMap::new();
        let mut internal_tables = BTreeMap::new();
        {
            let grouped = self
                .metadata_manager
                .catalog_controller
                .list_all_state_tables()
                .await?
                .into_iter()
                .chunk_by(|t| t.table_type());
            for (table_type, tables) in &grouped {
                let tables = tables.into_iter().map(|t| {
                    (
                        t.id.into(),
                        (
                            t.name,
                            t.database_id,
                            t.schema_id,
                            t.definition,
                            t.created_at_epoch,
                        ),
                    )
                });
                match table_type {
                    PbTableType::Table => user_tables.extend(tables),
                    PbTableType::MaterializedView => mvs.extend(tables),
                    PbTableType::Index | PbTableType::VectorIndex => indexes.extend(tables),
                    PbTableType::Internal => internal_tables.extend(tables),
                    PbTableType::Unspecified => {
                        tracing::error!("unspecified table type: {:?}", tables.collect_vec());
                    }
                }
            }
        }
        let sinks = self
            .metadata_manager
            .catalog_controller
            .list_sinks()
            .await?
            .into_iter()
            .map(|s| {
                (
                    s.id.into(),
                    (
                        s.name,
                        s.database_id,
                        s.schema_id,
                        s.definition,
                        s.created_at_epoch,
                    ),
                )
            })
            .collect::<BTreeMap<_, _>>();
        let views = self
            .metadata_manager
            .catalog_controller
            .list_views()
            .await?
            .into_iter()
            .map(|v| {
                (
                    v.id.into(),
                    (v.name, v.database_id, v.schema_id, v.sql, None),
                )
            })
            .collect::<BTreeMap<_, _>>();
        let mut streaming_jobs = self
            .metadata_manager
            .catalog_controller
            .list_streaming_job_infos()
            .await?;
        streaming_jobs.sort_by_key(|info| (info.obj_type as usize, info.job_id));
        {
            use comfy_table::{Row, Table};
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("job_id".into());
                row.add_cell("name".into());
                row.add_cell("obj_type".into());
                row.add_cell("state".into());
                row.add_cell("parallelism".into());
                row.add_cell("max_parallelism".into());
                row.add_cell("resource_group".into());
                row.add_cell("database_id".into());
                row.add_cell("schema_id".into());
                row.add_cell("config_override".into());
                row
            });
            for job in streaming_jobs {
                let mut row = Row::new();
                row.add_cell(job.job_id.into());
                row.add_cell(job.name.into());
                row.add_cell(job.obj_type.as_str().into());
                row.add_cell(format_job_status(job.job_status).into());
                row.add_cell(format_streaming_parallelism(&job.parallelism).into());
                row.add_cell(job.max_parallelism.into());
                row.add_cell(job.resource_group.into());
                row.add_cell(job.database_id.into());
                row.add_cell(job.schema_id.into());
                row.add_cell(job.config_override.into());
                table.add_row(row);
            }
            let _ = writeln!(s);
            let _ = writeln!(s, "STREAMING JOB");
            let _ = writeln!(s, "{table}");
        }
        let catalogs = [
            ("SOURCE", sources),
            ("TABLE", user_tables),
            ("MATERIALIZED VIEW", mvs),
            ("INDEX", indexes),
            ("SINK", sinks),
            ("VIEW", views),
            ("INTERNAL TABLE", internal_tables),
        ];
        let mut obj_id_to_name: HashMap<ObjectId, _> = HashMap::new();
        for (title, items) in catalogs {
            use comfy_table::{Row, Table};
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("id".into());
                row.add_cell("name".into());
                row.add_cell("database_id".into());
                row.add_cell("schema_id".into());
                row.add_cell("created_at".into());
                row.add_cell("definition".into());
                row
            });
            for (id, (name, database_id, schema_id, definition, created_at_epoch)) in items {
                obj_id_to_name.insert(id, name.clone());
                let mut row = Row::new();
                let may_redact = redact_sql(&definition, self.redact_sql_option_keywords.clone())
                    .unwrap_or_else(|| "[REDACTED]".into());
                let created_at = if let Some(created_at_epoch) = created_at_epoch {
                    format!("{}", Epoch::from(created_at_epoch).as_timestamptz())
                } else {
                    "".into()
                };
                row.add_cell(id.into());
                row.add_cell(name.into());
                row.add_cell(database_id.into());
                row.add_cell(schema_id.into());
                row.add_cell(created_at.into());
                row.add_cell(may_redact.into());
                table.add_row(row);
            }
            let _ = writeln!(s);
            let _ = writeln!(s, "{title}");
            let _ = writeln!(s, "{table}");
        }

        let actors = self
            .metadata_manager
            .catalog_controller
            .list_actor_info()
            .await?
            .into_iter()
            .map(|(actor_id, fragment_id, job_id, schema_id, obj_type)| {
                (
                    actor_id,
                    (
                        fragment_id,
                        job_id,
                        schema_id,
                        obj_type,
                        obj_id_to_name.get(&job_id).cloned().unwrap_or_default(),
                    ),
                )
            })
            .collect::<BTreeMap<_, _>>();

        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("fragment_id".into());
            row.add_cell("job_id".into());
            row.add_cell("schema_id".into());
            row.add_cell("type".into());
            row.add_cell("name".into());
            row
        });
        for (actor_id, (fragment_id, job_id, schema_id, ddl_type, name)) in actors {
            let mut row = Row::new();
            row.add_cell(actor_id.into());
            row.add_cell(fragment_id.into());
            row.add_cell(job_id.into());
            row.add_cell(schema_id.into());
            row.add_cell(ddl_type.as_str().into());
            row.add_cell(name.into());
            table.add_row(row);
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "ACTOR");
        let _ = writeln!(s, "{table}");
        Ok(())
    }

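    /// Writes the current license as a key-value table: tier, expiry, and
    /// resource limits when valid, or the validation error otherwise.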
    fn write_license(&self, s: &mut String) {
        use comfy_table::presets::ASCII_BORDERS_ONLY;
        use comfy_table::{ContentArrangement, Row, Table};

        let mut table = Table::new();
        table.load_preset(ASCII_BORDERS_ONLY);
        table.set_content_arrangement(ContentArrangement::Dynamic);
        table.set_header({
            let mut row = Row::new();
            row.add_cell("field".into());
            row.add_cell("value".into());
            row
        });

        match LicenseManager::get().license() {
            Ok(license) => {
                let fmt_option = |value: Option<u64>| match value {
                    Some(v) => v.to_string(),
                    None => "unlimited".to_owned(),
                };

                let expires_at = if license.exp == u64::MAX {
                    "never".to_owned()
                } else {
                    let exp_i64 = license.exp as i64;
                    chrono::DateTime::<chrono::Utc>::from_timestamp(exp_i64, 0)
                        .map(|ts| ts.to_rfc3339())
                        .unwrap_or_else(|| format!("invalid ({})", license.exp))
                };

                let mut row = Row::new();
                row.add_cell("status".into());
                row.add_cell("valid".into());
                table.add_row(row);

                let mut row = Row::new();
                row.add_cell("tier".into());
                row.add_cell(license.tier.name().into());
                table.add_row(row);

                let mut row = Row::new();
                row.add_cell("expires_at".into());
                row.add_cell(expires_at.into());
                table.add_row(row);

                let mut row = Row::new();
                row.add_cell("rwu_limit".into());
                row.add_cell(fmt_option(license.rwu_limit.map(|v| v.get())).into());
                table.add_row(row);

                let mut row = Row::new();
                row.add_cell("cpu_core_limit".into());
                row.add_cell(fmt_option(license.cpu_core_limit()).into());
                table.add_row(row);

                let mut row = Row::new();
                row.add_cell("memory_limit_bytes".into());
                row.add_cell(fmt_option(license.memory_limit()).into());
                table.add_row(row);

                let mut features: Vec<_> = license
                    .tier
                    .available_features()
                    .map(|f| f.name())
                    .collect();
                features.sort_unstable();
                let feature_summary = format_features(&features);

                let mut row = Row::new();
                row.add_cell("available_features".into());
                row.add_cell(feature_summary.into());
                table.add_row(row);
            }
            Err(error) => {
                let mut row = Row::new();
                row.add_cell("status".into());
                row.add_cell("invalid".into());
                table.add_row(row);

                let mut row = Row::new();
                row.add_cell("error".into());
                row.add_cell(error.to_report_string().into());
                table.add_row(row);
            }
        }

        let _ = writeln!(s, "LICENSE");
        let _ = writeln!(s, "{table}");
    }

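    /// Writes the current system parameters as a key-value table.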
    async fn write_params(&self, s: &mut String) {
        let params = self.system_params_controller.get_params().await;

        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("key".into());
            row.add_cell("value".into());
            row
        });

        let mut row = Row::new();
        row.add_cell("barrier_interval_ms".into());
        row.add_cell(params.barrier_interval_ms().to_string().into());
        table.add_row(row);

        let mut row = Row::new();
        row.add_cell("checkpoint_frequency".into());
        row.add_cell(params.checkpoint_frequency().to_string().into());
        table.add_row(row);

        let mut row = Row::new();
        row.add_cell("state_store".into());
        row.add_cell(params.state_store().to_owned().into());
        table.add_row(row);

        let mut row = Row::new();
        row.add_cell("data_directory".into());
        row.add_cell(params.data_directory().to_owned().into());
        table.add_row(row);

        let mut row = Row::new();
        row.add_cell("max_concurrent_creating_streaming_jobs".into());
        row.add_cell(
            params
                .max_concurrent_creating_streaming_jobs()
                .to_string()
                .into(),
        );
        table.add_row(row);

        let mut row = Row::new();
        row.add_cell("time_travel_retention_ms".into());
        row.add_cell(params.time_travel_retention_ms().to_string().into());
        table.add_row(row);

        let mut row = Row::new();
        row.add_cell("adaptive_parallelism_strategy".into());
        row.add_cell(params.adaptive_parallelism_strategy().to_string().into());
        table.add_row(row);

        let mut row = Row::new();
        row.add_cell("per_database_isolation".into());
        row.add_cell(params.per_database_isolation().to_string().into());
        table.add_row(row);

        let _ = writeln!(s, "SYSTEM PARAMS");
        let _ = writeln!(s, "{table}");
    }
}

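/// Adds `t` as a cell if present, or an empty cell otherwise, so that rows
/// keep a consistent column count.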
fn try_add_cell<T: Into<comfy_table::Cell>>(row: &mut comfy_table::Row, t: Option<T>) {
    match t {
        Some(t) => {
            row.add_cell(t.into());
        }
        None => {
            row.add_cell("".into());
        }
    }
}

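/// Joins non-empty Prometheus label selectors with commas.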
fn merge_prometheus_selector<'a>(selectors: impl IntoIterator<Item = &'a str>) -> String {
    selectors.into_iter().filter(|s| !s.is_empty()).join(",")
}

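/// Redacts the values of the configured option keywords in a SQL definition.
/// Returns `None` if the SQL cannot be parsed.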
fn redact_sql(sql: &str, keywords: RedactSqlOptionKeywordsRef) -> Option<String> {
    match Parser::parse_sql(sql) {
        Ok(sqls) => Some(
            sqls.into_iter()
                .map(|sql| sql.to_redacted_string(keywords.clone()))
                .join(";"),
        ),
        Err(_) => None,
    }
}

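/// Formats feature names into indented lines of at most six names per line.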
fn format_features(features: &[&'static str]) -> String {
    if features.is_empty() {
        return "(none)".into();
    }

    const PER_LINE: usize = 6;
    features
        .chunks(PER_LINE)
        .map(|chunk| format!("  {}", chunk.join(", ")))
        .collect::<Vec<_>>()
        .join("\n")
}

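/// Human-readable name for a streaming job status.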
fn format_job_status(status: JobStatus) -> &'static str {
    match status {
        JobStatus::Initial => "initial",
        JobStatus::Creating => "creating",
        JobStatus::Created => "created",
    }
}

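/// Human-readable description of a job's streaming parallelism.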
fn format_streaming_parallelism(parallelism: &StreamingParallelism) -> String {
    match parallelism {
        StreamingParallelism::Adaptive => "adaptive".into(),
        StreamingParallelism::Fixed(n) => format!("fixed({n})"),
        StreamingParallelism::Custom => "custom".into(),
    }
}