risingwave_meta/manager/
diagnose.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::{Ordering, Reverse};
16use std::collections::{BTreeMap, BinaryHeap, HashMap};
17use std::fmt::Write;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use prometheus_http_query::response::Data::Vector;
22use risingwave_common::id::ObjectId;
23use risingwave_common::system_param::reader::SystemParamsRead;
24use risingwave_common::types::Timestamptz;
25use risingwave_common::util::StackTraceResponseExt;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_hummock_sdk::level::Level;
28use risingwave_hummock_sdk::{CompactionGroupId, HummockSstableId};
29use risingwave_license::LicenseManager;
30use risingwave_meta_model::{JobStatus, StreamingParallelism};
31use risingwave_pb::catalog::table::PbTableType;
32use risingwave_pb::common::WorkerType;
33use risingwave_pb::meta::EventLog;
34use risingwave_pb::meta::event_log::Event;
35use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
36use risingwave_sqlparser::ast::RedactSqlOptionKeywordsRef;
37use risingwave_sqlparser::parser::Parser;
38use serde_json::json;
39use thiserror_ext::AsReport;
40
41use crate::MetaResult;
42use crate::controller::system_param::SystemParamsControllerRef;
43use crate::hummock::HummockManagerRef;
44use crate::manager::MetadataManager;
45use crate::manager::event_log::EventLogManagerRef;
46use crate::manager::iceberg_compaction::IcebergCompactionManagerRef;
47use crate::rpc::await_tree::dump_cluster_await_tree;
48
/// Shared, thread-safe handle to a [`DiagnoseCommand`].
pub type DiagnoseCommandRef = Arc<DiagnoseCommand>;
50
/// Gathers diagnostic information from the various meta subsystems and
/// renders it into a single human-readable report string.
pub struct DiagnoseCommand {
    metadata_manager: MetadataManager,
    await_tree_reg: await_tree::Registry,
    // NOTE(review): field name has a typo ("manger" vs "manager"); renaming
    // would touch every usage, so it is left as-is here.
    hummock_manger: HummockManagerRef,
    iceberg_compaction_manager: IcebergCompactionManagerRef,
    event_log_manager: EventLogManagerRef,
    // `None` when Prometheus integration is not configured; metric sections
    // are silently skipped in that case.
    prometheus_client: Option<prometheus_http_query::Client>,
    // Label selector injected into every Prometheus query.
    prometheus_selector: String,
    redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
    system_params_controller: SystemParamsControllerRef,
}
62
63impl DiagnoseCommand {
    /// Creates a `DiagnoseCommand` from handles to every subsystem the report
    /// draws on. `prometheus_client` may be `None` when metrics are disabled.
    pub fn new(
        metadata_manager: MetadataManager,
        await_tree_reg: await_tree::Registry,
        hummock_manger: HummockManagerRef,
        iceberg_compaction_manager: IcebergCompactionManagerRef,
        event_log_manager: EventLogManagerRef,
        prometheus_client: Option<prometheus_http_query::Client>,
        prometheus_selector: String,
        redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
        system_params_controller: SystemParamsControllerRef,
    ) -> Self {
        Self {
            metadata_manager,
            await_tree_reg,
            hummock_manger,
            iceberg_compaction_manager,
            event_log_manager,
            prometheus_client,
            prometheus_selector,
            redact_sql_option_keywords,
            system_params_controller,
        }
    }
87
    /// Generates the full diagnosis report, concatenating one section per
    /// subsystem (license, catalog, workers, streaming/storage Prometheus
    /// metrics, iceberg compaction, event logs, system params and await
    /// trees), separated by blank lines.
    pub async fn report(&self, actor_traces_format: ActorTracesFormat) -> String {
        let mut report = String::new();
        // Header: creation time and cluster version, to anchor the snapshot.
        let _ = writeln!(
            report,
            "report created at: {}\nversion: {}",
            chrono::DateTime::<chrono::offset::Utc>::from(std::time::SystemTime::now()),
            risingwave_common::current_cluster_version(),
        );
        let _ = writeln!(report);
        self.write_license(&mut report);
        let _ = writeln!(report);
        self.write_catalog(&mut report).await;
        let _ = writeln!(report);
        self.write_worker_nodes(&mut report).await;
        let _ = writeln!(report);
        self.write_streaming_prometheus(&mut report).await;
        let _ = writeln!(report);
        self.write_storage(&mut report).await;
        let _ = writeln!(report);
        self.write_iceberg_compaction_schedule(&mut report).await;
        let _ = writeln!(report);
        self.write_event_logs(&mut report);
        let _ = writeln!(report);
        self.write_params(&mut report).await;
        let _ = writeln!(report);
        self.write_await_tree(&mut report, actor_traces_format)
            .await;

        report
    }
118
    /// Writes the catalog section: statistics plus database/schema listings,
    /// then table definitions. A failure in the definition dump is logged and
    /// does not abort the report.
    async fn write_catalog(&self, s: &mut String) {
        self.write_catalog_inner(s).await;
        let _ = self.write_table_definition(s).await.inspect_err(|e| {
            tracing::warn!(
                error = e.to_report_string(),
                "failed to display table definition"
            )
        });
    }
128
129    async fn write_catalog_inner(&self, s: &mut String) {
130        let stats = self.metadata_manager.catalog_controller.stats().await;
131
132        let stat = match stats {
133            Ok(stat) => stat,
134            Err(err) => {
135                tracing::warn!(error=?err.as_report(), "failed to get catalog stats");
136                return;
137            }
138        };
139        let _ = writeln!(s, "number of database: {}", stat.database_num);
140        let _ = writeln!(s, "number of fragment: {}", stat.streaming_job_num);
141        let _ = writeln!(s, "number of actor: {}", stat.actor_num);
142        let _ = writeln!(s, "number of source: {}", stat.source_num);
143        let _ = writeln!(s, "number of table: {}", stat.table_num);
144        let _ = writeln!(s, "number of materialized view: {}", stat.mview_num);
145        let _ = writeln!(s, "number of sink: {}", stat.sink_num);
146        let _ = writeln!(s, "number of index: {}", stat.index_num);
147        let _ = writeln!(s, "number of function: {}", stat.function_num);
148
149        self.write_databases(s).await;
150        self.write_schemas(s).await;
151    }
152
153    async fn write_databases(&self, s: &mut String) {
154        let databases = match self
155            .metadata_manager
156            .catalog_controller
157            .list_databases()
158            .await
159        {
160            Ok(databases) => databases,
161            Err(err) => {
162                tracing::warn!(error=?err.as_report(), "failed to list databases");
163                return;
164            }
165        };
166
167        use comfy_table::{Row, Table};
168        let mut table = Table::new();
169        table.set_header({
170            let mut row = Row::new();
171            row.add_cell("id".into());
172            row.add_cell("name".into());
173            row.add_cell("resource_group".into());
174            row.add_cell("barrier_interval_ms".into());
175            row.add_cell("checkpoint_frequency".into());
176            row
177        });
178        for db in databases {
179            let mut row = Row::new();
180            row.add_cell(db.id.into());
181            row.add_cell(db.name.into());
182            row.add_cell(db.resource_group.into());
183            row.add_cell(
184                db.barrier_interval_ms
185                    .map(|v| v.to_string())
186                    .unwrap_or("default".into())
187                    .into(),
188            );
189            row.add_cell(
190                db.checkpoint_frequency
191                    .map(|v| v.to_string())
192                    .unwrap_or("default".into())
193                    .into(),
194            );
195            table.add_row(row);
196        }
197
198        let _ = writeln!(s, "DATABASE");
199        let _ = writeln!(s, "{table}");
200    }
201
202    async fn write_schemas(&self, s: &mut String) {
203        let schemas = match self
204            .metadata_manager
205            .catalog_controller
206            .list_schemas()
207            .await
208        {
209            Ok(schemas) => schemas,
210            Err(err) => {
211                tracing::warn!(error=?err.as_report(), "failed to list schemas");
212                return;
213            }
214        };
215
216        use comfy_table::{Row, Table};
217        let mut table = Table::new();
218        table.set_header({
219            let mut row = Row::new();
220            row.add_cell("id".into());
221            row.add_cell("database_id".into());
222            row.add_cell("name".into());
223            row
224        });
225        for schema in schemas {
226            let mut row = Row::new();
227            row.add_cell(schema.id.into());
228            row.add_cell(schema.database_id.into());
229            row.add_cell(schema.name.into());
230            table.add_row(row);
231        }
232
233        let _ = writeln!(s, "SCHEMA");
234        let _ = writeln!(s, "{table}");
235    }
236
237    async fn write_worker_nodes(&self, s: &mut String) {
238        let Ok(worker_actor_count) = self.metadata_manager.worker_actor_count() else {
239            tracing::warn!("failed to get worker actor count");
240            return;
241        };
242
243        use comfy_table::{Row, Table};
244        let Ok(worker_nodes) = self.metadata_manager.list_worker_node(None, None).await else {
245            tracing::warn!("failed to get worker nodes");
246            return;
247        };
248        let mut table = Table::new();
249        table.set_header({
250            let mut row = Row::new();
251            row.add_cell("id".into());
252            row.add_cell("host".into());
253            row.add_cell("hostname".into());
254            row.add_cell("type".into());
255            row.add_cell("state".into());
256            row.add_cell("parallelism".into());
257            row.add_cell("resource_group".into());
258            row.add_cell("is_streaming".into());
259            row.add_cell("is_serving".into());
260            row.add_cell("is_iceberg_compactor".into());
261            row.add_cell("rw_version".into());
262            row.add_cell("total_memory_bytes".into());
263            row.add_cell("total_cpu_cores".into());
264            row.add_cell("started_at".into());
265            row.add_cell("actor_count".into());
266            row
267        });
268        for worker_node in worker_nodes {
269            let mut row = Row::new();
270            row.add_cell(worker_node.id.into());
271            try_add_cell(
272                &mut row,
273                worker_node
274                    .host
275                    .as_ref()
276                    .map(|h| format!("{}:{}", h.host, h.port)),
277            );
278            try_add_cell(
279                &mut row,
280                worker_node.resource.as_ref().map(|r| r.hostname.clone()),
281            );
282            try_add_cell(
283                &mut row,
284                worker_node.get_type().ok().map(|t| t.as_str_name()),
285            );
286            try_add_cell(
287                &mut row,
288                worker_node.get_state().ok().map(|s| s.as_str_name()),
289            );
290            try_add_cell(&mut row, worker_node.parallelism());
291            try_add_cell(
292                &mut row,
293                worker_node
294                    .property
295                    .as_ref()
296                    .map(|p| p.resource_group.clone().unwrap_or("".to_owned())),
297            );
298            // is_streaming and is_serving are only meaningful for ComputeNode
299            let (is_streaming, is_serving) = {
300                if let Ok(t) = worker_node.get_type()
301                    && t == WorkerType::ComputeNode
302                {
303                    (
304                        worker_node.property.as_ref().map(|p| p.is_streaming),
305                        worker_node.property.as_ref().map(|p| p.is_serving),
306                    )
307                } else {
308                    (None, None)
309                }
310            };
311            try_add_cell(&mut row, is_streaming);
312            try_add_cell(&mut row, is_serving);
313            // is_iceberg_compactor is only meaningful for Compactor worker type
314            let is_iceberg_compactor = {
315                if let Ok(t) = worker_node.get_type()
316                    && t == WorkerType::Compactor
317                {
318                    worker_node
319                        .property
320                        .as_ref()
321                        .map(|p| p.is_iceberg_compactor)
322                } else {
323                    None
324                }
325            };
326            try_add_cell(&mut row, is_iceberg_compactor);
327            try_add_cell(
328                &mut row,
329                worker_node.resource.as_ref().map(|r| r.rw_version.clone()),
330            );
331            try_add_cell(
332                &mut row,
333                worker_node.resource.as_ref().map(|r| r.total_memory_bytes),
334            );
335            try_add_cell(
336                &mut row,
337                worker_node.resource.as_ref().map(|r| r.total_cpu_cores),
338            );
339            try_add_cell(
340                &mut row,
341                worker_node
342                    .started_at
343                    .and_then(|ts| Timestamptz::from_secs(ts as _).map(|t| t.to_string())),
344            );
345            let actor_count = {
346                if let Ok(t) = worker_node.get_type()
347                    && t != WorkerType::ComputeNode
348                {
349                    None
350                } else {
351                    match worker_actor_count.get(&worker_node.id) {
352                        None => Some(0),
353                        Some(c) => Some(*c),
354                    }
355                }
356            };
357            try_add_cell(&mut row, actor_count);
358            table.add_row(row);
359        }
360        let _ = writeln!(s, "{table}");
361    }
362
363    fn write_event_logs(&self, s: &mut String) {
364        let event_logs = self
365            .event_log_manager
366            .list_event_logs()
367            .into_iter()
368            .sorted_by(|a, b| {
369                a.timestamp
370                    .unwrap_or(0)
371                    .cmp(&b.timestamp.unwrap_or(0))
372                    .reverse()
373            })
374            .collect_vec();
375
376        let _ = writeln!(s, "latest barrier completions");
377        Self::write_event_logs_impl(
378            s,
379            event_logs.iter(),
380            |e| {
381                let Event::BarrierComplete(info) = e else {
382                    return None;
383                };
384                Some(json!(info).to_string())
385            },
386            Some(10),
387        );
388
389        let _ = writeln!(s);
390        let _ = writeln!(s, "latest barrier collection failures");
391        Self::write_event_logs_impl(
392            s,
393            event_logs.iter(),
394            |e| {
395                let Event::CollectBarrierFail(info) = e else {
396                    return None;
397                };
398                Some(json!(info).to_string())
399            },
400            Some(3),
401        );
402
403        let _ = writeln!(s);
404        let _ = writeln!(s, "latest barrier injection failures");
405        Self::write_event_logs_impl(
406            s,
407            event_logs.iter(),
408            |e| {
409                let Event::InjectBarrierFail(info) = e else {
410                    return None;
411                };
412                Some(json!(info).to_string())
413            },
414            Some(3),
415        );
416
417        let _ = writeln!(s);
418        let _ = writeln!(s, "latest worker node panics");
419        Self::write_event_logs_impl(
420            s,
421            event_logs.iter(),
422            |e| {
423                let Event::WorkerNodePanic(info) = e else {
424                    return None;
425                };
426                Some(json!(info).to_string())
427            },
428            Some(10),
429        );
430
431        let _ = writeln!(s);
432        let _ = writeln!(s, "latest create stream job failures");
433        Self::write_event_logs_impl(
434            s,
435            event_logs.iter(),
436            |e| {
437                let Event::CreateStreamJobFail(info) = e else {
438                    return None;
439                };
440                Some(json!(info).to_string())
441            },
442            Some(3),
443        );
444
445        let _ = writeln!(s);
446        let _ = writeln!(s, "latest dirty stream job clear-ups");
447        Self::write_event_logs_impl(
448            s,
449            event_logs.iter(),
450            |e| {
451                let Event::DirtyStreamJobClear(info) = e else {
452                    return None;
453                };
454                Some(json!(info).to_string())
455            },
456            Some(3),
457        );
458    }
459
460    fn write_event_logs_impl<'a, F>(
461        s: &mut String,
462        event_logs: impl Iterator<Item = &'a EventLog>,
463        get_event_info: F,
464        limit: Option<usize>,
465    ) where
466        F: Fn(&Event) -> Option<String>,
467    {
468        use comfy_table::{Row, Table};
469        let mut table = Table::new();
470        table.set_header({
471            let mut row = Row::new();
472            row.add_cell("created_at".into());
473            row.add_cell("info".into());
474            row
475        });
476        let mut row_count = 0;
477        for event_log in event_logs {
478            let Some(ref inner) = event_log.event else {
479                continue;
480            };
481            if let Some(limit) = limit
482                && row_count >= limit
483            {
484                break;
485            }
486            let mut row = Row::new();
487            let ts = event_log
488                .timestamp
489                .and_then(|ts| Timestamptz::from_millis(ts as _).map(|ts| ts.to_string()));
490            try_add_cell(&mut row, ts);
491            if let Some(info) = get_event_info(inner) {
492                row.add_cell(info.into());
493                row_count += 1;
494            } else {
495                continue;
496            }
497            table.add_row(row);
498        }
499        let _ = writeln!(s, "{table}");
500    }
501
    /// Appends Hummock storage diagnostics: write-limited (back-pressured)
    /// compaction groups, SSTable counts and total size, the SSTables with the
    /// highest tombstone-delete ratio, and storage-related Prometheus metrics.
    async fn write_storage(&self, s: &mut String) {
        let mut sst_num = 0;
        let mut sst_total_file_size = 0;
        // Groups with a non-empty write-limit table set are back-pressured.
        let back_pressured_compaction_groups = self
            .hummock_manger
            .write_limits()
            .await
            .into_iter()
            .filter_map(|(k, v)| {
                if v.table_ids.is_empty() {
                    None
                } else {
                    Some(k)
                }
            })
            .join(",");
        if !back_pressured_compaction_groups.is_empty() {
            let _ = writeln!(
                s,
                "back pressured compaction groups: {back_pressured_compaction_groups}"
            );
        }

        // Heap entry for ranking SSTables by tombstone-delete ratio.
        #[derive(PartialEq, Eq)]
        struct SstableSort {
            compaction_group_id: CompactionGroupId,
            sst_id: HummockSstableId,
            // stale_key_count / total_key_count scaled by 10000 (basis points).
            delete_ratio: u64,
        }
        impl PartialOrd for SstableSort {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }
        impl Ord for SstableSort {
            // Ordered by delete ratio only; the id fields do not participate.
            fn cmp(&self, other: &Self) -> Ordering {
                self.delete_ratio.cmp(&other.delete_ratio)
            }
        }
        // Keeps the `top_k` highest-ratio entries using a min-heap (the
        // `Reverse` wrapper makes `peek_mut` expose the current minimum).
        fn top_k_sstables(
            top_k: usize,
            heap: &mut BinaryHeap<Reverse<SstableSort>>,
            e: SstableSort,
        ) {
            if heap.len() < top_k {
                heap.push(Reverse(e));
            } else if let Some(mut p) = heap.peek_mut()
                && e.delete_ratio > p.0.delete_ratio
            {
                // Evict the current minimum in favor of the larger entry.
                *p = Reverse(e);
            }
        }

        let top_k = 10;
        let mut top_tombstone_delete_sst = BinaryHeap::with_capacity(top_k);
        let compaction_group_num = self
            .hummock_manger
            .on_current_version(|version| {
                for compaction_group in version.levels.values() {
                    // Accumulates SST count/size and feeds the top-k heap for
                    // one level; captures the outer counters mutably.
                    let mut visit_level = |level: &Level| {
                        sst_num += level.table_infos.len();
                        sst_total_file_size +=
                            level.table_infos.iter().map(|t| t.sst_size).sum::<u64>();
                        for sst in &level.table_infos {
                            // Skip empty SSTables to avoid dividing by zero.
                            if sst.total_key_count == 0 {
                                continue;
                            }
                            let tombstone_delete_ratio =
                                sst.stale_key_count * 10000 / sst.total_key_count;
                            let e = SstableSort {
                                compaction_group_id: compaction_group.group_id,
                                sst_id: sst.sst_id,
                                delete_ratio: tombstone_delete_ratio,
                            };
                            top_k_sstables(top_k, &mut top_tombstone_delete_sst, e);
                        }
                    };
                    let l0 = &compaction_group.l0;
                    // FIXME: why chaining levels iter leads to segmentation fault?
                    for level in &l0.sub_levels {
                        visit_level(level);
                    }
                    for level in &compaction_group.levels {
                        visit_level(level);
                    }
                }
                version.levels.len()
            })
            .await;

        let _ = writeln!(s, "number of SSTables: {sst_num}");
        let _ = writeln!(s, "total size of SSTables (byte): {sst_total_file_size}");
        let _ = writeln!(s, "number of compaction groups: {compaction_group_num}");
        use comfy_table::{Row, Table};
        // Renders the heap as a table; `into_sorted_vec` on `Reverse` entries
        // yields the largest delete ratio first.
        fn format_table(heap: BinaryHeap<Reverse<SstableSort>>) -> Table {
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("compaction group id".into());
                row.add_cell("sstable id".into());
                row.add_cell("delete ratio".into());
                row
            });
            for sst in &heap.into_sorted_vec() {
                let mut row = Row::new();
                row.add_cell(sst.0.compaction_group_id.into());
                row.add_cell(sst.0.sst_id.into());
                // delete_ratio is in basis points; render as a percentage.
                row.add_cell(format!("{:.2}%", sst.0.delete_ratio as f64 / 100.0).into());
                table.add_row(row);
            }
            table
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "top tombstone delete ratio");
        let _ = writeln!(s, "{}", format_table(top_tombstone_delete_sst));
        let _ = writeln!(s);

        let _ = writeln!(s);
        self.write_storage_prometheus(s).await;
    }
622
623    async fn write_iceberg_compaction_schedule(&self, s: &mut String) {
624        use comfy_table::{Row, Table};
625
626        let sink_names = match self.metadata_manager.catalog_controller.list_sinks().await {
627            Ok(sinks) => sinks
628                .into_iter()
629                .map(|sink| (sink.id.as_raw_id(), sink.name))
630                .collect::<BTreeMap<u32, String>>(),
631            Err(err) => {
632                tracing::warn!(error=?err.as_report(), "failed to list sinks");
633                return;
634            }
635        };
636
637        let statuses = self.iceberg_compaction_manager.list_compaction_statuses();
638
639        let _ = writeln!(s, "ICEBERG COMPACTION SCHEDULE");
640
641        let mut table = Table::new();
642        table.set_header({
643            let mut row = Row::new();
644            row.add_cell("sink_id".into());
645            row.add_cell("sink_name".into());
646            row.add_cell("task_type".into());
647            row.add_cell("schedule_state".into());
648            row.add_cell("trigger_interval_sec".into());
649            row.add_cell("trigger_snapshot_count".into());
650            row.add_cell("pending_snapshot_count".into());
651            row.add_cell("next_compaction_after_sec".into());
652            row.add_cell("is_triggerable".into());
653            row
654        });
655
656        for status in statuses {
657            let mut row = Row::new();
658            row.add_cell(status.sink_id.as_raw_id().into());
659            try_add_cell(
660                &mut row,
661                sink_names.get(&status.sink_id.as_raw_id()).cloned(),
662            );
663            row.add_cell(status.task_type.into());
664            row.add_cell(status.schedule_state.into());
665            row.add_cell(status.trigger_interval_sec.into());
666            row.add_cell((status.trigger_snapshot_count as u64).into());
667            try_add_cell(
668                &mut row,
669                status.pending_snapshot_count.map(|count| count as u64),
670            );
671            try_add_cell(&mut row, status.next_compaction_after_sec);
672            row.add_cell(status.is_triggerable.into());
673            table.add_row(row);
674        }
675
676        let _ = writeln!(s, "{table}");
677    }
678
679    async fn write_streaming_prometheus(&self, s: &mut String) {
680        let _ = writeln!(s, "top sources by throughput (rows/s)");
681        let query = format!(
682            "topk(3, sum(rate(stream_source_output_rows_counts{{{}}}[10m]))by (source_name))",
683            self.prometheus_selector
684        );
685        self.write_instant_vector_impl(s, &query, vec!["source_name"])
686            .await;
687
688        let _ = writeln!(s);
689        let _ = writeln!(s, "top materialized views by throughput (rows/s)");
690        let query = format!(
691            "topk(3, sum(rate(stream_mview_input_row_count{{{}}}[10m]))by (table_id))",
692            self.prometheus_selector
693        );
694        self.write_instant_vector_impl(s, &query, vec!["table_id"])
695            .await;
696
697        let _ = writeln!(s);
698        let _ = writeln!(s, "top join executor by matched rows");
699        let query = format!(
700            "topk(3, histogram_quantile(0.9, sum(rate(stream_join_matched_join_keys_bucket{{{}}}[10m])) by (le, fragment_id, table_id)))",
701            self.prometheus_selector
702        );
703        self.write_instant_vector_impl(s, &query, vec!["table_id", "fragment_id"])
704            .await;
705    }
706
707    async fn write_storage_prometheus(&self, s: &mut String) {
708        let _ = writeln!(s, "top Hummock Get by duration (second)");
709        let query = format!(
710            "topk(3, histogram_quantile(0.9, sum(rate(state_store_get_duration_bucket{{{}}}[10m])) by (le, table_id)))",
711            self.prometheus_selector
712        );
713        self.write_instant_vector_impl(s, &query, vec!["table_id"])
714            .await;
715
716        let _ = writeln!(s);
717        let _ = writeln!(s, "top Hummock Iter Init by duration (second)");
718        let query = format!(
719            "topk(3, histogram_quantile(0.9, sum(rate(state_store_iter_init_duration_bucket{{{}}}[10m])) by (le, table_id)))",
720            self.prometheus_selector
721        );
722        self.write_instant_vector_impl(s, &query, vec!["table_id"])
723            .await;
724
725        let _ = writeln!(s);
726        let _ = writeln!(s, "top table commit flush by size (byte)");
727        let query = format!(
728            "topk(3, sum(rate(storage_commit_write_throughput{{{}}}[10m])) by (table_id))",
729            self.prometheus_selector
730        );
731        self.write_instant_vector_impl(s, &query, vec!["table_id"])
732            .await;
733
734        let _ = writeln!(s);
735        let _ = writeln!(s, "object store read throughput (bytes/s)");
736        let query = format!(
737            "sum(rate(object_store_read_bytes{{{}}}[10m])) by (job, instance)",
738            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
739        );
740        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
741            .await;
742
743        let _ = writeln!(s);
744        let _ = writeln!(s, "object store write throughput (bytes/s)");
745        let query = format!(
746            "sum(rate(object_store_write_bytes{{{}}}[10m])) by (job, instance)",
747            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
748        );
749        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
750            .await;
751
752        let _ = writeln!(s);
753        let _ = writeln!(s, "object store operation rate");
754        let query = format!(
755            "sum(rate(object_store_operation_latency_count{{{}}}[10m])) by (le, type, job, instance)",
756            merge_prometheus_selector([
757                &self.prometheus_selector,
758                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read_read_bytes|streaming_read\""
759            ])
760        );
761        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
762            .await;
763
764        let _ = writeln!(s);
765        let _ = writeln!(s, "object store operation duration (second)");
766        let query = format!(
767            "histogram_quantile(0.9, sum(rate(object_store_operation_latency_bucket{{{}}}[10m])) by (le, type, job, instance))",
768            merge_prometheus_selector([
769                &self.prometheus_selector,
770                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read\""
771            ])
772        );
773        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
774            .await;
775    }
776
777    async fn write_instant_vector_impl(&self, s: &mut String, query: &str, labels: Vec<&str>) {
778        let Some(ref client) = self.prometheus_client else {
779            return;
780        };
781        if let Ok(Vector(instant_vec)) = client
782            .query(query)
783            .get()
784            .await
785            .map(|result| result.into_inner().0)
786        {
787            for i in instant_vec {
788                let l = labels
789                    .iter()
790                    .map(|label| {
791                        format!(
792                            "{}={}",
793                            *label,
794                            i.metric()
795                                .get(*label)
796                                .map(|s| s.as_str())
797                                .unwrap_or_default()
798                        )
799                    })
800                    .join(",");
801                let _ = writeln!(s, "{}: {:.3}", l, i.sample().value());
802            }
803        }
804    }
805
806    async fn write_await_tree(&self, s: &mut String, actor_traces_format: ActorTracesFormat) {
807        let all = dump_cluster_await_tree(
808            &self.metadata_manager,
809            &self.await_tree_reg,
810            actor_traces_format,
811        )
812        .await;
813
814        if let Ok(all) = all {
815            write!(s, "{}", all.output()).unwrap();
816        } else {
817            tracing::warn!("failed to dump await tree");
818        }
819    }
820
    /// Appends a human-readable catalog dump to `s`: a STREAMING JOB overview
    /// table, one table per object kind (SOURCE, TABLE, MATERIALIZED VIEW,
    /// INDEX, SINK, VIEW, INTERNAL TABLE), and finally an ACTOR table mapping
    /// each actor to its fragment and owning job.
    ///
    /// SQL definitions are redacted via `redact_sql`; SQL that fails to parse
    /// is shown as "[REDACTED]". Errors from catalog queries are propagated.
    async fn write_table_definition(&self, s: &mut String) -> MetaResult<()> {
        // Each catalog map below is:
        // id -> (name, database_id, schema_id, definition, created_at_epoch).
        let sources = self
            .metadata_manager
            .catalog_controller
            .list_sources()
            .await?
            .into_iter()
            .map(|s| {
                (
                    s.id.into(),
                    (
                        s.name,
                        s.database_id,
                        s.schema_id,
                        s.definition,
                        s.created_at_epoch,
                    ),
                )
            })
            .collect::<BTreeMap<_, _>>();
        let mut user_tables = BTreeMap::new();
        let mut mvs = BTreeMap::new();
        let mut indexes = BTreeMap::new();
        let mut internal_tables = BTreeMap::new();
        {
            // Bucket all state tables by table type. `chunk_by` only groups
            // *consecutive* items, but since every group is merged into the
            // target map with `extend`, correctness does not depend on the
            // input being sorted by type.
            let grouped = self
                .metadata_manager
                .catalog_controller
                .list_all_state_tables()
                .await?
                .into_iter()
                .chunk_by(|t| t.table_type());
            for (table_type, tables) in &grouped {
                let tables = tables.into_iter().map(|t| {
                    (
                        t.id.into(),
                        (
                            t.name,
                            t.database_id,
                            t.schema_id,
                            t.definition,
                            t.created_at_epoch,
                        ),
                    )
                });
                match table_type {
                    PbTableType::Table => user_tables.extend(tables),
                    PbTableType::MaterializedView => mvs.extend(tables),
                    PbTableType::Index | PbTableType::VectorIndex => indexes.extend(tables),
                    PbTableType::Internal => internal_tables.extend(tables),
                    PbTableType::Unspecified => {
                        // Should not happen; log the offending entries for diagnosis.
                        tracing::error!("unspecified table type: {:?}", tables.collect_vec());
                    }
                }
            }
        }
        let sinks = self
            .metadata_manager
            .catalog_controller
            .list_sinks()
            .await?
            .into_iter()
            .map(|s| {
                (
                    s.id.into(),
                    (
                        s.name,
                        s.database_id,
                        s.schema_id,
                        s.definition,
                        s.created_at_epoch,
                    ),
                )
            })
            .collect::<BTreeMap<_, _>>();
        let views = self
            .metadata_manager
            .catalog_controller
            .list_views()
            .await?
            .into_iter()
            .map(|v| {
                // Views carry no creation epoch in the catalog, hence `None`.
                (
                    v.id.into(),
                    (v.name, v.database_id, v.schema_id, v.sql, None),
                )
            })
            .collect::<BTreeMap<_, _>>();
        let mut streaming_jobs = self
            .metadata_manager
            .catalog_controller
            .list_streaming_job_infos()
            .await?;
        // Stable display order: group by object type first, then by job id.
        streaming_jobs.sort_by_key(|info| (info.obj_type as usize, info.job_id));
        {
            // Render the STREAMING JOB overview table.
            use comfy_table::{Row, Table};
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("job_id".into());
                row.add_cell("name".into());
                row.add_cell("obj_type".into());
                row.add_cell("state".into());
                row.add_cell("parallelism".into());
                row.add_cell("max_parallelism".into());
                row.add_cell("resource_group".into());
                row.add_cell("database_id".into());
                row.add_cell("schema_id".into());
                row.add_cell("config_override".into());
                row
            });
            for job in streaming_jobs {
                let mut row = Row::new();
                row.add_cell(job.job_id.into());
                row.add_cell(job.name.into());
                row.add_cell(job.obj_type.as_str().into());
                row.add_cell(format_job_status(job.job_status).into());
                row.add_cell(format_streaming_parallelism(&job.parallelism).into());
                row.add_cell(job.max_parallelism.into());
                row.add_cell(job.resource_group.into());
                row.add_cell(job.database_id.into());
                row.add_cell(job.schema_id.into());
                row.add_cell(job.config_override.into());
                table.add_row(row);
            }
            let _ = writeln!(s);
            let _ = writeln!(s, "STREAMING JOB");
            let _ = writeln!(s, "{table}");
        }
        // One section per catalog kind, printed in this fixed order.
        let catalogs = [
            ("SOURCE", sources),
            ("TABLE", user_tables),
            ("MATERIALIZED VIEW", mvs),
            ("INDEX", indexes),
            ("SINK", sinks),
            ("VIEW", views),
            ("INTERNAL TABLE", internal_tables),
        ];
        // Remember object names so the ACTOR table below can display the name
        // of each actor's owning job.
        let mut obj_id_to_name: HashMap<ObjectId, _> = HashMap::new();
        for (title, items) in catalogs {
            use comfy_table::{Row, Table};
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("id".into());
                row.add_cell("name".into());
                row.add_cell("database_id".into());
                row.add_cell("schema_id".into());
                row.add_cell("created_at".into());
                row.add_cell("definition".into());
                row
            });
            for (id, (name, database_id, schema_id, definition, created_at_epoch)) in items {
                obj_id_to_name.insert(id, name.clone());
                let mut row = Row::new();
                // Redact option values in the definition; if the SQL cannot be
                // parsed at all, hide the whole definition.
                let may_redact = redact_sql(&definition, self.redact_sql_option_keywords.clone())
                    .unwrap_or_else(|| "[REDACTED]".into());
                let created_at = if let Some(created_at_epoch) = created_at_epoch {
                    format!("{}", Epoch::from(created_at_epoch).as_timestamptz())
                } else {
                    "".into()
                };
                row.add_cell(id.into());
                row.add_cell(name.into());
                row.add_cell(database_id.into());
                row.add_cell(schema_id.into());
                row.add_cell(created_at.into());
                row.add_cell(may_redact.into());
                table.add_row(row);
            }
            let _ = writeln!(s);
            let _ = writeln!(s, "{title}");
            let _ = writeln!(s, "{table}");
        }

        // actor_id -> (fragment_id, job_id, schema_id, obj_type, job_name).
        let actors = self
            .metadata_manager
            .catalog_controller
            .list_actor_info()
            .await?
            .into_iter()
            .map(|(actor_id, fragment_id, job_id, schema_id, obj_type)| {
                (
                    actor_id,
                    (
                        fragment_id,
                        job_id,
                        schema_id,
                        obj_type,
                        // Falls back to an empty name if the job id is unknown.
                        obj_id_to_name.get(&job_id).cloned().unwrap_or_default(),
                    ),
                )
            })
            .collect::<BTreeMap<_, _>>();

        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("fragment_id".into());
            row.add_cell("job_id".into());
            row.add_cell("schema_id".into());
            row.add_cell("type".into());
            row.add_cell("name".into());
            row
        });
        for (actor_id, (fragment_id, job_id, schema_id, ddl_type, name)) in actors {
            let mut row = Row::new();
            row.add_cell(actor_id.into());
            row.add_cell(fragment_id.into());
            row.add_cell(job_id.into());
            row.add_cell(schema_id.into());
            row.add_cell(ddl_type.as_str().into());
            row.add_cell(name.into());
            table.add_row(row);
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "ACTOR");
        let _ = writeln!(s, "{table}");
        Ok(())
    }
1043
1044    fn write_license(&self, s: &mut String) {
1045        use comfy_table::presets::ASCII_BORDERS_ONLY;
1046        use comfy_table::{ContentArrangement, Row, Table};
1047
1048        let mut table = Table::new();
1049        table.load_preset(ASCII_BORDERS_ONLY);
1050        table.set_content_arrangement(ContentArrangement::Dynamic);
1051        table.set_header({
1052            let mut row = Row::new();
1053            row.add_cell("field".into());
1054            row.add_cell("value".into());
1055            row
1056        });
1057
1058        match LicenseManager::get().license() {
1059            Ok(license) => {
1060                let fmt_option = |value: Option<u64>| match value {
1061                    Some(v) => v.to_string(),
1062                    None => "unlimited".to_owned(),
1063                };
1064
1065                let expires_at = if license.exp == u64::MAX {
1066                    "never".to_owned()
1067                } else {
1068                    let exp_i64 = license.exp as i64;
1069                    chrono::DateTime::<chrono::Utc>::from_timestamp(exp_i64, 0)
1070                        .map(|ts| ts.to_rfc3339())
1071                        .unwrap_or_else(|| format!("invalid ({})", license.exp))
1072                };
1073
1074                let mut row = Row::new();
1075                row.add_cell("status".into());
1076                row.add_cell("valid".into());
1077                table.add_row(row);
1078
1079                let mut row = Row::new();
1080                row.add_cell("tier".into());
1081                row.add_cell(license.tier.name().into());
1082                table.add_row(row);
1083
1084                let mut row = Row::new();
1085                row.add_cell("expires_at".into());
1086                row.add_cell(expires_at.into());
1087                table.add_row(row);
1088
1089                let mut row = Row::new();
1090                row.add_cell("rwu_limit".into());
1091                row.add_cell(fmt_option(license.rwu_limit.map(|v| v.get())).into());
1092                table.add_row(row);
1093
1094                let mut row = Row::new();
1095                row.add_cell("cpu_core_limit".into());
1096                row.add_cell(fmt_option(license.cpu_core_limit()).into());
1097                table.add_row(row);
1098
1099                let mut row = Row::new();
1100                row.add_cell("memory_limit_bytes".into());
1101                row.add_cell(fmt_option(license.memory_limit()).into());
1102                table.add_row(row);
1103
1104                let mut features: Vec<_> = license
1105                    .tier
1106                    .available_features()
1107                    .map(|f| f.name())
1108                    .collect();
1109                features.sort_unstable();
1110                let feature_summary = format_features(&features);
1111
1112                let mut row = Row::new();
1113                row.add_cell("available_features".into());
1114                row.add_cell(feature_summary.into());
1115                table.add_row(row);
1116            }
1117            Err(error) => {
1118                let mut row = Row::new();
1119                row.add_cell("status".into());
1120                row.add_cell("invalid".into());
1121                table.add_row(row);
1122
1123                let mut row = Row::new();
1124                row.add_cell("error".into());
1125                row.add_cell(error.to_report_string().into());
1126                table.add_row(row);
1127            }
1128        }
1129
1130        let _ = writeln!(s, "LICENSE");
1131        let _ = writeln!(s, "{table}");
1132    }
1133
1134    async fn write_params(&self, s: &mut String) {
1135        let params = self.system_params_controller.get_params().await;
1136
1137        use comfy_table::{Row, Table};
1138        let mut table = Table::new();
1139        table.set_header({
1140            let mut row = Row::new();
1141            row.add_cell("key".into());
1142            row.add_cell("value".into());
1143            row
1144        });
1145
1146        let mut row = Row::new();
1147        row.add_cell("barrier_interval_ms".into());
1148        row.add_cell(params.barrier_interval_ms().to_string().into());
1149        table.add_row(row);
1150
1151        let mut row = Row::new();
1152        row.add_cell("checkpoint_frequency".into());
1153        row.add_cell(params.checkpoint_frequency().to_string().into());
1154        table.add_row(row);
1155
1156        let mut row = Row::new();
1157        row.add_cell("state_store".into());
1158        row.add_cell(params.state_store().to_owned().into());
1159        table.add_row(row);
1160
1161        let mut row = Row::new();
1162        row.add_cell("data_directory".into());
1163        row.add_cell(params.data_directory().to_owned().into());
1164        table.add_row(row);
1165
1166        let mut row = Row::new();
1167        row.add_cell("max_concurrent_creating_streaming_jobs".into());
1168        row.add_cell(
1169            params
1170                .max_concurrent_creating_streaming_jobs()
1171                .to_string()
1172                .into(),
1173        );
1174        table.add_row(row);
1175
1176        let mut row = Row::new();
1177        row.add_cell("time_travel_retention_ms".into());
1178        row.add_cell(params.time_travel_retention_ms().to_string().into());
1179        table.add_row(row);
1180
1181        let mut row = Row::new();
1182        row.add_cell("adaptive_parallelism_strategy".into());
1183        row.add_cell(params.adaptive_parallelism_strategy().to_string().into());
1184        table.add_row(row);
1185
1186        let mut row = Row::new();
1187        row.add_cell("per_database_isolation".into());
1188        row.add_cell(params.per_database_isolation().to_string().into());
1189        table.add_row(row);
1190
1191        let _ = writeln!(s, "SYSTEM PARAMS");
1192        let _ = writeln!(s, "{table}");
1193    }
1194}
1195
1196fn try_add_cell<T: Into<comfy_table::Cell>>(row: &mut comfy_table::Row, t: Option<T>) {
1197    match t {
1198        Some(t) => {
1199            row.add_cell(t.into());
1200        }
1201        None => {
1202            row.add_cell("".into());
1203        }
1204    }
1205}
1206
/// Joins PromQL label selectors with commas, skipping empty ones so that no
/// stray `,,` or leading/trailing comma appears in the merged selector.
fn merge_prometheus_selector<'a>(selectors: impl IntoIterator<Item = &'a str>) -> String {
    selectors
        .into_iter()
        .filter(|selector| !selector.is_empty())
        .collect::<Vec<_>>()
        .join(",")
}
1210
1211fn redact_sql(sql: &str, keywords: RedactSqlOptionKeywordsRef) -> Option<String> {
1212    match Parser::parse_sql(sql) {
1213        Ok(sqls) => Some(
1214            sqls.into_iter()
1215                .map(|sql| sql.to_redacted_string(keywords.clone()))
1216                .join(";"),
1217        ),
1218        Err(_) => None,
1219    }
1220}
1221
/// Formats feature names for display: `"(none)"` when empty, otherwise
/// comma-separated names wrapped to at most 6 per line, each line indented
/// by two spaces.
fn format_features(features: &[&'static str]) -> String {
    const PER_LINE: usize = 6;

    if features.is_empty() {
        "(none)".into()
    } else {
        let lines: Vec<String> = features
            .chunks(PER_LINE)
            .map(|chunk| format!("  {}", chunk.join(", ")))
            .collect();
        lines.join("\n")
    }
}
1234
1235fn format_job_status(status: JobStatus) -> &'static str {
1236    match status {
1237        JobStatus::Initial => "initial",
1238        JobStatus::Creating => "creating",
1239        JobStatus::Created => "created",
1240    }
1241}
1242
1243fn format_streaming_parallelism(parallelism: &StreamingParallelism) -> String {
1244    match parallelism {
1245        StreamingParallelism::Adaptive => "adaptive".into(),
1246        StreamingParallelism::Fixed(n) => format!("fixed({n})"),
1247        StreamingParallelism::Custom => "custom".into(),
1248    }
1249}