risingwave_meta/manager/diagnose.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::{Ordering, Reverse};
16use std::collections::{BTreeMap, BinaryHeap, HashMap};
17use std::fmt::Write;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use prometheus_http_query::response::Data::Vector;
22use risingwave_common::id::ObjectId;
23use risingwave_common::system_param::AdaptiveParallelismStrategy;
24use risingwave_common::system_param::adaptive_parallelism_strategy::parse_strategy;
25use risingwave_common::system_param::reader::SystemParamsRead;
26use risingwave_common::types::Timestamptz;
27use risingwave_common::util::StackTraceResponseExt;
28use risingwave_common::util::epoch::Epoch;
29use risingwave_hummock_sdk::level::Level;
30use risingwave_hummock_sdk::{CompactionGroupId, HummockSstableId};
31use risingwave_license::LicenseManager;
32use risingwave_meta_model::{JobStatus, StreamingParallelism};
33use risingwave_pb::catalog::table::PbTableType;
34use risingwave_pb::common::WorkerType;
35use risingwave_pb::meta::EventLog;
36use risingwave_pb::meta::event_log::Event;
37use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
38use risingwave_sqlparser::ast::RedactSqlOptionKeywordsRef;
39use risingwave_sqlparser::parser::Parser;
40use serde_json::json;
41use thiserror_ext::AsReport;
42
43use crate::MetaResult;
44use crate::controller::system_param::SystemParamsControllerRef;
45use crate::hummock::HummockManagerRef;
46use crate::manager::MetadataManager;
47use crate::manager::event_log::EventLogManagerRef;
48use crate::manager::iceberg_compaction::IcebergCompactionManagerRef;
49use crate::rpc::await_tree::dump_cluster_await_tree;
50
51pub type DiagnoseCommandRef = Arc<DiagnoseCommand>;
52
53pub struct DiagnoseCommand {
54    metadata_manager: MetadataManager,
55    await_tree_reg: await_tree::Registry,
56    hummock_manger: HummockManagerRef,
57    iceberg_compaction_manager: IcebergCompactionManagerRef,
58    event_log_manager: EventLogManagerRef,
59    prometheus_client: Option<prometheus_http_query::Client>,
60    prometheus_selector: String,
61    redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
62    system_params_controller: SystemParamsControllerRef,
63}
64
65impl DiagnoseCommand {
66    pub fn new(
67        metadata_manager: MetadataManager,
68        await_tree_reg: await_tree::Registry,
69        hummock_manger: HummockManagerRef,
70        iceberg_compaction_manager: IcebergCompactionManagerRef,
71        event_log_manager: EventLogManagerRef,
72        prometheus_client: Option<prometheus_http_query::Client>,
73        prometheus_selector: String,
74        redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
75        system_params_controller: SystemParamsControllerRef,
76    ) -> Self {
77        Self {
78            metadata_manager,
79            await_tree_reg,
80            hummock_manger,
81            iceberg_compaction_manager,
82            event_log_manager,
83            prometheus_client,
84            prometheus_selector,
85            redact_sql_option_keywords,
86            system_params_controller,
87        }
88    }
89
90    pub async fn report(&self, actor_traces_format: ActorTracesFormat) -> String {
91        let mut report = String::new();
92        let _ = writeln!(
93            report,
94            "report created at: {}\nversion: {}",
95            chrono::DateTime::<chrono::offset::Utc>::from(std::time::SystemTime::now()),
96            risingwave_common::current_cluster_version(),
97        );
98        let _ = writeln!(report);
99        self.write_license(&mut report);
100        let _ = writeln!(report);
101        self.write_catalog(&mut report).await;
102        let _ = writeln!(report);
103        self.write_worker_nodes(&mut report).await;
104        let _ = writeln!(report);
105        self.write_streaming_prometheus(&mut report).await;
106        let _ = writeln!(report);
107        self.write_storage(&mut report).await;
108        let _ = writeln!(report);
109        self.write_iceberg_compaction_schedule(&mut report).await;
110        let _ = writeln!(report);
111        self.write_event_logs(&mut report);
112        let _ = writeln!(report);
113        self.write_params(&mut report).await;
114        let _ = writeln!(report);
115        self.write_await_tree(&mut report, actor_traces_format)
116            .await;
117
118        report
119    }
120
121    async fn write_catalog(&self, s: &mut String) {
122        self.write_catalog_inner(s).await;
123        let _ = self.write_table_definition(s).await.inspect_err(|e| {
124            tracing::warn!(
125                error = e.to_report_string(),
126                "failed to display table definition"
127            )
128        });
129    }
130
131    async fn write_catalog_inner(&self, s: &mut String) {
132        let stats = self.metadata_manager.catalog_controller.stats().await;
133
134        let stat = match stats {
135            Ok(stat) => stat,
136            Err(err) => {
137                tracing::warn!(error=?err.as_report(), "failed to get catalog stats");
138                return;
139            }
140        };
141        let _ = writeln!(s, "number of database: {}", stat.database_num);
142        let _ = writeln!(s, "number of fragment: {}", stat.streaming_job_num);
143        let _ = writeln!(s, "number of actor: {}", stat.actor_num);
144        let _ = writeln!(s, "number of source: {}", stat.source_num);
145        let _ = writeln!(s, "number of table: {}", stat.table_num);
146        let _ = writeln!(s, "number of materialized view: {}", stat.mview_num);
147        let _ = writeln!(s, "number of sink: {}", stat.sink_num);
148        let _ = writeln!(s, "number of index: {}", stat.index_num);
149        let _ = writeln!(s, "number of function: {}", stat.function_num);
150
151        self.write_databases(s).await;
152        self.write_schemas(s).await;
153    }
154
155    async fn write_databases(&self, s: &mut String) {
156        let databases = match self
157            .metadata_manager
158            .catalog_controller
159            .list_databases()
160            .await
161        {
162            Ok(databases) => databases,
163            Err(err) => {
164                tracing::warn!(error=?err.as_report(), "failed to list databases");
165                return;
166            }
167        };
168
169        use comfy_table::{Row, Table};
170        let mut table = Table::new();
171        table.set_header({
172            let mut row = Row::new();
173            row.add_cell("id".into());
174            row.add_cell("name".into());
175            row.add_cell("resource_group".into());
176            row.add_cell("barrier_interval_ms".into());
177            row.add_cell("checkpoint_frequency".into());
178            row
179        });
180        for db in databases {
181            let mut row = Row::new();
182            row.add_cell(db.id.into());
183            row.add_cell(db.name.into());
184            row.add_cell(db.resource_group.into());
185            row.add_cell(
186                db.barrier_interval_ms
187                    .map(|v| v.to_string())
188                    .unwrap_or("default".into())
189                    .into(),
190            );
191            row.add_cell(
192                db.checkpoint_frequency
193                    .map(|v| v.to_string())
194                    .unwrap_or("default".into())
195                    .into(),
196            );
197            table.add_row(row);
198        }
199
200        let _ = writeln!(s, "DATABASE");
201        let _ = writeln!(s, "{table}");
202    }
203
204    async fn write_schemas(&self, s: &mut String) {
205        let schemas = match self
206            .metadata_manager
207            .catalog_controller
208            .list_schemas()
209            .await
210        {
211            Ok(schemas) => schemas,
212            Err(err) => {
213                tracing::warn!(error=?err.as_report(), "failed to list schemas");
214                return;
215            }
216        };
217
218        use comfy_table::{Row, Table};
219        let mut table = Table::new();
220        table.set_header({
221            let mut row = Row::new();
222            row.add_cell("id".into());
223            row.add_cell("database_id".into());
224            row.add_cell("name".into());
225            row
226        });
227        for schema in schemas {
228            let mut row = Row::new();
229            row.add_cell(schema.id.into());
230            row.add_cell(schema.database_id.into());
231            row.add_cell(schema.name.into());
232            table.add_row(row);
233        }
234
235        let _ = writeln!(s, "SCHEMA");
236        let _ = writeln!(s, "{table}");
237    }
238
239    async fn write_worker_nodes(&self, s: &mut String) {
240        let Ok(worker_actor_count) = self.metadata_manager.worker_actor_count() else {
241            tracing::warn!("failed to get worker actor count");
242            return;
243        };
244
245        use comfy_table::{Row, Table};
246        let Ok(worker_nodes) = self.metadata_manager.list_worker_node(None, None).await else {
247            tracing::warn!("failed to get worker nodes");
248            return;
249        };
250        let mut table = Table::new();
251        table.set_header({
252            let mut row = Row::new();
253            row.add_cell("id".into());
254            row.add_cell("host".into());
255            row.add_cell("hostname".into());
256            row.add_cell("type".into());
257            row.add_cell("state".into());
258            row.add_cell("parallelism".into());
259            row.add_cell("resource_group".into());
260            row.add_cell("is_streaming".into());
261            row.add_cell("is_serving".into());
262            row.add_cell("is_iceberg_compactor".into());
263            row.add_cell("rw_version".into());
264            row.add_cell("total_memory_bytes".into());
265            row.add_cell("total_cpu_cores".into());
266            row.add_cell("started_at".into());
267            row.add_cell("actor_count".into());
268            row
269        });
270        for worker_node in worker_nodes {
271            let mut row = Row::new();
272            row.add_cell(worker_node.id.into());
273            try_add_cell(
274                &mut row,
275                worker_node
276                    .host
277                    .as_ref()
278                    .map(|h| format!("{}:{}", h.host, h.port)),
279            );
280            try_add_cell(
281                &mut row,
282                worker_node.resource.as_ref().map(|r| r.hostname.clone()),
283            );
284            try_add_cell(
285                &mut row,
286                worker_node.get_type().ok().map(|t| t.as_str_name()),
287            );
288            try_add_cell(
289                &mut row,
290                worker_node.get_state().ok().map(|s| s.as_str_name()),
291            );
292            try_add_cell(&mut row, worker_node.parallelism());
293            try_add_cell(
294                &mut row,
295                worker_node
296                    .property
297                    .as_ref()
298                    .map(|p| p.resource_group.clone().unwrap_or("".to_owned())),
299            );
300            // is_streaming and is_serving are only meaningful for ComputeNode
301            let (is_streaming, is_serving) = {
302                if let Ok(t) = worker_node.get_type()
303                    && t == WorkerType::ComputeNode
304                {
305                    (
306                        worker_node.property.as_ref().map(|p| p.is_streaming),
307                        worker_node.property.as_ref().map(|p| p.is_serving),
308                    )
309                } else {
310                    (None, None)
311                }
312            };
313            try_add_cell(&mut row, is_streaming);
314            try_add_cell(&mut row, is_serving);
315            // is_iceberg_compactor is only meaningful for Compactor worker type
316            let is_iceberg_compactor = {
317                if let Ok(t) = worker_node.get_type()
318                    && t == WorkerType::Compactor
319                {
320                    worker_node
321                        .property
322                        .as_ref()
323                        .map(|p| p.is_iceberg_compactor)
324                } else {
325                    None
326                }
327            };
328            try_add_cell(&mut row, is_iceberg_compactor);
329            try_add_cell(
330                &mut row,
331                worker_node.resource.as_ref().map(|r| r.rw_version.clone()),
332            );
333            try_add_cell(
334                &mut row,
335                worker_node.resource.as_ref().map(|r| r.total_memory_bytes),
336            );
337            try_add_cell(
338                &mut row,
339                worker_node.resource.as_ref().map(|r| r.total_cpu_cores),
340            );
341            try_add_cell(
342                &mut row,
343                worker_node
344                    .started_at
345                    .and_then(|ts| Timestamptz::from_secs(ts as _).map(|t| t.to_string())),
346            );
347            let actor_count = {
348                if let Ok(t) = worker_node.get_type()
349                    && t != WorkerType::ComputeNode
350                {
351                    None
352                } else {
353                    match worker_actor_count.get(&worker_node.id) {
354                        None => Some(0),
355                        Some(c) => Some(*c),
356                    }
357                }
358            };
359            try_add_cell(&mut row, actor_count);
360            table.add_row(row);
361        }
362        let _ = writeln!(s, "{table}");
363    }
364
365    fn write_event_logs(&self, s: &mut String) {
366        let event_logs = self
367            .event_log_manager
368            .list_event_logs()
369            .into_iter()
370            .sorted_by(|a, b| {
371                a.timestamp
372                    .unwrap_or(0)
373                    .cmp(&b.timestamp.unwrap_or(0))
374                    .reverse()
375            })
376            .collect_vec();
377
378        let _ = writeln!(s, "latest barrier completions");
379        Self::write_event_logs_impl(
380            s,
381            event_logs.iter(),
382            |e| {
383                let Event::BarrierComplete(info) = e else {
384                    return None;
385                };
386                Some(json!(info).to_string())
387            },
388            Some(10),
389        );
390
391        let _ = writeln!(s);
392        let _ = writeln!(s, "latest barrier collection failures");
393        Self::write_event_logs_impl(
394            s,
395            event_logs.iter(),
396            |e| {
397                let Event::CollectBarrierFail(info) = e else {
398                    return None;
399                };
400                Some(json!(info).to_string())
401            },
402            Some(3),
403        );
404
405        let _ = writeln!(s);
406        let _ = writeln!(s, "latest barrier injection failures");
407        Self::write_event_logs_impl(
408            s,
409            event_logs.iter(),
410            |e| {
411                let Event::InjectBarrierFail(info) = e else {
412                    return None;
413                };
414                Some(json!(info).to_string())
415            },
416            Some(3),
417        );
418
419        let _ = writeln!(s);
420        let _ = writeln!(s, "latest worker node panics");
421        Self::write_event_logs_impl(
422            s,
423            event_logs.iter(),
424            |e| {
425                let Event::WorkerNodePanic(info) = e else {
426                    return None;
427                };
428                Some(json!(info).to_string())
429            },
430            Some(10),
431        );
432
433        let _ = writeln!(s);
434        let _ = writeln!(s, "latest create stream job failures");
435        Self::write_event_logs_impl(
436            s,
437            event_logs.iter(),
438            |e| {
439                let Event::CreateStreamJobFail(info) = e else {
440                    return None;
441                };
442                Some(json!(info).to_string())
443            },
444            Some(3),
445        );
446
447        let _ = writeln!(s);
448        let _ = writeln!(s, "latest dirty stream job clear-ups");
449        Self::write_event_logs_impl(
450            s,
451            event_logs.iter(),
452            |e| {
453                let Event::DirtyStreamJobClear(info) = e else {
454                    return None;
455                };
456                Some(json!(info).to_string())
457            },
458            Some(3),
459        );
460    }
461
462    fn write_event_logs_impl<'a, F>(
463        s: &mut String,
464        event_logs: impl Iterator<Item = &'a EventLog>,
465        get_event_info: F,
466        limit: Option<usize>,
467    ) where
468        F: Fn(&Event) -> Option<String>,
469    {
470        use comfy_table::{Row, Table};
471        let mut table = Table::new();
472        table.set_header({
473            let mut row = Row::new();
474            row.add_cell("created_at".into());
475            row.add_cell("info".into());
476            row
477        });
478        let mut row_count = 0;
479        for event_log in event_logs {
480            let Some(ref inner) = event_log.event else {
481                continue;
482            };
483            if let Some(limit) = limit
484                && row_count >= limit
485            {
486                break;
487            }
488            let mut row = Row::new();
489            let ts = event_log
490                .timestamp
491                .and_then(|ts| Timestamptz::from_millis(ts as _).map(|ts| ts.to_string()));
492            try_add_cell(&mut row, ts);
493            if let Some(info) = get_event_info(inner) {
494                row.add_cell(info.into());
495                row_count += 1;
496            } else {
497                continue;
498            }
499            table.add_row(row);
500        }
501        let _ = writeln!(s, "{table}");
502    }
503
    /// Appends the storage section of the report.
    ///
    /// The section lists, in order:
    /// - the back-pressured compaction groups, if any;
    /// - the SSTable count, total SSTable size and compaction group count for
    ///   the current Hummock version;
    /// - the top 10 SSTables by tombstone delete ratio;
    /// - the storage Prometheus metrics (via `write_storage_prometheus`).
    async fn write_storage(&self, s: &mut String) {
        let mut sst_num = 0;
        let mut sst_total_file_size = 0;
        // A compaction group is reported as back pressured when its write limit
        // lists at least one table.
        let back_pressured_compaction_groups = self
            .hummock_manger
            .write_limits()
            .await
            .into_iter()
            .filter_map(|(k, v)| {
                if v.table_ids.is_empty() {
                    None
                } else {
                    Some(k)
                }
            })
            .join(",");
        if !back_pressured_compaction_groups.is_empty() {
            let _ = writeln!(
                s,
                "back pressured compaction groups: {back_pressured_compaction_groups}"
            );
        }

        // Heap entry ordered by `delete_ratio` only; ties compare as Equal.
        // NOTE(review): `PartialEq`/`Eq` are derived over *all* fields, so
        // `cmp == Equal` does not imply `==`, which the `Ord` contract requires.
        // This is harmless for the `BinaryHeap` use below, but worth aligning.
        #[derive(PartialEq, Eq)]
        struct SstableSort {
            compaction_group_id: CompactionGroupId,
            sst_id: HummockSstableId,
            delete_ratio: u64,
        }
        impl PartialOrd for SstableSort {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }
        impl Ord for SstableSort {
            fn cmp(&self, other: &Self) -> Ordering {
                self.delete_ratio.cmp(&other.delete_ratio)
            }
        }
        // Keeps the `top_k` largest entries. Wrapping in `Reverse` turns the
        // max-heap into a min-heap, so the root is the smallest kept entry. The
        // root is replaced only by a strictly larger delete ratio.
        fn top_k_sstables(
            top_k: usize,
            heap: &mut BinaryHeap<Reverse<SstableSort>>,
            e: SstableSort,
        ) {
            if heap.len() < top_k {
                heap.push(Reverse(e));
            } else if let Some(mut p) = heap.peek_mut()
                && e.delete_ratio > p.0.delete_ratio
            {
                *p = Reverse(e);
            }
        }

        let top_k = 10;
        let mut top_tombstone_delete_sst = BinaryHeap::with_capacity(top_k);
        // Walk every level (L0 sub-levels, then the other levels) of every
        // compaction group in the current version, accumulating counts into the
        // captured locals. The closure's return value is the compaction group
        // count.
        let compaction_group_num = self
            .hummock_manger
            .on_current_version(|version| {
                for compaction_group in version.levels.values() {
                    let mut visit_level = |level: &Level| {
                        sst_num += level.table_infos.len();
                        sst_total_file_size +=
                            level.table_infos.iter().map(|t| t.sst_size).sum::<u64>();
                        for sst in &level.table_infos {
                            // Skip SSTs with no keys (also avoids dividing by zero).
                            if sst.total_key_count == 0 {
                                continue;
                            }
                            // stale / total in basis points (1/10000). It is shown
                            // as a percentage with two decimals in `format_table`.
                            let tombstone_delete_ratio =
                                sst.stale_key_count * 10000 / sst.total_key_count;
                            let e = SstableSort {
                                compaction_group_id: compaction_group.group_id,
                                sst_id: sst.sst_id,
                                delete_ratio: tombstone_delete_ratio,
                            };
                            top_k_sstables(top_k, &mut top_tombstone_delete_sst, e);
                        }
                    };
                    let l0 = &compaction_group.l0;
                    // FIXME: why chaining levels iter leads to segmentation fault?
                    for level in &l0.sub_levels {
                        visit_level(level);
                    }
                    for level in &compaction_group.levels {
                        visit_level(level);
                    }
                }
                version.levels.len()
            })
            .await;

        let _ = writeln!(s, "number of SSTables: {sst_num}");
        let _ = writeln!(s, "total size of SSTables (byte): {sst_total_file_size}");
        let _ = writeln!(s, "number of compaction groups: {compaction_group_num}");
        use comfy_table::{Row, Table};
        // Renders the kept entries. `into_sorted_vec` sorts ascending in
        // `Reverse` order, so rows come out highest delete ratio first.
        fn format_table(heap: BinaryHeap<Reverse<SstableSort>>) -> Table {
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("compaction group id".into());
                row.add_cell("sstable id".into());
                row.add_cell("delete ratio".into());
                row
            });
            for sst in &heap.into_sorted_vec() {
                let mut row = Row::new();
                row.add_cell(sst.0.compaction_group_id.into());
                row.add_cell(sst.0.sst_id.into());
                row.add_cell(format!("{:.2}%", sst.0.delete_ratio as f64 / 100.0).into());
                table.add_row(row);
            }
            table
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "top tombstone delete ratio");
        let _ = writeln!(s, "{}", format_table(top_tombstone_delete_sst));
        let _ = writeln!(s);

        // NOTE(review): together with the line above, this emits two blank
        // lines before the Prometheus section — likely unintentional.
        let _ = writeln!(s);
        self.write_storage_prometheus(s).await;
    }
624
625    async fn write_iceberg_compaction_schedule(&self, s: &mut String) {
626        use comfy_table::{Row, Table};
627
628        let sink_names = match self.metadata_manager.catalog_controller.list_sinks().await {
629            Ok(sinks) => sinks
630                .into_iter()
631                .map(|sink| (sink.id.as_raw_id(), sink.name))
632                .collect::<BTreeMap<u32, String>>(),
633            Err(err) => {
634                tracing::warn!(error=?err.as_report(), "failed to list sinks");
635                return;
636            }
637        };
638
639        let statuses = self.iceberg_compaction_manager.list_compaction_statuses();
640
641        let _ = writeln!(s, "ICEBERG COMPACTION SCHEDULE");
642
643        let mut table = Table::new();
644        table.set_header({
645            let mut row = Row::new();
646            row.add_cell("sink_id".into());
647            row.add_cell("sink_name".into());
648            row.add_cell("task_type".into());
649            row.add_cell("schedule_state".into());
650            row.add_cell("trigger_interval_sec".into());
651            row.add_cell("trigger_snapshot_count".into());
652            row.add_cell("pending_snapshot_count".into());
653            row.add_cell("next_compaction_after_sec".into());
654            row.add_cell("is_triggerable".into());
655            row
656        });
657
658        for status in statuses {
659            let mut row = Row::new();
660            row.add_cell(status.sink_id.as_raw_id().into());
661            try_add_cell(
662                &mut row,
663                sink_names.get(&status.sink_id.as_raw_id()).cloned(),
664            );
665            row.add_cell(status.task_type.into());
666            row.add_cell(status.schedule_state.into());
667            row.add_cell(status.trigger_interval_sec.into());
668            row.add_cell((status.trigger_snapshot_count as u64).into());
669            try_add_cell(
670                &mut row,
671                status.pending_snapshot_count.map(|count| count as u64),
672            );
673            try_add_cell(&mut row, status.next_compaction_after_sec);
674            row.add_cell(status.is_triggerable.into());
675            table.add_row(row);
676        }
677
678        let _ = writeln!(s, "{table}");
679    }
680
681    async fn write_streaming_prometheus(&self, s: &mut String) {
682        let _ = writeln!(s, "top sources by throughput (rows/s)");
683        let query = format!(
684            "topk(3, sum(rate(stream_source_output_rows_counts{{{}}}[10m]))by (source_name))",
685            self.prometheus_selector
686        );
687        self.write_instant_vector_impl(s, &query, vec!["source_name"])
688            .await;
689
690        let _ = writeln!(s);
691        let _ = writeln!(s, "top materialized views by throughput (rows/s)");
692        let query = format!(
693            "topk(3, sum(rate(stream_mview_input_row_count{{{}}}[10m]))by (table_id))",
694            self.prometheus_selector
695        );
696        self.write_instant_vector_impl(s, &query, vec!["table_id"])
697            .await;
698
699        let _ = writeln!(s);
700        let _ = writeln!(s, "top join executor by matched rows");
701        let query = format!(
702            "topk(3, histogram_quantile(0.9, sum(rate(stream_join_matched_join_keys_bucket{{{}}}[10m])) by (le, fragment_id, table_id)))",
703            self.prometheus_selector
704        );
705        self.write_instant_vector_impl(s, &query, vec!["table_id", "fragment_id"])
706            .await;
707    }
708
709    async fn write_storage_prometheus(&self, s: &mut String) {
710        let _ = writeln!(s, "top Hummock Get by duration (second)");
711        let query = format!(
712            "topk(3, histogram_quantile(0.9, sum(rate(state_store_get_duration_bucket{{{}}}[10m])) by (le, table_id)))",
713            self.prometheus_selector
714        );
715        self.write_instant_vector_impl(s, &query, vec!["table_id"])
716            .await;
717
718        let _ = writeln!(s);
719        let _ = writeln!(s, "top Hummock Iter Init by duration (second)");
720        let query = format!(
721            "topk(3, histogram_quantile(0.9, sum(rate(state_store_iter_init_duration_bucket{{{}}}[10m])) by (le, table_id)))",
722            self.prometheus_selector
723        );
724        self.write_instant_vector_impl(s, &query, vec!["table_id"])
725            .await;
726
727        let _ = writeln!(s);
728        let _ = writeln!(s, "top table commit flush by size (byte)");
729        let query = format!(
730            "topk(3, sum(rate(storage_commit_write_throughput{{{}}}[10m])) by (table_id))",
731            self.prometheus_selector
732        );
733        self.write_instant_vector_impl(s, &query, vec!["table_id"])
734            .await;
735
736        let _ = writeln!(s);
737        let _ = writeln!(s, "object store read throughput (bytes/s)");
738        let query = format!(
739            "sum(rate(object_store_read_bytes{{{}}}[10m])) by (job, instance)",
740            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
741        );
742        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
743            .await;
744
745        let _ = writeln!(s);
746        let _ = writeln!(s, "object store write throughput (bytes/s)");
747        let query = format!(
748            "sum(rate(object_store_write_bytes{{{}}}[10m])) by (job, instance)",
749            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
750        );
751        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
752            .await;
753
754        let _ = writeln!(s);
755        let _ = writeln!(s, "object store operation rate");
756        let query = format!(
757            "sum(rate(object_store_operation_latency_count{{{}}}[10m])) by (le, type, job, instance)",
758            merge_prometheus_selector([
759                &self.prometheus_selector,
760                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read_read_bytes|streaming_read\""
761            ])
762        );
763        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
764            .await;
765
766        let _ = writeln!(s);
767        let _ = writeln!(s, "object store operation duration (second)");
768        let query = format!(
769            "histogram_quantile(0.9, sum(rate(object_store_operation_latency_bucket{{{}}}[10m])) by (le, type, job, instance))",
770            merge_prometheus_selector([
771                &self.prometheus_selector,
772                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read\""
773            ])
774        );
775        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
776            .await;
777    }
778
779    async fn write_instant_vector_impl(&self, s: &mut String, query: &str, labels: Vec<&str>) {
780        let Some(ref client) = self.prometheus_client else {
781            return;
782        };
783        if let Ok(Vector(instant_vec)) = client
784            .query(query)
785            .get()
786            .await
787            .map(|result| result.into_inner().0)
788        {
789            for i in instant_vec {
790                let l = labels
791                    .iter()
792                    .map(|label| {
793                        format!(
794                            "{}={}",
795                            *label,
796                            i.metric()
797                                .get(*label)
798                                .map(|s| s.as_str())
799                                .unwrap_or_default()
800                        )
801                    })
802                    .join(",");
803                let _ = writeln!(s, "{}: {:.3}", l, i.sample().value());
804            }
805        }
806    }
807
808    async fn write_await_tree(&self, s: &mut String, actor_traces_format: ActorTracesFormat) {
809        let all = dump_cluster_await_tree(
810            &self.metadata_manager,
811            &self.await_tree_reg,
812            actor_traces_format,
813        )
814        .await;
815
816        if let Ok(all) = all {
817            write!(s, "{}", all.output()).unwrap();
818        } else {
819            tracing::warn!("failed to dump await tree");
820        }
821    }
822
    /// Appends catalog and fragment tables to `s`:
    ///
    /// - a "STREAMING JOB" table, one row per streaming job, ordered by object type and then
    ///   job id;
    /// - one table per catalog kind ("SOURCE", "TABLE", "MATERIALIZED VIEW", "INDEX", "SINK",
    ///   "VIEW", "INTERNAL TABLE") with id, name, database/schema ids, creation time and
    ///   definition, ordered by id;
    /// - a "FRAGMENT" table listing, per fragment, its job id, schema id, object type, the
    ///   job's name (when it appeared in one of the catalog tables above) and its sorted
    ///   actor ids.
    ///
    /// Definitions are passed through `redact_sql`; a definition that fails to parse is
    /// shown as `[REDACTED]` rather than verbatim.
    ///
    /// # Errors
    ///
    /// Returns the first error from the catalog controller's listing calls. Sections written
    /// before the failing call are left in `s`.
    async fn write_table_definition(&self, s: &mut String) -> MetaResult<()> {
        // Each catalog kind is collected as `id -> (name, database_id, schema_id, definition,
        // created_at_epoch)`; `BTreeMap` keeps the rendered rows ordered by id.
        let sources = self
            .metadata_manager
            .catalog_controller
            .list_sources()
            .await?
            .into_iter()
            .map(|s| {
                (
                    s.id.into(),
                    (
                        s.name,
                        s.database_id,
                        s.schema_id,
                        s.definition,
                        s.created_at_epoch,
                    ),
                )
            })
            .collect::<BTreeMap<_, _>>();
        let mut user_tables = BTreeMap::new();
        let mut mvs = BTreeMap::new();
        let mut indexes = BTreeMap::new();
        let mut internal_tables = BTreeMap::new();
        {
            // `chunk_by` groups *consecutive* state tables sharing a table type. If the
            // listing is not ordered by type, one type can yield several chunks; that is
            // harmless because every chunk is `extend`ed into the map for its type.
            let grouped = self
                .metadata_manager
                .catalog_controller
                .list_all_state_tables()
                .await?
                .into_iter()
                .chunk_by(|t| t.table_type());
            for (table_type, tables) in &grouped {
                let tables = tables.into_iter().map(|t| {
                    (
                        t.id.into(),
                        (
                            t.name,
                            t.database_id,
                            t.schema_id,
                            t.definition,
                            t.created_at_epoch,
                        ),
                    )
                });
                match table_type {
                    PbTableType::Table => user_tables.extend(tables),
                    PbTableType::MaterializedView => mvs.extend(tables),
                    // Vector indexes are listed together with regular indexes.
                    PbTableType::Index | PbTableType::VectorIndex => indexes.extend(tables),
                    PbTableType::Internal => internal_tables.extend(tables),
                    // Logged at error level and otherwise left out of the report.
                    PbTableType::Unspecified => {
                        tracing::error!("unspecified table type: {:?}", tables.collect_vec());
                    }
                }
            }
        }
        let sinks = self
            .metadata_manager
            .catalog_controller
            .list_sinks()
            .await?
            .into_iter()
            .map(|s| {
                (
                    s.id.into(),
                    (
                        s.name,
                        s.database_id,
                        s.schema_id,
                        s.definition,
                        s.created_at_epoch,
                    ),
                )
            })
            .collect::<BTreeMap<_, _>>();
        // Views carry no creation epoch here, so their `created_at` column renders empty.
        let views = self
            .metadata_manager
            .catalog_controller
            .list_views()
            .await?
            .into_iter()
            .map(|v| {
                (
                    v.id.into(),
                    (v.name, v.database_id, v.schema_id, v.sql, None),
                )
            })
            .collect::<BTreeMap<_, _>>();
        let mut streaming_jobs = self
            .metadata_manager
            .catalog_controller
            .list_streaming_job_infos()
            .await?;
        // Order by object-type discriminant first, then by job id.
        streaming_jobs.sort_by_key(|info| (info.obj_type as usize, info.job_id));
        {
            use comfy_table::{Row, Table};
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("job_id".into());
                row.add_cell("name".into());
                row.add_cell("obj_type".into());
                row.add_cell("state".into());
                row.add_cell("parallelism".into());
                row.add_cell("max_parallelism".into());
                row.add_cell("resource_group".into());
                row.add_cell("database_id".into());
                row.add_cell("schema_id".into());
                row.add_cell("config_override".into());
                row
            });
            for job in streaming_jobs {
                let mut row = Row::new();
                row.add_cell(job.job_id.into());
                row.add_cell(job.name.into());
                row.add_cell(job.obj_type.as_str().into());
                row.add_cell(format_job_status(job.job_status).into());
                row.add_cell(
                    format_streaming_parallelism(
                        &job.parallelism,
                        job.adaptive_parallelism_strategy.as_deref(),
                    )
                    .into(),
                );
                row.add_cell(job.max_parallelism.into());
                row.add_cell(job.resource_group.into());
                row.add_cell(job.database_id.into());
                row.add_cell(job.schema_id.into());
                row.add_cell(job.config_override.into());
                table.add_row(row);
            }
            let _ = writeln!(s);
            let _ = writeln!(s, "STREAMING JOB");
            let _ = writeln!(s, "{table}");
        }
        let catalogs = [
            ("SOURCE", sources),
            ("TABLE", user_tables),
            ("MATERIALIZED VIEW", mvs),
            ("INDEX", indexes),
            ("SINK", sinks),
            ("VIEW", views),
            ("INTERNAL TABLE", internal_tables),
        ];
        // Name lookup by object id, filled while rendering the catalog tables and used
        // below to label each fragment with its job's name.
        let mut obj_id_to_name: HashMap<ObjectId, _> = HashMap::new();
        for (title, items) in catalogs {
            use comfy_table::{Row, Table};
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("id".into());
                row.add_cell("name".into());
                row.add_cell("database_id".into());
                row.add_cell("schema_id".into());
                row.add_cell("created_at".into());
                row.add_cell("definition".into());
                row
            });
            for (id, (name, database_id, schema_id, definition, created_at_epoch)) in items {
                obj_id_to_name.insert(id, name.clone());
                let mut row = Row::new();
                // Definitions that cannot be parsed are hidden entirely rather than shown
                // unredacted.
                let may_redact = redact_sql(&definition, self.redact_sql_option_keywords.clone())
                    .unwrap_or_else(|| "[REDACTED]".into());
                let created_at = if let Some(created_at_epoch) = created_at_epoch {
                    format!("{}", Epoch::from(created_at_epoch).as_timestamptz())
                } else {
                    "".into()
                };
                row.add_cell(id.into());
                row.add_cell(name.into());
                row.add_cell(database_id.into());
                row.add_cell(schema_id.into());
                row.add_cell(created_at.into());
                row.add_cell(may_redact.into());
                table.add_row(row);
            }
            let _ = writeln!(s);
            let _ = writeln!(s, "{title}");
            let _ = writeln!(s, "{table}");
        }

        // Group actors by fragment id. The job/schema/type columns are taken from the first
        // actor seen for each fragment.
        let mut fragments = BTreeMap::new();
        for (actor_id, fragment_id, job_id, schema_id, obj_type) in self
            .metadata_manager
            .catalog_controller
            .list_actor_info()
            .await?
        {
            let entry = fragments
                .entry(fragment_id)
                .or_insert_with(|| (job_id, schema_id, obj_type, Vec::new()));
            entry.3.push(actor_id);
        }

        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("fragment_id".into());
            row.add_cell("job_id".into());
            row.add_cell("schema_id".into());
            row.add_cell("type".into());
            row.add_cell("name".into());
            row.add_cell("actor_count".into());
            row.add_cell("actors".into());
            row
        });
        for (fragment_id, (job_id, schema_id, obj_type, mut actor_ids)) in fragments {
            actor_ids.sort_unstable();
            let mut row = Row::new();
            row.add_cell(fragment_id.into());
            row.add_cell(job_id.into());
            row.add_cell(schema_id.into());
            row.add_cell(obj_type.as_str().into());
            // Empty when the job id did not appear in any catalog table above.
            row.add_cell(
                obj_id_to_name
                    .get(&job_id)
                    .cloned()
                    .unwrap_or_default()
                    .into(),
            );
            row.add_cell(actor_ids.len().into());
            row.add_cell(actor_ids.iter().join(", ").into());
            table.add_row(row);
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "FRAGMENT");
        let _ = writeln!(s, "{table}");
        Ok(())
    }
1053
1054    fn write_license(&self, s: &mut String) {
1055        use comfy_table::presets::ASCII_BORDERS_ONLY;
1056        use comfy_table::{ContentArrangement, Row, Table};
1057
1058        let mut table = Table::new();
1059        table.load_preset(ASCII_BORDERS_ONLY);
1060        table.set_content_arrangement(ContentArrangement::Dynamic);
1061        table.set_header({
1062            let mut row = Row::new();
1063            row.add_cell("field".into());
1064            row.add_cell("value".into());
1065            row
1066        });
1067
1068        match LicenseManager::get().license() {
1069            Ok(license) => {
1070                let fmt_option = |value: Option<u64>| match value {
1071                    Some(v) => v.to_string(),
1072                    None => "unlimited".to_owned(),
1073                };
1074
1075                let expires_at = if license.exp == u64::MAX {
1076                    "never".to_owned()
1077                } else {
1078                    let exp_i64 = license.exp as i64;
1079                    chrono::DateTime::<chrono::Utc>::from_timestamp(exp_i64, 0)
1080                        .map(|ts| ts.to_rfc3339())
1081                        .unwrap_or_else(|| format!("invalid ({})", license.exp))
1082                };
1083
1084                let mut row = Row::new();
1085                row.add_cell("status".into());
1086                row.add_cell("valid".into());
1087                table.add_row(row);
1088
1089                let mut row = Row::new();
1090                row.add_cell("tier".into());
1091                row.add_cell(license.tier.name().into());
1092                table.add_row(row);
1093
1094                let mut row = Row::new();
1095                row.add_cell("expires_at".into());
1096                row.add_cell(expires_at.into());
1097                table.add_row(row);
1098
1099                let mut row = Row::new();
1100                row.add_cell("rwu_limit".into());
1101                row.add_cell(fmt_option(license.rwu_limit.map(|v| v.get())).into());
1102                table.add_row(row);
1103
1104                let mut row = Row::new();
1105                row.add_cell("cpu_core_limit".into());
1106                row.add_cell(fmt_option(license.cpu_core_limit()).into());
1107                table.add_row(row);
1108
1109                let mut row = Row::new();
1110                row.add_cell("memory_limit_bytes".into());
1111                row.add_cell(fmt_option(license.memory_limit()).into());
1112                table.add_row(row);
1113
1114                let mut features: Vec<_> = license
1115                    .tier
1116                    .available_features()
1117                    .map(|f| f.name())
1118                    .collect();
1119                features.sort_unstable();
1120                let feature_summary = format_features(&features);
1121
1122                let mut row = Row::new();
1123                row.add_cell("available_features".into());
1124                row.add_cell(feature_summary.into());
1125                table.add_row(row);
1126            }
1127            Err(error) => {
1128                let mut row = Row::new();
1129                row.add_cell("status".into());
1130                row.add_cell("invalid".into());
1131                table.add_row(row);
1132
1133                let mut row = Row::new();
1134                row.add_cell("error".into());
1135                row.add_cell(error.to_report_string().into());
1136                table.add_row(row);
1137            }
1138        }
1139
1140        let _ = writeln!(s, "LICENSE");
1141        let _ = writeln!(s, "{table}");
1142    }
1143
1144    async fn write_params(&self, s: &mut String) {
1145        let params = self.system_params_controller.get_params().await;
1146
1147        use comfy_table::{Row, Table};
1148        let mut table = Table::new();
1149        table.set_header({
1150            let mut row = Row::new();
1151            row.add_cell("key".into());
1152            row.add_cell("value".into());
1153            row
1154        });
1155
1156        let mut row = Row::new();
1157        row.add_cell("barrier_interval_ms".into());
1158        row.add_cell(params.barrier_interval_ms().to_string().into());
1159        table.add_row(row);
1160
1161        let mut row = Row::new();
1162        row.add_cell("checkpoint_frequency".into());
1163        row.add_cell(params.checkpoint_frequency().to_string().into());
1164        table.add_row(row);
1165
1166        let mut row = Row::new();
1167        row.add_cell("state_store".into());
1168        row.add_cell(params.state_store().to_owned().into());
1169        table.add_row(row);
1170
1171        let mut row = Row::new();
1172        row.add_cell("data_directory".into());
1173        row.add_cell(params.data_directory().to_owned().into());
1174        table.add_row(row);
1175
1176        let mut row = Row::new();
1177        row.add_cell("max_concurrent_creating_streaming_jobs".into());
1178        row.add_cell(
1179            params
1180                .max_concurrent_creating_streaming_jobs()
1181                .to_string()
1182                .into(),
1183        );
1184        table.add_row(row);
1185
1186        let mut row = Row::new();
1187        row.add_cell("time_travel_retention_ms".into());
1188        row.add_cell(params.time_travel_retention_ms().to_string().into());
1189        table.add_row(row);
1190
1191        let mut row = Row::new();
1192        row.add_cell("per_database_isolation".into());
1193        row.add_cell(params.per_database_isolation().to_string().into());
1194        table.add_row(row);
1195
1196        let _ = writeln!(s, "SYSTEM PARAMS");
1197        let _ = writeln!(s, "{table}");
1198    }
1199}
1200
1201fn try_add_cell<T: Into<comfy_table::Cell>>(row: &mut comfy_table::Row, t: Option<T>) {
1202    match t {
1203        Some(t) => {
1204            row.add_cell(t.into());
1205        }
1206        None => {
1207            row.add_cell("".into());
1208        }
1209    }
1210}
1211
/// Joins Prometheus label-matcher fragments with `,`, skipping empty fragments so that an
/// unset (empty) selector does not leave a dangling comma.
fn merge_prometheus_selector<'a>(selectors: impl IntoIterator<Item = &'a str>) -> String {
    let parts: Vec<&str> = selectors
        .into_iter()
        .filter(|selector| !selector.is_empty())
        .collect();
    parts.join(",")
}
1215
1216fn redact_sql(sql: &str, keywords: RedactSqlOptionKeywordsRef) -> Option<String> {
1217    match Parser::parse_sql(sql) {
1218        Ok(sqls) => Some(
1219            sqls.into_iter()
1220                .map(|sql| sql.to_redacted_string(keywords.clone()))
1221                .join(";"),
1222        ),
1223        Err(_) => None,
1224    }
1225}
1226
/// Renders feature names as indented, comma-separated lines of at most six names each,
/// or `(none)` when there are no features.
fn format_features(features: &[&'static str]) -> String {
    const PER_LINE: usize = 6;

    if features.is_empty() {
        return "(none)".into();
    }

    let mut out = String::new();
    for (line_no, chunk) in features.chunks(PER_LINE).enumerate() {
        if line_no > 0 {
            out.push('\n');
        }
        out.push_str("  ");
        out.push_str(&chunk.join(", "));
    }
    out
}
1239
1240fn format_job_status(status: JobStatus) -> &'static str {
1241    match status {
1242        JobStatus::Initial => "initial",
1243        JobStatus::Creating => "creating",
1244        JobStatus::Created => "created",
1245    }
1246}
1247
1248fn format_streaming_parallelism(
1249    parallelism: &StreamingParallelism,
1250    adaptive_parallelism_strategy: Option<&str>,
1251) -> String {
1252    match parallelism {
1253        StreamingParallelism::Adaptive => adaptive_parallelism_strategy
1254            .and_then(format_adaptive_parallelism_strategy)
1255            .unwrap_or_else(|| "adaptive".into()),
1256        StreamingParallelism::Fixed(n) => n.to_string(),
1257        StreamingParallelism::Custom => adaptive_parallelism_strategy
1258            .and_then(format_adaptive_parallelism_strategy)
1259            .unwrap_or_else(|| "custom".into()),
1260    }
1261}
1262
1263fn format_adaptive_parallelism_strategy(strategy: &str) -> Option<String> {
1264    parse_strategy(strategy)
1265        .ok()
1266        .map(|strategy| match strategy {
1267            AdaptiveParallelismStrategy::Auto | AdaptiveParallelismStrategy::Full => {
1268                "adaptive".to_owned()
1269            }
1270            AdaptiveParallelismStrategy::Bounded(n) => format!("bounded({n})"),
1271            AdaptiveParallelismStrategy::Ratio(r) => format!("ratio({r})"),
1272        })
1273}