risingwave_meta/manager/
diagnose.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::{Ordering, Reverse};
16use std::collections::{BTreeMap, BinaryHeap, HashMap};
17use std::fmt::Write;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use prometheus_http_query::response::Data::Vector;
22use risingwave_common::id::ObjectId;
23use risingwave_common::types::Timestamptz;
24use risingwave_common::util::StackTraceResponseExt;
25use risingwave_common::util::epoch::Epoch;
26use risingwave_hummock_sdk::HummockSstableId;
27use risingwave_hummock_sdk::level::Level;
28use risingwave_license::LicenseManager;
29use risingwave_meta_model::{JobStatus, StreamingParallelism};
30use risingwave_pb::catalog::table::PbTableType;
31use risingwave_pb::common::WorkerType;
32use risingwave_pb::meta::EventLog;
33use risingwave_pb::meta::event_log::Event;
34use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
35use risingwave_sqlparser::ast::RedactSqlOptionKeywordsRef;
36use risingwave_sqlparser::parser::Parser;
37use serde_json::json;
38use thiserror_ext::AsReport;
39
40use crate::MetaResult;
41use crate::hummock::HummockManagerRef;
42use crate::manager::MetadataManager;
43use crate::manager::event_log::EventLogManagerRef;
44use crate::rpc::await_tree::dump_cluster_await_tree;
45
46pub type DiagnoseCommandRef = Arc<DiagnoseCommand>;
47
48pub struct DiagnoseCommand {
49    metadata_manager: MetadataManager,
50    await_tree_reg: await_tree::Registry,
51    hummock_manger: HummockManagerRef,
52    event_log_manager: EventLogManagerRef,
53    prometheus_client: Option<prometheus_http_query::Client>,
54    prometheus_selector: String,
55    redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
56}
57
58impl DiagnoseCommand {
59    pub fn new(
60        metadata_manager: MetadataManager,
61        await_tree_reg: await_tree::Registry,
62        hummock_manger: HummockManagerRef,
63        event_log_manager: EventLogManagerRef,
64        prometheus_client: Option<prometheus_http_query::Client>,
65        prometheus_selector: String,
66        redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
67    ) -> Self {
68        Self {
69            metadata_manager,
70            await_tree_reg,
71            hummock_manger,
72            event_log_manager,
73            prometheus_client,
74            prometheus_selector,
75            redact_sql_option_keywords,
76        }
77    }
78
79    pub async fn report(&self, actor_traces_format: ActorTracesFormat) -> String {
80        let mut report = String::new();
81        let _ = writeln!(
82            report,
83            "report created at: {}\nversion: {}",
84            chrono::DateTime::<chrono::offset::Utc>::from(std::time::SystemTime::now()),
85            risingwave_common::current_cluster_version(),
86        );
87        let _ = writeln!(report);
88        self.write_license(&mut report);
89        let _ = writeln!(report);
90        self.write_catalog(&mut report).await;
91        let _ = writeln!(report);
92        self.write_worker_nodes(&mut report).await;
93        let _ = writeln!(report);
94        self.write_streaming_prometheus(&mut report).await;
95        let _ = writeln!(report);
96        self.write_storage(&mut report).await;
97        let _ = writeln!(report);
98        self.write_await_tree(&mut report, actor_traces_format)
99            .await;
100        let _ = writeln!(report);
101        self.write_event_logs(&mut report);
102        report
103    }
104
105    async fn write_catalog(&self, s: &mut String) {
106        self.write_catalog_inner(s).await;
107        let _ = self.write_table_definition(s).await.inspect_err(|e| {
108            tracing::warn!(
109                error = e.to_report_string(),
110                "failed to display table definition"
111            )
112        });
113    }
114
115    async fn write_catalog_inner(&self, s: &mut String) {
116        let stats = self.metadata_manager.catalog_controller.stats().await;
117
118        let stat = match stats {
119            Ok(stat) => stat,
120            Err(err) => {
121                tracing::warn!(error=?err.as_report(), "failed to get catalog stats");
122                return;
123            }
124        };
125        let _ = writeln!(s, "number of fragment: {}", stat.streaming_job_num);
126        let _ = writeln!(s, "number of actor: {}", stat.actor_num);
127        let _ = writeln!(s, "number of source: {}", stat.source_num);
128        let _ = writeln!(s, "number of table: {}", stat.table_num);
129        let _ = writeln!(s, "number of materialized view: {}", stat.mview_num);
130        let _ = writeln!(s, "number of sink: {}", stat.sink_num);
131        let _ = writeln!(s, "number of index: {}", stat.index_num);
132        let _ = writeln!(s, "number of function: {}", stat.function_num);
133    }
134
135    async fn write_worker_nodes(&self, s: &mut String) {
136        let Ok(worker_actor_count) = self.metadata_manager.worker_actor_count() else {
137            tracing::warn!("failed to get worker actor count");
138            return;
139        };
140
141        use comfy_table::{Row, Table};
142        let Ok(worker_nodes) = self.metadata_manager.list_worker_node(None, None).await else {
143            tracing::warn!("failed to get worker nodes");
144            return;
145        };
146        let mut table = Table::new();
147        table.set_header({
148            let mut row = Row::new();
149            row.add_cell("id".into());
150            row.add_cell("host".into());
151            row.add_cell("type".into());
152            row.add_cell("state".into());
153            row.add_cell("parallelism".into());
154            row.add_cell("is_streaming".into());
155            row.add_cell("is_serving".into());
156            row.add_cell("is_iceberg_compactor".into());
157            row.add_cell("rw_version".into());
158            row.add_cell("total_memory_bytes".into());
159            row.add_cell("total_cpu_cores".into());
160            row.add_cell("started_at".into());
161            row.add_cell("actor_count".into());
162            row
163        });
164        for worker_node in worker_nodes {
165            let mut row = Row::new();
166            row.add_cell(worker_node.id.into());
167            try_add_cell(
168                &mut row,
169                worker_node
170                    .host
171                    .as_ref()
172                    .map(|h| format!("{}:{}", h.host, h.port)),
173            );
174            try_add_cell(
175                &mut row,
176                worker_node.get_type().ok().map(|t| t.as_str_name()),
177            );
178            try_add_cell(
179                &mut row,
180                worker_node.get_state().ok().map(|s| s.as_str_name()),
181            );
182            try_add_cell(&mut row, worker_node.parallelism());
183            // is_streaming and is_serving are only meaningful for ComputeNode
184            let (is_streaming, is_serving) = {
185                if let Ok(t) = worker_node.get_type()
186                    && t == WorkerType::ComputeNode
187                {
188                    (
189                        worker_node.property.as_ref().map(|p| p.is_streaming),
190                        worker_node.property.as_ref().map(|p| p.is_serving),
191                    )
192                } else {
193                    (None, None)
194                }
195            };
196            try_add_cell(&mut row, is_streaming);
197            try_add_cell(&mut row, is_serving);
198            // is_iceberg_compactor is only meaningful for Compactor worker type
199            let is_iceberg_compactor = {
200                if let Ok(t) = worker_node.get_type()
201                    && t == WorkerType::Compactor
202                {
203                    worker_node
204                        .property
205                        .as_ref()
206                        .map(|p| p.is_iceberg_compactor)
207                } else {
208                    None
209                }
210            };
211            try_add_cell(&mut row, is_iceberg_compactor);
212            try_add_cell(
213                &mut row,
214                worker_node.resource.as_ref().map(|r| r.rw_version.clone()),
215            );
216            try_add_cell(
217                &mut row,
218                worker_node.resource.as_ref().map(|r| r.total_memory_bytes),
219            );
220            try_add_cell(
221                &mut row,
222                worker_node.resource.as_ref().map(|r| r.total_cpu_cores),
223            );
224            try_add_cell(
225                &mut row,
226                worker_node
227                    .started_at
228                    .and_then(|ts| Timestamptz::from_secs(ts as _).map(|t| t.to_string())),
229            );
230            let actor_count = {
231                if let Ok(t) = worker_node.get_type()
232                    && t != WorkerType::ComputeNode
233                {
234                    None
235                } else {
236                    match worker_actor_count.get(&worker_node.id) {
237                        None => Some(0),
238                        Some(c) => Some(*c),
239                    }
240                }
241            };
242            try_add_cell(&mut row, actor_count);
243            table.add_row(row);
244        }
245        let _ = writeln!(s, "{table}");
246    }
247
248    fn write_event_logs(&self, s: &mut String) {
249        let event_logs = self
250            .event_log_manager
251            .list_event_logs()
252            .into_iter()
253            .sorted_by(|a, b| {
254                a.timestamp
255                    .unwrap_or(0)
256                    .cmp(&b.timestamp.unwrap_or(0))
257                    .reverse()
258            })
259            .collect_vec();
260
261        let _ = writeln!(s, "latest barrier completions");
262        Self::write_event_logs_impl(
263            s,
264            event_logs.iter(),
265            |e| {
266                let Event::BarrierComplete(info) = e else {
267                    return None;
268                };
269                Some(json!(info).to_string())
270            },
271            Some(10),
272        );
273
274        let _ = writeln!(s);
275        let _ = writeln!(s, "latest barrier collection failures");
276        Self::write_event_logs_impl(
277            s,
278            event_logs.iter(),
279            |e| {
280                let Event::CollectBarrierFail(info) = e else {
281                    return None;
282                };
283                Some(json!(info).to_string())
284            },
285            Some(3),
286        );
287
288        let _ = writeln!(s);
289        let _ = writeln!(s, "latest barrier injection failures");
290        Self::write_event_logs_impl(
291            s,
292            event_logs.iter(),
293            |e| {
294                let Event::InjectBarrierFail(info) = e else {
295                    return None;
296                };
297                Some(json!(info).to_string())
298            },
299            Some(3),
300        );
301
302        let _ = writeln!(s);
303        let _ = writeln!(s, "latest worker node panics");
304        Self::write_event_logs_impl(
305            s,
306            event_logs.iter(),
307            |e| {
308                let Event::WorkerNodePanic(info) = e else {
309                    return None;
310                };
311                Some(json!(info).to_string())
312            },
313            Some(10),
314        );
315
316        let _ = writeln!(s);
317        let _ = writeln!(s, "latest create stream job failures");
318        Self::write_event_logs_impl(
319            s,
320            event_logs.iter(),
321            |e| {
322                let Event::CreateStreamJobFail(info) = e else {
323                    return None;
324                };
325                Some(json!(info).to_string())
326            },
327            Some(3),
328        );
329
330        let _ = writeln!(s);
331        let _ = writeln!(s, "latest dirty stream job clear-ups");
332        Self::write_event_logs_impl(
333            s,
334            event_logs.iter(),
335            |e| {
336                let Event::DirtyStreamJobClear(info) = e else {
337                    return None;
338                };
339                Some(json!(info).to_string())
340            },
341            Some(3),
342        );
343    }
344
345    fn write_event_logs_impl<'a, F>(
346        s: &mut String,
347        event_logs: impl Iterator<Item = &'a EventLog>,
348        get_event_info: F,
349        limit: Option<usize>,
350    ) where
351        F: Fn(&Event) -> Option<String>,
352    {
353        use comfy_table::{Row, Table};
354        let mut table = Table::new();
355        table.set_header({
356            let mut row = Row::new();
357            row.add_cell("created_at".into());
358            row.add_cell("info".into());
359            row
360        });
361        let mut row_count = 0;
362        for event_log in event_logs {
363            let Some(ref inner) = event_log.event else {
364                continue;
365            };
366            if let Some(limit) = limit
367                && row_count >= limit
368            {
369                break;
370            }
371            let mut row = Row::new();
372            let ts = event_log
373                .timestamp
374                .and_then(|ts| Timestamptz::from_millis(ts as _).map(|ts| ts.to_string()));
375            try_add_cell(&mut row, ts);
376            if let Some(info) = get_event_info(inner) {
377                row.add_cell(info.into());
378                row_count += 1;
379            } else {
380                continue;
381            }
382            table.add_row(row);
383        }
384        let _ = writeln!(s, "{table}");
385    }
386
387    async fn write_storage(&self, s: &mut String) {
388        let mut sst_num = 0;
389        let mut sst_total_file_size = 0;
390        let back_pressured_compaction_groups = self
391            .hummock_manger
392            .write_limits()
393            .await
394            .into_iter()
395            .filter_map(|(k, v)| {
396                if v.table_ids.is_empty() {
397                    None
398                } else {
399                    Some(k)
400                }
401            })
402            .join(",");
403        if !back_pressured_compaction_groups.is_empty() {
404            let _ = writeln!(
405                s,
406                "back pressured compaction groups: {back_pressured_compaction_groups}"
407            );
408        }
409
410        #[derive(PartialEq, Eq)]
411        struct SstableSort {
412            compaction_group_id: u64,
413            sst_id: HummockSstableId,
414            delete_ratio: u64,
415        }
416        impl PartialOrd for SstableSort {
417            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
418                Some(self.cmp(other))
419            }
420        }
421        impl Ord for SstableSort {
422            fn cmp(&self, other: &Self) -> Ordering {
423                self.delete_ratio.cmp(&other.delete_ratio)
424            }
425        }
426        fn top_k_sstables(
427            top_k: usize,
428            heap: &mut BinaryHeap<Reverse<SstableSort>>,
429            e: SstableSort,
430        ) {
431            if heap.len() < top_k {
432                heap.push(Reverse(e));
433            } else if let Some(mut p) = heap.peek_mut()
434                && e.delete_ratio > p.0.delete_ratio
435            {
436                *p = Reverse(e);
437            }
438        }
439
440        let top_k = 10;
441        let mut top_tombstone_delete_sst = BinaryHeap::with_capacity(top_k);
442        let compaction_group_num = self
443            .hummock_manger
444            .on_current_version(|version| {
445                for compaction_group in version.levels.values() {
446                    let mut visit_level = |level: &Level| {
447                        sst_num += level.table_infos.len();
448                        sst_total_file_size +=
449                            level.table_infos.iter().map(|t| t.sst_size).sum::<u64>();
450                        for sst in &level.table_infos {
451                            if sst.total_key_count == 0 {
452                                continue;
453                            }
454                            let tombstone_delete_ratio =
455                                sst.stale_key_count * 10000 / sst.total_key_count;
456                            let e = SstableSort {
457                                compaction_group_id: compaction_group.group_id,
458                                sst_id: sst.sst_id,
459                                delete_ratio: tombstone_delete_ratio,
460                            };
461                            top_k_sstables(top_k, &mut top_tombstone_delete_sst, e);
462                        }
463                    };
464                    let l0 = &compaction_group.l0;
465                    // FIXME: why chaining levels iter leads to segmentation fault?
466                    for level in &l0.sub_levels {
467                        visit_level(level);
468                    }
469                    for level in &compaction_group.levels {
470                        visit_level(level);
471                    }
472                }
473                version.levels.len()
474            })
475            .await;
476
477        let _ = writeln!(s, "number of SSTables: {sst_num}");
478        let _ = writeln!(s, "total size of SSTables (byte): {sst_total_file_size}");
479        let _ = writeln!(s, "number of compaction groups: {compaction_group_num}");
480        use comfy_table::{Row, Table};
481        fn format_table(heap: BinaryHeap<Reverse<SstableSort>>) -> Table {
482            let mut table = Table::new();
483            table.set_header({
484                let mut row = Row::new();
485                row.add_cell("compaction group id".into());
486                row.add_cell("sstable id".into());
487                row.add_cell("delete ratio".into());
488                row
489            });
490            for sst in &heap.into_sorted_vec() {
491                let mut row = Row::new();
492                row.add_cell(sst.0.compaction_group_id.into());
493                row.add_cell(sst.0.sst_id.into());
494                row.add_cell(format!("{:.2}%", sst.0.delete_ratio as f64 / 100.0).into());
495                table.add_row(row);
496            }
497            table
498        }
499        let _ = writeln!(s);
500        let _ = writeln!(s, "top tombstone delete ratio");
501        let _ = writeln!(s, "{}", format_table(top_tombstone_delete_sst));
502        let _ = writeln!(s);
503
504        let _ = writeln!(s);
505        self.write_storage_prometheus(s).await;
506    }
507
508    async fn write_streaming_prometheus(&self, s: &mut String) {
509        let _ = writeln!(s, "top sources by throughput (rows/s)");
510        let query = format!(
511            "topk(3, sum(rate(stream_source_output_rows_counts{{{}}}[10m]))by (source_name))",
512            self.prometheus_selector
513        );
514        self.write_instant_vector_impl(s, &query, vec!["source_name"])
515            .await;
516
517        let _ = writeln!(s);
518        let _ = writeln!(s, "top materialized views by throughput (rows/s)");
519        let query = format!(
520            "topk(3, sum(rate(stream_mview_input_row_count{{{}}}[10m]))by (table_id))",
521            self.prometheus_selector
522        );
523        self.write_instant_vector_impl(s, &query, vec!["table_id"])
524            .await;
525
526        let _ = writeln!(s);
527        let _ = writeln!(s, "top join executor by matched rows");
528        let query = format!(
529            "topk(3, histogram_quantile(0.9, sum(rate(stream_join_matched_join_keys_bucket{{{}}}[10m])) by (le, fragment_id, table_id)))",
530            self.prometheus_selector
531        );
532        self.write_instant_vector_impl(s, &query, vec!["table_id", "fragment_id"])
533            .await;
534    }
535
536    async fn write_storage_prometheus(&self, s: &mut String) {
537        let _ = writeln!(s, "top Hummock Get by duration (second)");
538        let query = format!(
539            "topk(3, histogram_quantile(0.9, sum(rate(state_store_get_duration_bucket{{{}}}[10m])) by (le, table_id)))",
540            self.prometheus_selector
541        );
542        self.write_instant_vector_impl(s, &query, vec!["table_id"])
543            .await;
544
545        let _ = writeln!(s);
546        let _ = writeln!(s, "top Hummock Iter Init by duration (second)");
547        let query = format!(
548            "topk(3, histogram_quantile(0.9, sum(rate(state_store_iter_init_duration_bucket{{{}}}[10m])) by (le, table_id)))",
549            self.prometheus_selector
550        );
551        self.write_instant_vector_impl(s, &query, vec!["table_id"])
552            .await;
553
554        let _ = writeln!(s);
555        let _ = writeln!(s, "top table commit flush by size (byte)");
556        let query = format!(
557            "topk(3, sum(rate(storage_commit_write_throughput{{{}}}[10m])) by (table_id))",
558            self.prometheus_selector
559        );
560        self.write_instant_vector_impl(s, &query, vec!["table_id"])
561            .await;
562
563        let _ = writeln!(s);
564        let _ = writeln!(s, "object store read throughput (bytes/s)");
565        let query = format!(
566            "sum(rate(object_store_read_bytes{{{}}}[10m])) by (job, instance)",
567            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
568        );
569        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
570            .await;
571
572        let _ = writeln!(s);
573        let _ = writeln!(s, "object store write throughput (bytes/s)");
574        let query = format!(
575            "sum(rate(object_store_write_bytes{{{}}}[10m])) by (job, instance)",
576            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
577        );
578        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
579            .await;
580
581        let _ = writeln!(s);
582        let _ = writeln!(s, "object store operation rate");
583        let query = format!(
584            "sum(rate(object_store_operation_latency_count{{{}}}[10m])) by (le, type, job, instance)",
585            merge_prometheus_selector([
586                &self.prometheus_selector,
587                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read_read_bytes|streaming_read\""
588            ])
589        );
590        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
591            .await;
592
593        let _ = writeln!(s);
594        let _ = writeln!(s, "object store operation duration (second)");
595        let query = format!(
596            "histogram_quantile(0.9, sum(rate(object_store_operation_latency_bucket{{{}}}[10m])) by (le, type, job, instance))",
597            merge_prometheus_selector([
598                &self.prometheus_selector,
599                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read\""
600            ])
601        );
602        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
603            .await;
604    }
605
606    async fn write_instant_vector_impl(&self, s: &mut String, query: &str, labels: Vec<&str>) {
607        let Some(ref client) = self.prometheus_client else {
608            return;
609        };
610        if let Ok(Vector(instant_vec)) = client
611            .query(query)
612            .get()
613            .await
614            .map(|result| result.into_inner().0)
615        {
616            for i in instant_vec {
617                let l = labels
618                    .iter()
619                    .map(|label| {
620                        format!(
621                            "{}={}",
622                            *label,
623                            i.metric()
624                                .get(*label)
625                                .map(|s| s.as_str())
626                                .unwrap_or_default()
627                        )
628                    })
629                    .join(",");
630                let _ = writeln!(s, "{}: {:.3}", l, i.sample().value());
631            }
632        }
633    }
634
635    async fn write_await_tree(&self, s: &mut String, actor_traces_format: ActorTracesFormat) {
636        let all = dump_cluster_await_tree(
637            &self.metadata_manager,
638            &self.await_tree_reg,
639            actor_traces_format,
640        )
641        .await;
642
643        if let Ok(all) = all {
644            write!(s, "{}", all.output()).unwrap();
645        } else {
646            tracing::warn!("failed to dump await tree");
647        }
648    }
649
650    async fn write_table_definition(&self, s: &mut String) -> MetaResult<()> {
651        let sources = self
652            .metadata_manager
653            .catalog_controller
654            .list_sources()
655            .await?
656            .into_iter()
657            .map(|s| {
658                (
659                    s.id.into(),
660                    (s.name, s.schema_id, s.definition, s.created_at_epoch),
661                )
662            })
663            .collect::<BTreeMap<_, _>>();
664        let mut user_tables = BTreeMap::new();
665        let mut mvs = BTreeMap::new();
666        let mut indexes = BTreeMap::new();
667        let mut internal_tables = BTreeMap::new();
668        {
669            let grouped = self
670                .metadata_manager
671                .catalog_controller
672                .list_all_state_tables()
673                .await?
674                .into_iter()
675                .chunk_by(|t| t.table_type());
676            for (table_type, tables) in &grouped {
677                let tables = tables.into_iter().map(|t| {
678                    (
679                        t.id.into(),
680                        (t.name, t.schema_id, t.definition, t.created_at_epoch),
681                    )
682                });
683                match table_type {
684                    PbTableType::Table => user_tables.extend(tables),
685                    PbTableType::MaterializedView => mvs.extend(tables),
686                    PbTableType::Index | PbTableType::VectorIndex => indexes.extend(tables),
687                    PbTableType::Internal => internal_tables.extend(tables),
688                    PbTableType::Unspecified => {
689                        tracing::error!("unspecified table type: {:?}", tables.collect_vec());
690                    }
691                }
692            }
693        }
694        let sinks = self
695            .metadata_manager
696            .catalog_controller
697            .list_sinks()
698            .await?
699            .into_iter()
700            .map(|s| {
701                (
702                    s.id.into(),
703                    (s.name, s.schema_id, s.definition, s.created_at_epoch),
704                )
705            })
706            .collect::<BTreeMap<_, _>>();
707        let views = self
708            .metadata_manager
709            .catalog_controller
710            .list_views()
711            .await?
712            .into_iter()
713            .map(|v| (v.id.into(), (v.name, v.schema_id, v.sql, None)))
714            .collect::<BTreeMap<_, _>>();
715        let mut streaming_jobs = self
716            .metadata_manager
717            .catalog_controller
718            .list_streaming_job_infos()
719            .await?;
720        streaming_jobs.sort_by_key(|info| (info.obj_type as usize, info.job_id));
721        {
722            use comfy_table::{Row, Table};
723            let mut table = Table::new();
724            table.set_header({
725                let mut row = Row::new();
726                row.add_cell("job_id".into());
727                row.add_cell("name".into());
728                row.add_cell("obj_type".into());
729                row.add_cell("state".into());
730                row.add_cell("parallelism".into());
731                row.add_cell("max_parallelism".into());
732                row.add_cell("resource_group".into());
733                row.add_cell("database_id".into());
734                row.add_cell("schema_id".into());
735                row.add_cell("config_override".into());
736                row
737            });
738            for job in streaming_jobs {
739                let mut row = Row::new();
740                row.add_cell(job.job_id.into());
741                row.add_cell(job.name.into());
742                row.add_cell(job.obj_type.as_str().into());
743                row.add_cell(format_job_status(job.job_status).into());
744                row.add_cell(format_streaming_parallelism(&job.parallelism).into());
745                row.add_cell(job.max_parallelism.into());
746                row.add_cell(job.resource_group.into());
747                row.add_cell(job.database_id.into());
748                row.add_cell(job.schema_id.into());
749                row.add_cell(job.config_override.into());
750                table.add_row(row);
751            }
752            let _ = writeln!(s);
753            let _ = writeln!(s, "STREAMING JOB");
754            let _ = writeln!(s, "{table}");
755        }
756        let catalogs = [
757            ("SOURCE", sources),
758            ("TABLE", user_tables),
759            ("MATERIALIZED VIEW", mvs),
760            ("INDEX", indexes),
761            ("SINK", sinks),
762            ("VIEW", views),
763            ("INTERNAL TABLE", internal_tables),
764        ];
765        let mut obj_id_to_name: HashMap<ObjectId, _> = HashMap::new();
766        for (title, items) in catalogs {
767            use comfy_table::{Row, Table};
768            let mut table = Table::new();
769            table.set_header({
770                let mut row = Row::new();
771                row.add_cell("id".into());
772                row.add_cell("name".into());
773                row.add_cell("schema_id".into());
774                row.add_cell("created_at".into());
775                row.add_cell("definition".into());
776                row
777            });
778            for (id, (name, schema_id, definition, created_at_epoch)) in items {
779                obj_id_to_name.insert(id, name.clone());
780                let mut row = Row::new();
781                let may_redact = redact_sql(&definition, self.redact_sql_option_keywords.clone())
782                    .unwrap_or_else(|| "[REDACTED]".into());
783                let created_at = if let Some(created_at_epoch) = created_at_epoch {
784                    format!("{}", Epoch::from(created_at_epoch).as_timestamptz())
785                } else {
786                    "".into()
787                };
788                row.add_cell(id.into());
789                row.add_cell(name.into());
790                row.add_cell(schema_id.into());
791                row.add_cell(created_at.into());
792                row.add_cell(may_redact.into());
793                table.add_row(row);
794            }
795            let _ = writeln!(s);
796            let _ = writeln!(s, "{title}");
797            let _ = writeln!(s, "{table}");
798        }
799
800        let actors = self
801            .metadata_manager
802            .catalog_controller
803            .list_actor_info()
804            .await?
805            .into_iter()
806            .map(|(actor_id, fragment_id, job_id, schema_id, obj_type)| {
807                (
808                    actor_id,
809                    (
810                        fragment_id,
811                        job_id,
812                        schema_id,
813                        obj_type,
814                        obj_id_to_name.get(&job_id).cloned().unwrap_or_default(),
815                    ),
816                )
817            })
818            .collect::<BTreeMap<_, _>>();
819
820        use comfy_table::{Row, Table};
821        let mut table = Table::new();
822        table.set_header({
823            let mut row = Row::new();
824            row.add_cell("id".into());
825            row.add_cell("fragment_id".into());
826            row.add_cell("job_id".into());
827            row.add_cell("schema_id".into());
828            row.add_cell("type".into());
829            row.add_cell("name".into());
830            row
831        });
832        for (actor_id, (fragment_id, job_id, schema_id, ddl_type, name)) in actors {
833            let mut row = Row::new();
834            row.add_cell(actor_id.into());
835            row.add_cell(fragment_id.into());
836            row.add_cell(job_id.into());
837            row.add_cell(schema_id.into());
838            row.add_cell(ddl_type.as_str().into());
839            row.add_cell(name.into());
840            table.add_row(row);
841        }
842        let _ = writeln!(s);
843        let _ = writeln!(s, "ACTOR");
844        let _ = writeln!(s, "{table}");
845        Ok(())
846    }
847
848    fn write_license(&self, s: &mut String) {
849        use comfy_table::presets::ASCII_BORDERS_ONLY;
850        use comfy_table::{ContentArrangement, Row, Table};
851
852        let mut table = Table::new();
853        table.load_preset(ASCII_BORDERS_ONLY);
854        table.set_content_arrangement(ContentArrangement::Dynamic);
855        table.set_header({
856            let mut row = Row::new();
857            row.add_cell("field".into());
858            row.add_cell("value".into());
859            row
860        });
861
862        match LicenseManager::get().license() {
863            Ok(license) => {
864                let fmt_option = |value: Option<u64>| match value {
865                    Some(v) => v.to_string(),
866                    None => "unlimited".to_owned(),
867                };
868
869                let expires_at = if license.exp == u64::MAX {
870                    "never".to_owned()
871                } else {
872                    let exp_i64 = license.exp as i64;
873                    chrono::DateTime::<chrono::Utc>::from_timestamp(exp_i64, 0)
874                        .map(|ts| ts.to_rfc3339())
875                        .unwrap_or_else(|| format!("invalid ({})", license.exp))
876                };
877
878                let mut row = Row::new();
879                row.add_cell("status".into());
880                row.add_cell("valid".into());
881                table.add_row(row);
882
883                let mut row = Row::new();
884                row.add_cell("tier".into());
885                row.add_cell(license.tier.name().into());
886                table.add_row(row);
887
888                let mut row = Row::new();
889                row.add_cell("expires_at".into());
890                row.add_cell(expires_at.into());
891                table.add_row(row);
892
893                let mut row = Row::new();
894                row.add_cell("rwu_limit".into());
895                row.add_cell(fmt_option(license.rwu_limit.map(|v| v.get())).into());
896                table.add_row(row);
897
898                let mut row = Row::new();
899                row.add_cell("cpu_core_limit".into());
900                row.add_cell(fmt_option(license.cpu_core_limit()).into());
901                table.add_row(row);
902
903                let mut row = Row::new();
904                row.add_cell("memory_limit_bytes".into());
905                row.add_cell(fmt_option(license.memory_limit()).into());
906                table.add_row(row);
907
908                let mut features: Vec<_> = license
909                    .tier
910                    .available_features()
911                    .map(|f| f.name())
912                    .collect();
913                features.sort_unstable();
914                let feature_summary = format_features(&features);
915
916                let mut row = Row::new();
917                row.add_cell("available_features".into());
918                row.add_cell(feature_summary.into());
919                table.add_row(row);
920            }
921            Err(error) => {
922                let mut row = Row::new();
923                row.add_cell("status".into());
924                row.add_cell("invalid".into());
925                table.add_row(row);
926
927                let mut row = Row::new();
928                row.add_cell("error".into());
929                row.add_cell(error.to_report_string().into());
930                table.add_row(row);
931            }
932        }
933
934        let _ = writeln!(s, "LICENSE");
935        let _ = writeln!(s, "{table}");
936    }
937}
938
939fn try_add_cell<T: Into<comfy_table::Cell>>(row: &mut comfy_table::Row, t: Option<T>) {
940    match t {
941        Some(t) => {
942            row.add_cell(t.into());
943        }
944        None => {
945            row.add_cell("".into());
946        }
947    }
948}
949
950fn merge_prometheus_selector<'a>(selectors: impl IntoIterator<Item = &'a str>) -> String {
951    selectors.into_iter().filter(|s| !s.is_empty()).join(",")
952}
953
954fn redact_sql(sql: &str, keywords: RedactSqlOptionKeywordsRef) -> Option<String> {
955    match Parser::parse_sql(sql) {
956        Ok(sqls) => Some(
957            sqls.into_iter()
958                .map(|sql| sql.to_redacted_string(keywords.clone()))
959                .join(";"),
960        ),
961        Err(_) => None,
962    }
963}
964
965fn format_features(features: &[&'static str]) -> String {
966    if features.is_empty() {
967        return "(none)".into();
968    }
969
970    const PER_LINE: usize = 6;
971    features
972        .chunks(PER_LINE)
973        .map(|chunk| format!("  {}", chunk.join(", ")))
974        .collect::<Vec<_>>()
975        .join("\n")
976}
977
978fn format_job_status(status: JobStatus) -> &'static str {
979    match status {
980        JobStatus::Initial => "initial",
981        JobStatus::Creating => "creating",
982        JobStatus::Created => "created",
983    }
984}
985
986fn format_streaming_parallelism(parallelism: &StreamingParallelism) -> String {
987    match parallelism {
988        StreamingParallelism::Adaptive => "adaptive".into(),
989        StreamingParallelism::Fixed(n) => format!("fixed({n})"),
990        StreamingParallelism::Custom => "custom".into(),
991    }
992}