risingwave_meta/manager/
diagnose.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::{Ordering, Reverse};
16use std::collections::{BTreeMap, BinaryHeap, HashMap};
17use std::fmt::Write;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use prometheus_http_query::response::Data::Vector;
22use risingwave_common::id::ObjectId;
23use risingwave_common::system_param::reader::SystemParamsRead;
24use risingwave_common::types::Timestamptz;
25use risingwave_common::util::StackTraceResponseExt;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_hummock_sdk::level::Level;
28use risingwave_hummock_sdk::{CompactionGroupId, HummockSstableId};
29use risingwave_license::LicenseManager;
30use risingwave_meta_model::{JobStatus, StreamingParallelism};
31use risingwave_pb::catalog::table::PbTableType;
32use risingwave_pb::common::WorkerType;
33use risingwave_pb::meta::EventLog;
34use risingwave_pb::meta::event_log::Event;
35use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
36use risingwave_sqlparser::ast::RedactSqlOptionKeywordsRef;
37use risingwave_sqlparser::parser::Parser;
38use serde_json::json;
39use thiserror_ext::AsReport;
40
41use crate::MetaResult;
42use crate::controller::system_param::SystemParamsControllerRef;
43use crate::hummock::HummockManagerRef;
44use crate::manager::MetadataManager;
45use crate::manager::event_log::EventLogManagerRef;
46use crate::manager::iceberg_compaction::IcebergCompactionManagerRef;
47use crate::rpc::await_tree::dump_cluster_await_tree;
48
/// Shared, reference-counted handle to a [`DiagnoseCommand`].
pub type DiagnoseCommandRef = Arc<DiagnoseCommand>;
50
/// Aggregates the handles needed to assemble a cluster diagnose report.
///
/// Each field backs one or more sections emitted by [`DiagnoseCommand::report`].
pub struct DiagnoseCommand {
    // Catalog and worker metadata (catalog stats, worker nodes, definitions).
    metadata_manager: MetadataManager,
    // Local await-tree registry, used when dumping cluster await trees.
    await_tree_reg: await_tree::Registry,
    // Hummock state read by the storage section.
    // NOTE(review): `manger` is a typo of `manager`; renaming would touch
    // every use site, so it is left as-is here.
    hummock_manger: HummockManagerRef,
    // Iceberg compaction scheduling state for its report section.
    iceberg_compaction_manager: IcebergCompactionManagerRef,
    // In-memory event logs (barrier failures, panics, ...).
    event_log_manager: EventLogManagerRef,
    // `None` effectively disables the Prometheus-backed sections.
    prometheus_client: Option<prometheus_http_query::Client>,
    // Label selector spliced into every Prometheus query.
    prometheus_selector: String,
    // Presumably used to redact sensitive SQL options when printing
    // definitions (usage is in `write_table_definition`, below this chunk).
    redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
    // System parameters, printed by `write_params` (not visible in this chunk).
    system_params_controller: SystemParamsControllerRef,
}
62
63impl DiagnoseCommand {
    /// Wires up a [`DiagnoseCommand`] from every manager/handle the report
    /// sections read. Pure field assignment; no I/O happens here.
    pub fn new(
        metadata_manager: MetadataManager,
        await_tree_reg: await_tree::Registry,
        hummock_manger: HummockManagerRef,
        iceberg_compaction_manager: IcebergCompactionManagerRef,
        event_log_manager: EventLogManagerRef,
        prometheus_client: Option<prometheus_http_query::Client>,
        prometheus_selector: String,
        redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
        system_params_controller: SystemParamsControllerRef,
    ) -> Self {
        Self {
            metadata_manager,
            await_tree_reg,
            hummock_manger,
            iceberg_compaction_manager,
            event_log_manager,
            prometheus_client,
            prometheus_selector,
            redact_sql_option_keywords,
            system_params_controller,
        }
    }
87
88    pub async fn report(&self, actor_traces_format: ActorTracesFormat) -> String {
89        let mut report = String::new();
90        let _ = writeln!(
91            report,
92            "report created at: {}\nversion: {}",
93            chrono::DateTime::<chrono::offset::Utc>::from(std::time::SystemTime::now()),
94            risingwave_common::current_cluster_version(),
95        );
96        let _ = writeln!(report);
97        self.write_license(&mut report);
98        let _ = writeln!(report);
99        self.write_catalog(&mut report).await;
100        let _ = writeln!(report);
101        self.write_worker_nodes(&mut report).await;
102        let _ = writeln!(report);
103        self.write_streaming_prometheus(&mut report).await;
104        let _ = writeln!(report);
105        self.write_storage(&mut report).await;
106        let _ = writeln!(report);
107        self.write_iceberg_compaction_schedule(&mut report).await;
108        let _ = writeln!(report);
109        self.write_event_logs(&mut report);
110        let _ = writeln!(report);
111        self.write_params(&mut report).await;
112        let _ = writeln!(report);
113        self.write_await_tree(&mut report, actor_traces_format)
114            .await;
115
116        report
117    }
118
119    async fn write_catalog(&self, s: &mut String) {
120        self.write_catalog_inner(s).await;
121        let _ = self.write_table_definition(s).await.inspect_err(|e| {
122            tracing::warn!(
123                error = e.to_report_string(),
124                "failed to display table definition"
125            )
126        });
127    }
128
129    async fn write_catalog_inner(&self, s: &mut String) {
130        let stats = self.metadata_manager.catalog_controller.stats().await;
131
132        let stat = match stats {
133            Ok(stat) => stat,
134            Err(err) => {
135                tracing::warn!(error=?err.as_report(), "failed to get catalog stats");
136                return;
137            }
138        };
139        let _ = writeln!(s, "number of database: {}", stat.database_num);
140        let _ = writeln!(s, "number of fragment: {}", stat.streaming_job_num);
141        let _ = writeln!(s, "number of actor: {}", stat.actor_num);
142        let _ = writeln!(s, "number of source: {}", stat.source_num);
143        let _ = writeln!(s, "number of table: {}", stat.table_num);
144        let _ = writeln!(s, "number of materialized view: {}", stat.mview_num);
145        let _ = writeln!(s, "number of sink: {}", stat.sink_num);
146        let _ = writeln!(s, "number of index: {}", stat.index_num);
147        let _ = writeln!(s, "number of function: {}", stat.function_num);
148
149        self.write_databases(s).await;
150        self.write_schemas(s).await;
151    }
152
153    async fn write_databases(&self, s: &mut String) {
154        let databases = match self
155            .metadata_manager
156            .catalog_controller
157            .list_databases()
158            .await
159        {
160            Ok(databases) => databases,
161            Err(err) => {
162                tracing::warn!(error=?err.as_report(), "failed to list databases");
163                return;
164            }
165        };
166
167        use comfy_table::{Row, Table};
168        let mut table = Table::new();
169        table.set_header({
170            let mut row = Row::new();
171            row.add_cell("id".into());
172            row.add_cell("name".into());
173            row.add_cell("resource_group".into());
174            row.add_cell("barrier_interval_ms".into());
175            row.add_cell("checkpoint_frequency".into());
176            row
177        });
178        for db in databases {
179            let mut row = Row::new();
180            row.add_cell(db.id.into());
181            row.add_cell(db.name.into());
182            row.add_cell(db.resource_group.into());
183            row.add_cell(
184                db.barrier_interval_ms
185                    .map(|v| v.to_string())
186                    .unwrap_or("default".into())
187                    .into(),
188            );
189            row.add_cell(
190                db.checkpoint_frequency
191                    .map(|v| v.to_string())
192                    .unwrap_or("default".into())
193                    .into(),
194            );
195            table.add_row(row);
196        }
197
198        let _ = writeln!(s, "DATABASE");
199        let _ = writeln!(s, "{table}");
200    }
201
202    async fn write_schemas(&self, s: &mut String) {
203        let schemas = match self
204            .metadata_manager
205            .catalog_controller
206            .list_schemas()
207            .await
208        {
209            Ok(schemas) => schemas,
210            Err(err) => {
211                tracing::warn!(error=?err.as_report(), "failed to list schemas");
212                return;
213            }
214        };
215
216        use comfy_table::{Row, Table};
217        let mut table = Table::new();
218        table.set_header({
219            let mut row = Row::new();
220            row.add_cell("id".into());
221            row.add_cell("database_id".into());
222            row.add_cell("name".into());
223            row
224        });
225        for schema in schemas {
226            let mut row = Row::new();
227            row.add_cell(schema.id.into());
228            row.add_cell(schema.database_id.into());
229            row.add_cell(schema.name.into());
230            table.add_row(row);
231        }
232
233        let _ = writeln!(s, "SCHEMA");
234        let _ = writeln!(s, "{table}");
235    }
236
    /// Renders one table row per worker node with its identity, capabilities,
    /// resources, and (for compute nodes) its running actor count.
    ///
    /// Cells are added positionally, so the `try_add_cell` calls below must
    /// stay in exactly the same order as the header columns.
    async fn write_worker_nodes(&self, s: &mut String) {
        let Ok(worker_actor_count) = self.metadata_manager.worker_actor_count() else {
            tracing::warn!("failed to get worker actor count");
            return;
        };

        use comfy_table::{Row, Table};
        let Ok(worker_nodes) = self.metadata_manager.list_worker_node(None, None).await else {
            tracing::warn!("failed to get worker nodes");
            return;
        };
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("host".into());
            row.add_cell("hostname".into());
            row.add_cell("type".into());
            row.add_cell("state".into());
            row.add_cell("parallelism".into());
            row.add_cell("resource_group".into());
            row.add_cell("is_streaming".into());
            row.add_cell("is_serving".into());
            row.add_cell("is_iceberg_compactor".into());
            row.add_cell("rw_version".into());
            row.add_cell("total_memory_bytes".into());
            row.add_cell("total_cpu_cores".into());
            row.add_cell("started_at".into());
            row.add_cell("actor_count".into());
            row
        });
        for worker_node in worker_nodes {
            let mut row = Row::new();
            row.add_cell(worker_node.id.into());
            // "host" column: the advertised RPC address as host:port.
            try_add_cell(
                &mut row,
                worker_node
                    .host
                    .as_ref()
                    .map(|h| format!("{}:{}", h.host, h.port)),
            );
            // "hostname" column: the machine hostname reported in `resource`.
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.hostname.clone()),
            );
            try_add_cell(
                &mut row,
                worker_node.get_type().ok().map(|t| t.as_str_name()),
            );
            try_add_cell(
                &mut row,
                worker_node.get_state().ok().map(|s| s.as_str_name()),
            );
            try_add_cell(&mut row, worker_node.parallelism());
            // Empty string when the node has no explicit resource group.
            try_add_cell(
                &mut row,
                worker_node
                    .property
                    .as_ref()
                    .map(|p| p.resource_group.clone().unwrap_or("".to_owned())),
            );
            // is_streaming and is_serving are only meaningful for ComputeNode
            let (is_streaming, is_serving) = {
                if let Ok(t) = worker_node.get_type()
                    && t == WorkerType::ComputeNode
                {
                    (
                        worker_node.property.as_ref().map(|p| p.is_streaming),
                        worker_node.property.as_ref().map(|p| p.is_serving),
                    )
                } else {
                    (None, None)
                }
            };
            try_add_cell(&mut row, is_streaming);
            try_add_cell(&mut row, is_serving);
            // is_iceberg_compactor is only meaningful for Compactor worker type
            let is_iceberg_compactor = {
                if let Ok(t) = worker_node.get_type()
                    && t == WorkerType::Compactor
                {
                    worker_node
                        .property
                        .as_ref()
                        .map(|p| p.is_iceberg_compactor)
                } else {
                    None
                }
            };
            try_add_cell(&mut row, is_iceberg_compactor);
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.rw_version.clone()),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.total_memory_bytes),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.total_cpu_cores),
            );
            // `started_at` appears to be unix seconds; render as a timestamp.
            try_add_cell(
                &mut row,
                worker_node
                    .started_at
                    .and_then(|ts| Timestamptz::from_secs(ts as _).map(|t| t.to_string())),
            );
            // Actor count only applies to compute nodes; a compute node absent
            // from the map simply has zero actors.
            let actor_count = {
                if let Ok(t) = worker_node.get_type()
                    && t != WorkerType::ComputeNode
                {
                    None
                } else {
                    match worker_actor_count.get(&worker_node.id) {
                        None => Some(0),
                        Some(c) => Some(*c),
                    }
                }
            };
            try_add_cell(&mut row, actor_count);
            table.add_row(row);
        }
        let _ = writeln!(s, "{table}");
    }
362
363    fn write_event_logs(&self, s: &mut String) {
364        let event_logs = self
365            .event_log_manager
366            .list_event_logs()
367            .into_iter()
368            .sorted_by(|a, b| {
369                a.timestamp
370                    .unwrap_or(0)
371                    .cmp(&b.timestamp.unwrap_or(0))
372                    .reverse()
373            })
374            .collect_vec();
375
376        let _ = writeln!(s, "latest barrier completions");
377        Self::write_event_logs_impl(
378            s,
379            event_logs.iter(),
380            |e| {
381                let Event::BarrierComplete(info) = e else {
382                    return None;
383                };
384                Some(json!(info).to_string())
385            },
386            Some(10),
387        );
388
389        let _ = writeln!(s);
390        let _ = writeln!(s, "latest barrier collection failures");
391        Self::write_event_logs_impl(
392            s,
393            event_logs.iter(),
394            |e| {
395                let Event::CollectBarrierFail(info) = e else {
396                    return None;
397                };
398                Some(json!(info).to_string())
399            },
400            Some(3),
401        );
402
403        let _ = writeln!(s);
404        let _ = writeln!(s, "latest barrier injection failures");
405        Self::write_event_logs_impl(
406            s,
407            event_logs.iter(),
408            |e| {
409                let Event::InjectBarrierFail(info) = e else {
410                    return None;
411                };
412                Some(json!(info).to_string())
413            },
414            Some(3),
415        );
416
417        let _ = writeln!(s);
418        let _ = writeln!(s, "latest worker node panics");
419        Self::write_event_logs_impl(
420            s,
421            event_logs.iter(),
422            |e| {
423                let Event::WorkerNodePanic(info) = e else {
424                    return None;
425                };
426                Some(json!(info).to_string())
427            },
428            Some(10),
429        );
430
431        let _ = writeln!(s);
432        let _ = writeln!(s, "latest create stream job failures");
433        Self::write_event_logs_impl(
434            s,
435            event_logs.iter(),
436            |e| {
437                let Event::CreateStreamJobFail(info) = e else {
438                    return None;
439                };
440                Some(json!(info).to_string())
441            },
442            Some(3),
443        );
444
445        let _ = writeln!(s);
446        let _ = writeln!(s, "latest dirty stream job clear-ups");
447        Self::write_event_logs_impl(
448            s,
449            event_logs.iter(),
450            |e| {
451                let Event::DirtyStreamJobClear(info) = e else {
452                    return None;
453                };
454                Some(json!(info).to_string())
455            },
456            Some(3),
457        );
458    }
459
460    fn write_event_logs_impl<'a, F>(
461        s: &mut String,
462        event_logs: impl Iterator<Item = &'a EventLog>,
463        get_event_info: F,
464        limit: Option<usize>,
465    ) where
466        F: Fn(&Event) -> Option<String>,
467    {
468        use comfy_table::{Row, Table};
469        let mut table = Table::new();
470        table.set_header({
471            let mut row = Row::new();
472            row.add_cell("created_at".into());
473            row.add_cell("info".into());
474            row
475        });
476        let mut row_count = 0;
477        for event_log in event_logs {
478            let Some(ref inner) = event_log.event else {
479                continue;
480            };
481            if let Some(limit) = limit
482                && row_count >= limit
483            {
484                break;
485            }
486            let mut row = Row::new();
487            let ts = event_log
488                .timestamp
489                .and_then(|ts| Timestamptz::from_millis(ts as _).map(|ts| ts.to_string()));
490            try_add_cell(&mut row, ts);
491            if let Some(info) = get_event_info(inner) {
492                row.add_cell(info.into());
493                row_count += 1;
494            } else {
495                continue;
496            }
497            table.add_row(row);
498        }
499        let _ = writeln!(s, "{table}");
500    }
501
    /// Writes the Hummock storage section: back-pressured compaction groups,
    /// SSTable totals, the top-10 SSTables by tombstone delete ratio, and the
    /// storage Prometheus metrics.
    async fn write_storage(&self, s: &mut String) {
        let mut sst_num = 0;
        let mut sst_total_file_size = 0;
        // A compaction group with a non-empty write-limit table list is
        // currently back-pressuring writes.
        let back_pressured_compaction_groups = self
            .hummock_manger
            .write_limits()
            .await
            .into_iter()
            .filter_map(|(k, v)| {
                if v.table_ids.is_empty() {
                    None
                } else {
                    Some(k)
                }
            })
            .join(",");
        if !back_pressured_compaction_groups.is_empty() {
            let _ = writeln!(
                s,
                "back pressured compaction groups: {back_pressured_compaction_groups}"
            );
        }

        // Heap element for ranking SSTables; ordered by `delete_ratio` only.
        #[derive(PartialEq, Eq)]
        struct SstableSort {
            compaction_group_id: CompactionGroupId,
            sst_id: HummockSstableId,
            // Stale-key ratio in basis points (1/10000), see below.
            delete_ratio: u64,
        }
        impl PartialOrd for SstableSort {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }
        impl Ord for SstableSort {
            fn cmp(&self, other: &Self) -> Ordering {
                self.delete_ratio.cmp(&other.delete_ratio)
            }
        }
        // Keeps the `top_k` largest elements in a min-heap (`Reverse` wrapper):
        // once full, only elements beating the current minimum are admitted.
        fn top_k_sstables(
            top_k: usize,
            heap: &mut BinaryHeap<Reverse<SstableSort>>,
            e: SstableSort,
        ) {
            if heap.len() < top_k {
                heap.push(Reverse(e));
            } else if let Some(mut p) = heap.peek_mut()
                && e.delete_ratio > p.0.delete_ratio
            {
                *p = Reverse(e);
            }
        }

        let top_k = 10;
        let mut top_tombstone_delete_sst = BinaryHeap::with_capacity(top_k);
        // Walk every level of every compaction group in the current version,
        // accumulating totals and ranking SSTables by stale-key ratio.
        let compaction_group_num = self
            .hummock_manger
            .on_current_version(|version| {
                for compaction_group in version.levels.values() {
                    let mut visit_level = |level: &Level| {
                        sst_num += level.table_infos.len();
                        sst_total_file_size +=
                            level.table_infos.iter().map(|t| t.sst_size).sum::<u64>();
                        for sst in &level.table_infos {
                            // Guard the division below; empty SSTs are skipped.
                            if sst.total_key_count == 0 {
                                continue;
                            }
                            // Ratio in basis points; rendered as a percent with
                            // two decimals in `format_table`.
                            let tombstone_delete_ratio =
                                sst.stale_key_count * 10000 / sst.total_key_count;
                            let e = SstableSort {
                                compaction_group_id: compaction_group.group_id,
                                sst_id: sst.sst_id,
                                delete_ratio: tombstone_delete_ratio,
                            };
                            top_k_sstables(top_k, &mut top_tombstone_delete_sst, e);
                        }
                    };
                    let l0 = &compaction_group.l0;
                    // FIXME: why chaining levels iter leads to segmentation fault?
                    for level in &l0.sub_levels {
                        visit_level(level);
                    }
                    for level in &compaction_group.levels {
                        visit_level(level);
                    }
                }
                version.levels.len()
            })
            .await;

        let _ = writeln!(s, "number of SSTables: {sst_num}");
        let _ = writeln!(s, "total size of SSTables (byte): {sst_total_file_size}");
        let _ = writeln!(s, "number of compaction groups: {compaction_group_num}");
        use comfy_table::{Row, Table};
        // Renders the heap ascending by delete ratio (into_sorted_vec order).
        fn format_table(heap: BinaryHeap<Reverse<SstableSort>>) -> Table {
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("compaction group id".into());
                row.add_cell("sstable id".into());
                row.add_cell("delete ratio".into());
                row
            });
            for sst in &heap.into_sorted_vec() {
                let mut row = Row::new();
                row.add_cell(sst.0.compaction_group_id.into());
                row.add_cell(sst.0.sst_id.into());
                // Basis points -> percent with two decimals.
                row.add_cell(format!("{:.2}%", sst.0.delete_ratio as f64 / 100.0).into());
                table.add_row(row);
            }
            table
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "top tombstone delete ratio");
        let _ = writeln!(s, "{}", format_table(top_tombstone_delete_sst));
        let _ = writeln!(s);

        let _ = writeln!(s);
        self.write_storage_prometheus(s).await;
    }
622
623    async fn write_iceberg_compaction_schedule(&self, s: &mut String) {
624        use comfy_table::{Row, Table};
625
626        let sink_names = match self.metadata_manager.catalog_controller.list_sinks().await {
627            Ok(sinks) => sinks
628                .into_iter()
629                .map(|sink| (sink.id.as_raw_id(), sink.name))
630                .collect::<BTreeMap<u32, String>>(),
631            Err(err) => {
632                tracing::warn!(error=?err.as_report(), "failed to list sinks");
633                return;
634            }
635        };
636
637        let statuses = self
638            .iceberg_compaction_manager
639            .list_compaction_statuses()
640            .await;
641
642        let _ = writeln!(s, "ICEBERG COMPACTION SCHEDULE");
643
644        let mut table = Table::new();
645        table.set_header({
646            let mut row = Row::new();
647            row.add_cell("sink_id".into());
648            row.add_cell("sink_name".into());
649            row.add_cell("task_type".into());
650            row.add_cell("schedule_state".into());
651            row.add_cell("trigger_interval_sec".into());
652            row.add_cell("trigger_snapshot_count".into());
653            row.add_cell("pending_snapshot_count".into());
654            row.add_cell("next_compaction_after_sec".into());
655            row.add_cell("is_triggerable".into());
656            row
657        });
658
659        for status in statuses {
660            let mut row = Row::new();
661            row.add_cell(status.sink_id.as_raw_id().into());
662            try_add_cell(
663                &mut row,
664                sink_names.get(&status.sink_id.as_raw_id()).cloned(),
665            );
666            row.add_cell(status.task_type.into());
667            row.add_cell(status.schedule_state.into());
668            row.add_cell(status.trigger_interval_sec.into());
669            row.add_cell((status.trigger_snapshot_count as u64).into());
670            try_add_cell(
671                &mut row,
672                status.pending_snapshot_count.map(|count| count as u64),
673            );
674            try_add_cell(&mut row, status.next_compaction_after_sec);
675            row.add_cell(status.is_triggerable.into());
676            table.add_row(row);
677        }
678
679        let _ = writeln!(s, "{table}");
680    }
681
682    async fn write_streaming_prometheus(&self, s: &mut String) {
683        let _ = writeln!(s, "top sources by throughput (rows/s)");
684        let query = format!(
685            "topk(3, sum(rate(stream_source_output_rows_counts{{{}}}[10m]))by (source_name))",
686            self.prometheus_selector
687        );
688        self.write_instant_vector_impl(s, &query, vec!["source_name"])
689            .await;
690
691        let _ = writeln!(s);
692        let _ = writeln!(s, "top materialized views by throughput (rows/s)");
693        let query = format!(
694            "topk(3, sum(rate(stream_mview_input_row_count{{{}}}[10m]))by (table_id))",
695            self.prometheus_selector
696        );
697        self.write_instant_vector_impl(s, &query, vec!["table_id"])
698            .await;
699
700        let _ = writeln!(s);
701        let _ = writeln!(s, "top join executor by matched rows");
702        let query = format!(
703            "topk(3, histogram_quantile(0.9, sum(rate(stream_join_matched_join_keys_bucket{{{}}}[10m])) by (le, fragment_id, table_id)))",
704            self.prometheus_selector
705        );
706        self.write_instant_vector_impl(s, &query, vec!["table_id", "fragment_id"])
707            .await;
708    }
709
710    async fn write_storage_prometheus(&self, s: &mut String) {
711        let _ = writeln!(s, "top Hummock Get by duration (second)");
712        let query = format!(
713            "topk(3, histogram_quantile(0.9, sum(rate(state_store_get_duration_bucket{{{}}}[10m])) by (le, table_id)))",
714            self.prometheus_selector
715        );
716        self.write_instant_vector_impl(s, &query, vec!["table_id"])
717            .await;
718
719        let _ = writeln!(s);
720        let _ = writeln!(s, "top Hummock Iter Init by duration (second)");
721        let query = format!(
722            "topk(3, histogram_quantile(0.9, sum(rate(state_store_iter_init_duration_bucket{{{}}}[10m])) by (le, table_id)))",
723            self.prometheus_selector
724        );
725        self.write_instant_vector_impl(s, &query, vec!["table_id"])
726            .await;
727
728        let _ = writeln!(s);
729        let _ = writeln!(s, "top table commit flush by size (byte)");
730        let query = format!(
731            "topk(3, sum(rate(storage_commit_write_throughput{{{}}}[10m])) by (table_id))",
732            self.prometheus_selector
733        );
734        self.write_instant_vector_impl(s, &query, vec!["table_id"])
735            .await;
736
737        let _ = writeln!(s);
738        let _ = writeln!(s, "object store read throughput (bytes/s)");
739        let query = format!(
740            "sum(rate(object_store_read_bytes{{{}}}[10m])) by (job, instance)",
741            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
742        );
743        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
744            .await;
745
746        let _ = writeln!(s);
747        let _ = writeln!(s, "object store write throughput (bytes/s)");
748        let query = format!(
749            "sum(rate(object_store_write_bytes{{{}}}[10m])) by (job, instance)",
750            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
751        );
752        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
753            .await;
754
755        let _ = writeln!(s);
756        let _ = writeln!(s, "object store operation rate");
757        let query = format!(
758            "sum(rate(object_store_operation_latency_count{{{}}}[10m])) by (le, type, job, instance)",
759            merge_prometheus_selector([
760                &self.prometheus_selector,
761                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read_read_bytes|streaming_read\""
762            ])
763        );
764        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
765            .await;
766
767        let _ = writeln!(s);
768        let _ = writeln!(s, "object store operation duration (second)");
769        let query = format!(
770            "histogram_quantile(0.9, sum(rate(object_store_operation_latency_bucket{{{}}}[10m])) by (le, type, job, instance))",
771            merge_prometheus_selector([
772                &self.prometheus_selector,
773                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read\""
774            ])
775        );
776        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
777            .await;
778    }
779
780    async fn write_instant_vector_impl(&self, s: &mut String, query: &str, labels: Vec<&str>) {
781        let Some(ref client) = self.prometheus_client else {
782            return;
783        };
784        if let Ok(Vector(instant_vec)) = client
785            .query(query)
786            .get()
787            .await
788            .map(|result| result.into_inner().0)
789        {
790            for i in instant_vec {
791                let l = labels
792                    .iter()
793                    .map(|label| {
794                        format!(
795                            "{}={}",
796                            *label,
797                            i.metric()
798                                .get(*label)
799                                .map(|s| s.as_str())
800                                .unwrap_or_default()
801                        )
802                    })
803                    .join(",");
804                let _ = writeln!(s, "{}: {:.3}", l, i.sample().value());
805            }
806        }
807    }
808
809    async fn write_await_tree(&self, s: &mut String, actor_traces_format: ActorTracesFormat) {
810        let all = dump_cluster_await_tree(
811            &self.metadata_manager,
812            &self.await_tree_reg,
813            actor_traces_format,
814        )
815        .await;
816
817        if let Ok(all) = all {
818            write!(s, "{}", all.output()).unwrap();
819        } else {
820            tracing::warn!("failed to dump await tree");
821        }
822    }
823
    /// Writes catalog metadata into `s`: the streaming-job overview, one section
    /// per relation kind (source, table, MV, index, sink, view, internal table)
    /// with redacted SQL definitions, and finally the actor-to-job mapping.
    ///
    /// Returns an error only if fetching catalog data from the metadata store
    /// fails; formatting itself is infallible.
    async fn write_table_definition(&self, s: &mut String) -> MetaResult<()> {
        // Each catalog map: id -> (name, database_id, schema_id, definition,
        // created_at_epoch). `BTreeMap` keeps report output ordered by id.
        let sources = self
            .metadata_manager
            .catalog_controller
            .list_sources()
            .await?
            .into_iter()
            .map(|s| {
                (
                    s.id.into(),
                    (
                        s.name,
                        s.database_id,
                        s.schema_id,
                        s.definition,
                        s.created_at_epoch,
                    ),
                )
            })
            .collect::<BTreeMap<_, _>>();
        let mut user_tables = BTreeMap::new();
        let mut mvs = BTreeMap::new();
        let mut indexes = BTreeMap::new();
        let mut internal_tables = BTreeMap::new();
        {
            // Partition all state tables by their catalog table type.
            // NOTE(review): `chunk_by` only groups *consecutive* items, so a type
            // may appear in multiple chunks if `list_all_state_tables` is not
            // sorted by type — harmless here since each chunk is merged into the
            // same `BTreeMap` via `extend`.
            let grouped = self
                .metadata_manager
                .catalog_controller
                .list_all_state_tables()
                .await?
                .into_iter()
                .chunk_by(|t| t.table_type());
            for (table_type, tables) in &grouped {
                let tables = tables.into_iter().map(|t| {
                    (
                        t.id.into(),
                        (
                            t.name,
                            t.database_id,
                            t.schema_id,
                            t.definition,
                            t.created_at_epoch,
                        ),
                    )
                });
                match table_type {
                    PbTableType::Table => user_tables.extend(tables),
                    PbTableType::MaterializedView => mvs.extend(tables),
                    PbTableType::Index | PbTableType::VectorIndex => indexes.extend(tables),
                    PbTableType::Internal => internal_tables.extend(tables),
                    PbTableType::Unspecified => {
                        // Should not happen; log the offending entries for debugging.
                        tracing::error!("unspecified table type: {:?}", tables.collect_vec());
                    }
                }
            }
        }
        let sinks = self
            .metadata_manager
            .catalog_controller
            .list_sinks()
            .await?
            .into_iter()
            .map(|s| {
                (
                    s.id.into(),
                    (
                        s.name,
                        s.database_id,
                        s.schema_id,
                        s.definition,
                        s.created_at_epoch,
                    ),
                )
            })
            .collect::<BTreeMap<_, _>>();
        // Views carry no creation epoch here, hence `None` for created_at.
        let views = self
            .metadata_manager
            .catalog_controller
            .list_views()
            .await?
            .into_iter()
            .map(|v| {
                (
                    v.id.into(),
                    (v.name, v.database_id, v.schema_id, v.sql, None),
                )
            })
            .collect::<BTreeMap<_, _>>();
        let mut streaming_jobs = self
            .metadata_manager
            .catalog_controller
            .list_streaming_job_infos()
            .await?;
        // Stable report ordering: by object type first, then by job id.
        streaming_jobs.sort_by_key(|info| (info.obj_type as usize, info.job_id));
        {
            // "STREAMING JOB" section.
            use comfy_table::{Row, Table};
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("job_id".into());
                row.add_cell("name".into());
                row.add_cell("obj_type".into());
                row.add_cell("state".into());
                row.add_cell("parallelism".into());
                row.add_cell("max_parallelism".into());
                row.add_cell("resource_group".into());
                row.add_cell("database_id".into());
                row.add_cell("schema_id".into());
                row.add_cell("config_override".into());
                row
            });
            for job in streaming_jobs {
                let mut row = Row::new();
                row.add_cell(job.job_id.into());
                row.add_cell(job.name.into());
                row.add_cell(job.obj_type.as_str().into());
                row.add_cell(format_job_status(job.job_status).into());
                row.add_cell(format_streaming_parallelism(&job.parallelism).into());
                row.add_cell(job.max_parallelism.into());
                row.add_cell(job.resource_group.into());
                row.add_cell(job.database_id.into());
                row.add_cell(job.schema_id.into());
                row.add_cell(job.config_override.into());
                table.add_row(row);
            }
            let _ = writeln!(s);
            let _ = writeln!(s, "STREAMING JOB");
            let _ = writeln!(s, "{table}");
        }
        // One section per catalog kind; all share the same column layout.
        let catalogs = [
            ("SOURCE", sources),
            ("TABLE", user_tables),
            ("MATERIALIZED VIEW", mvs),
            ("INDEX", indexes),
            ("SINK", sinks),
            ("VIEW", views),
            ("INTERNAL TABLE", internal_tables),
        ];
        // Collected so the ACTOR section below can resolve job ids to names.
        let mut obj_id_to_name: HashMap<ObjectId, _> = HashMap::new();
        for (title, items) in catalogs {
            use comfy_table::{Row, Table};
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("id".into());
                row.add_cell("name".into());
                row.add_cell("database_id".into());
                row.add_cell("schema_id".into());
                row.add_cell("created_at".into());
                row.add_cell("definition".into());
                row
            });
            for (id, (name, database_id, schema_id, definition, created_at_epoch)) in items {
                obj_id_to_name.insert(id, name.clone());
                let mut row = Row::new();
                // Redact sensitive options in the SQL. If the SQL cannot be
                // parsed at all, don't leak it verbatim — show a placeholder.
                let may_redact = redact_sql(&definition, self.redact_sql_option_keywords.clone())
                    .unwrap_or_else(|| "[REDACTED]".into());
                let created_at = if let Some(created_at_epoch) = created_at_epoch {
                    format!("{}", Epoch::from(created_at_epoch).as_timestamptz())
                } else {
                    "".into()
                };
                row.add_cell(id.into());
                row.add_cell(name.into());
                row.add_cell(database_id.into());
                row.add_cell(schema_id.into());
                row.add_cell(created_at.into());
                row.add_cell(may_redact.into());
                table.add_row(row);
            }
            let _ = writeln!(s);
            let _ = writeln!(s, "{title}");
            let _ = writeln!(s, "{table}");
        }

        // "ACTOR" section: actor id -> (fragment, job, schema, type, job name).
        let actors = self
            .metadata_manager
            .catalog_controller
            .list_actor_info()
            .await?
            .into_iter()
            .map(|(actor_id, fragment_id, job_id, schema_id, obj_type)| {
                (
                    actor_id,
                    (
                        fragment_id,
                        job_id,
                        schema_id,
                        obj_type,
                        // Falls back to an empty name for jobs not present in
                        // any catalog listed above.
                        obj_id_to_name.get(&job_id).cloned().unwrap_or_default(),
                    ),
                )
            })
            .collect::<BTreeMap<_, _>>();

        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("fragment_id".into());
            row.add_cell("job_id".into());
            row.add_cell("schema_id".into());
            row.add_cell("type".into());
            row.add_cell("name".into());
            row
        });
        for (actor_id, (fragment_id, job_id, schema_id, ddl_type, name)) in actors {
            let mut row = Row::new();
            row.add_cell(actor_id.into());
            row.add_cell(fragment_id.into());
            row.add_cell(job_id.into());
            row.add_cell(schema_id.into());
            row.add_cell(ddl_type.as_str().into());
            row.add_cell(name.into());
            table.add_row(row);
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "ACTOR");
        let _ = writeln!(s, "{table}");
        Ok(())
    }
1046
1047    fn write_license(&self, s: &mut String) {
1048        use comfy_table::presets::ASCII_BORDERS_ONLY;
1049        use comfy_table::{ContentArrangement, Row, Table};
1050
1051        let mut table = Table::new();
1052        table.load_preset(ASCII_BORDERS_ONLY);
1053        table.set_content_arrangement(ContentArrangement::Dynamic);
1054        table.set_header({
1055            let mut row = Row::new();
1056            row.add_cell("field".into());
1057            row.add_cell("value".into());
1058            row
1059        });
1060
1061        match LicenseManager::get().license() {
1062            Ok(license) => {
1063                let fmt_option = |value: Option<u64>| match value {
1064                    Some(v) => v.to_string(),
1065                    None => "unlimited".to_owned(),
1066                };
1067
1068                let expires_at = if license.exp == u64::MAX {
1069                    "never".to_owned()
1070                } else {
1071                    let exp_i64 = license.exp as i64;
1072                    chrono::DateTime::<chrono::Utc>::from_timestamp(exp_i64, 0)
1073                        .map(|ts| ts.to_rfc3339())
1074                        .unwrap_or_else(|| format!("invalid ({})", license.exp))
1075                };
1076
1077                let mut row = Row::new();
1078                row.add_cell("status".into());
1079                row.add_cell("valid".into());
1080                table.add_row(row);
1081
1082                let mut row = Row::new();
1083                row.add_cell("tier".into());
1084                row.add_cell(license.tier.name().into());
1085                table.add_row(row);
1086
1087                let mut row = Row::new();
1088                row.add_cell("expires_at".into());
1089                row.add_cell(expires_at.into());
1090                table.add_row(row);
1091
1092                let mut row = Row::new();
1093                row.add_cell("rwu_limit".into());
1094                row.add_cell(fmt_option(license.rwu_limit.map(|v| v.get())).into());
1095                table.add_row(row);
1096
1097                let mut row = Row::new();
1098                row.add_cell("cpu_core_limit".into());
1099                row.add_cell(fmt_option(license.cpu_core_limit()).into());
1100                table.add_row(row);
1101
1102                let mut row = Row::new();
1103                row.add_cell("memory_limit_bytes".into());
1104                row.add_cell(fmt_option(license.memory_limit()).into());
1105                table.add_row(row);
1106
1107                let mut features: Vec<_> = license
1108                    .tier
1109                    .available_features()
1110                    .map(|f| f.name())
1111                    .collect();
1112                features.sort_unstable();
1113                let feature_summary = format_features(&features);
1114
1115                let mut row = Row::new();
1116                row.add_cell("available_features".into());
1117                row.add_cell(feature_summary.into());
1118                table.add_row(row);
1119            }
1120            Err(error) => {
1121                let mut row = Row::new();
1122                row.add_cell("status".into());
1123                row.add_cell("invalid".into());
1124                table.add_row(row);
1125
1126                let mut row = Row::new();
1127                row.add_cell("error".into());
1128                row.add_cell(error.to_report_string().into());
1129                table.add_row(row);
1130            }
1131        }
1132
1133        let _ = writeln!(s, "LICENSE");
1134        let _ = writeln!(s, "{table}");
1135    }
1136
1137    async fn write_params(&self, s: &mut String) {
1138        let params = self.system_params_controller.get_params().await;
1139
1140        use comfy_table::{Row, Table};
1141        let mut table = Table::new();
1142        table.set_header({
1143            let mut row = Row::new();
1144            row.add_cell("key".into());
1145            row.add_cell("value".into());
1146            row
1147        });
1148
1149        let mut row = Row::new();
1150        row.add_cell("barrier_interval_ms".into());
1151        row.add_cell(params.barrier_interval_ms().to_string().into());
1152        table.add_row(row);
1153
1154        let mut row = Row::new();
1155        row.add_cell("checkpoint_frequency".into());
1156        row.add_cell(params.checkpoint_frequency().to_string().into());
1157        table.add_row(row);
1158
1159        let mut row = Row::new();
1160        row.add_cell("state_store".into());
1161        row.add_cell(params.state_store().to_owned().into());
1162        table.add_row(row);
1163
1164        let mut row = Row::new();
1165        row.add_cell("data_directory".into());
1166        row.add_cell(params.data_directory().to_owned().into());
1167        table.add_row(row);
1168
1169        let mut row = Row::new();
1170        row.add_cell("max_concurrent_creating_streaming_jobs".into());
1171        row.add_cell(
1172            params
1173                .max_concurrent_creating_streaming_jobs()
1174                .to_string()
1175                .into(),
1176        );
1177        table.add_row(row);
1178
1179        let mut row = Row::new();
1180        row.add_cell("time_travel_retention_ms".into());
1181        row.add_cell(params.time_travel_retention_ms().to_string().into());
1182        table.add_row(row);
1183
1184        let mut row = Row::new();
1185        row.add_cell("adaptive_parallelism_strategy".into());
1186        row.add_cell(params.adaptive_parallelism_strategy().to_string().into());
1187        table.add_row(row);
1188
1189        let mut row = Row::new();
1190        row.add_cell("per_database_isolation".into());
1191        row.add_cell(params.per_database_isolation().to_string().into());
1192        table.add_row(row);
1193
1194        let _ = writeln!(s, "SYSTEM PARAMS");
1195        let _ = writeln!(s, "{table}");
1196    }
1197}
1198
1199fn try_add_cell<T: Into<comfy_table::Cell>>(row: &mut comfy_table::Row, t: Option<T>) {
1200    match t {
1201        Some(t) => {
1202            row.add_cell(t.into());
1203        }
1204        None => {
1205            row.add_cell("".into());
1206        }
1207    }
1208}
1209
/// Joins the non-empty Prometheus label selectors with `,` so they can be
/// spliced into a single metric selector.
fn merge_prometheus_selector<'a>(selectors: impl IntoIterator<Item = &'a str>) -> String {
    let non_empty: Vec<&str> = selectors
        .into_iter()
        .filter(|selector| !selector.is_empty())
        .collect();
    non_empty.join(",")
}
1213
1214fn redact_sql(sql: &str, keywords: RedactSqlOptionKeywordsRef) -> Option<String> {
1215    match Parser::parse_sql(sql) {
1216        Ok(sqls) => Some(
1217            sqls.into_iter()
1218                .map(|sql| sql.to_redacted_string(keywords.clone()))
1219                .join(";"),
1220        ),
1221        Err(_) => None,
1222    }
1223}
1224
/// Formats a feature-name list for table display: six names per line, each
/// line indented by two spaces; `"(none)"` when the list is empty.
fn format_features(features: &[&'static str]) -> String {
    // How many feature names to place on a single line.
    const PER_LINE: usize = 6;

    if features.is_empty() {
        "(none)".to_owned()
    } else {
        let lines: Vec<String> = features
            .chunks(PER_LINE)
            .map(|group| format!("  {}", group.join(", ")))
            .collect();
        lines.join("\n")
    }
}
1237
1238fn format_job_status(status: JobStatus) -> &'static str {
1239    match status {
1240        JobStatus::Initial => "initial",
1241        JobStatus::Creating => "creating",
1242        JobStatus::Created => "created",
1243    }
1244}
1245
1246fn format_streaming_parallelism(parallelism: &StreamingParallelism) -> String {
1247    match parallelism {
1248        StreamingParallelism::Adaptive => "adaptive".into(),
1249        StreamingParallelism::Fixed(n) => format!("fixed({n})"),
1250        StreamingParallelism::Custom => "custom".into(),
1251    }
1252}