risingwave_meta/manager/
diagnose.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::{Ordering, Reverse};
16use std::collections::{BTreeMap, BinaryHeap, HashMap};
17use std::fmt::Write;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use prometheus_http_query::response::Data::Vector;
22use risingwave_common::types::Timestamptz;
23use risingwave_common::util::StackTraceResponseExt;
24use risingwave_hummock_sdk::HummockSstableId;
25use risingwave_hummock_sdk::level::Level;
26use risingwave_meta_model::table::TableType;
27use risingwave_pb::common::WorkerType;
28use risingwave_pb::meta::EventLog;
29use risingwave_pb::meta::event_log::Event;
30use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
31use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
32use risingwave_rpc_client::ComputeClientPool;
33use risingwave_sqlparser::ast::{CompatibleFormatEncode, Statement, Value};
34use risingwave_sqlparser::parser::Parser;
35use serde_json::json;
36use thiserror_ext::AsReport;
37
38use crate::MetaResult;
39use crate::hummock::HummockManagerRef;
40use crate::manager::MetadataManager;
41use crate::manager::event_log::EventLogManagerRef;
42
/// Shared, reference-counted handle to a [`DiagnoseCommand`].
pub type DiagnoseCommandRef = Arc<DiagnoseCommand>;
44
/// Builds a plain-text diagnostic report about the cluster.
///
/// The report is assembled by [`DiagnoseCommand::report`] from the catalog,
/// worker nodes, Prometheus metrics, storage state, await-tree traces and the
/// event log.
pub struct DiagnoseCommand {
    /// Used to read catalog statistics, object definitions, worker nodes and
    /// per-worker actor counts.
    metadata_manager: MetadataManager,
    /// Used to read write limits and the current version's levels / SSTables.
    hummock_manger: HummockManagerRef,
    /// Source of the event logs listed at the end of the report.
    event_log_manager: EventLogManagerRef,
    /// Optional Prometheus client; when `None`, metric queries are skipped.
    prometheus_client: Option<prometheus_http_query::Client>,
    /// Label matchers spliced inside the `{...}` selector of every PromQL
    /// query issued by this command.
    prometheus_selector: String,
}
52
53impl DiagnoseCommand {
    /// Creates a diagnose command from the managers it reads from and an
    /// optional Prometheus client.
    ///
    /// All arguments are stored as-is. `prometheus_selector` is later inserted
    /// verbatim between the braces of each PromQL metric selector.
    pub fn new(
        metadata_manager: MetadataManager,
        hummock_manger: HummockManagerRef,
        event_log_manager: EventLogManagerRef,
        prometheus_client: Option<prometheus_http_query::Client>,
        prometheus_selector: String,
    ) -> Self {
        Self {
            metadata_manager,
            hummock_manger,
            event_log_manager,
            prometheus_client,
            prometheus_selector,
        }
    }
69
70    #[cfg_attr(coverage, coverage(off))]
71    pub async fn report(&self, actor_traces_format: ActorTracesFormat) -> String {
72        let mut report = String::new();
73        let _ = writeln!(
74            report,
75            "report created at: {}\nversion: {}",
76            chrono::DateTime::<chrono::offset::Utc>::from(std::time::SystemTime::now()),
77            risingwave_common::current_cluster_version(),
78        );
79        let _ = writeln!(report);
80        self.write_catalog(&mut report).await;
81        let _ = writeln!(report);
82        self.write_worker_nodes(&mut report).await;
83        let _ = writeln!(report);
84        self.write_streaming_prometheus(&mut report).await;
85        let _ = writeln!(report);
86        self.write_storage(&mut report).await;
87        let _ = writeln!(report);
88        self.write_await_tree(&mut report, actor_traces_format)
89            .await;
90        let _ = writeln!(report);
91        self.write_event_logs(&mut report);
92        report
93    }
94
95    #[cfg_attr(coverage, coverage(off))]
96    async fn write_catalog(&self, s: &mut String) {
97        self.write_catalog_inner(s).await;
98        let _ = self.write_table_definition(s).await.inspect_err(|e| {
99            tracing::warn!(
100                error = e.to_report_string(),
101                "failed to display table definition"
102            )
103        });
104    }
105
106    #[cfg_attr(coverage, coverage(off))]
107    async fn write_catalog_inner(&self, s: &mut String) {
108        let guard = self
109            .metadata_manager
110            .catalog_controller
111            .get_inner_read_guard()
112            .await;
113        let stat = match guard.stats().await {
114            Ok(stat) => stat,
115            Err(err) => {
116                tracing::warn!(error=?err.as_report(), "failed to get catalog stats");
117                return;
118            }
119        };
120        let _ = writeln!(s, "number of fragment: {}", stat.streaming_job_num);
121        let _ = writeln!(s, "number of actor: {}", stat.actor_num);
122        let _ = writeln!(s, "number of source: {}", stat.source_num);
123        let _ = writeln!(s, "number of table: {}", stat.table_num);
124        let _ = writeln!(s, "number of materialized view: {}", stat.mview_num);
125        let _ = writeln!(s, "number of sink: {}", stat.sink_num);
126        let _ = writeln!(s, "number of index: {}", stat.index_num);
127        let _ = writeln!(s, "number of function: {}", stat.function_num);
128    }
129
130    #[cfg_attr(coverage, coverage(off))]
131    async fn write_worker_nodes(&self, s: &mut String) {
132        let Ok(worker_actor_count) = self.metadata_manager.worker_actor_count().await else {
133            tracing::warn!("failed to get worker actor count");
134            return;
135        };
136
137        use comfy_table::{Row, Table};
138        let Ok(worker_nodes) = self.metadata_manager.list_worker_node(None, None).await else {
139            tracing::warn!("failed to get worker nodes");
140            return;
141        };
142        let mut table = Table::new();
143        table.set_header({
144            let mut row = Row::new();
145            row.add_cell("id".into());
146            row.add_cell("host".into());
147            row.add_cell("type".into());
148            row.add_cell("state".into());
149            row.add_cell("parallelism".into());
150            row.add_cell("is_streaming".into());
151            row.add_cell("is_serving".into());
152            row.add_cell("rw_version".into());
153            row.add_cell("total_memory_bytes".into());
154            row.add_cell("total_cpu_cores".into());
155            row.add_cell("started_at".into());
156            row.add_cell("actor_count".into());
157            row
158        });
159        for worker_node in worker_nodes {
160            let mut row = Row::new();
161            row.add_cell(worker_node.id.into());
162            try_add_cell(
163                &mut row,
164                worker_node
165                    .host
166                    .as_ref()
167                    .map(|h| format!("{}:{}", h.host, h.port)),
168            );
169            try_add_cell(
170                &mut row,
171                worker_node.get_type().ok().map(|t| t.as_str_name()),
172            );
173            try_add_cell(
174                &mut row,
175                worker_node.get_state().ok().map(|s| s.as_str_name()),
176            );
177            try_add_cell(&mut row, worker_node.parallelism());
178            try_add_cell(
179                &mut row,
180                worker_node.property.as_ref().map(|p| p.is_streaming),
181            );
182            try_add_cell(
183                &mut row,
184                worker_node.property.as_ref().map(|p| p.is_serving),
185            );
186            try_add_cell(
187                &mut row,
188                worker_node.resource.as_ref().map(|r| r.rw_version.clone()),
189            );
190            try_add_cell(
191                &mut row,
192                worker_node.resource.as_ref().map(|r| r.total_memory_bytes),
193            );
194            try_add_cell(
195                &mut row,
196                worker_node.resource.as_ref().map(|r| r.total_cpu_cores),
197            );
198            try_add_cell(
199                &mut row,
200                worker_node
201                    .started_at
202                    .and_then(|ts| Timestamptz::from_secs(ts as _).map(|t| t.to_string())),
203            );
204            let actor_count = {
205                if let Ok(t) = worker_node.get_type()
206                    && t != WorkerType::ComputeNode
207                {
208                    None
209                } else {
210                    match worker_actor_count.get(&(worker_node.id as _)) {
211                        None => Some(0),
212                        Some(c) => Some(*c),
213                    }
214                }
215            };
216            try_add_cell(&mut row, actor_count);
217            table.add_row(row);
218        }
219        let _ = writeln!(s, "{table}");
220    }
221
222    #[cfg_attr(coverage, coverage(off))]
223    fn write_event_logs(&self, s: &mut String) {
224        let event_logs = self
225            .event_log_manager
226            .list_event_logs()
227            .into_iter()
228            .sorted_by(|a, b| {
229                a.timestamp
230                    .unwrap_or(0)
231                    .cmp(&b.timestamp.unwrap_or(0))
232                    .reverse()
233            })
234            .collect_vec();
235
236        let _ = writeln!(s, "latest barrier completions");
237        Self::write_event_logs_impl(
238            s,
239            event_logs.iter(),
240            |e| {
241                let Event::BarrierComplete(info) = e else {
242                    return None;
243                };
244                Some(json!(info).to_string())
245            },
246            Some(10),
247        );
248
249        let _ = writeln!(s);
250        let _ = writeln!(s, "latest barrier collection failures");
251        Self::write_event_logs_impl(
252            s,
253            event_logs.iter(),
254            |e| {
255                let Event::CollectBarrierFail(info) = e else {
256                    return None;
257                };
258                Some(json!(info).to_string())
259            },
260            Some(3),
261        );
262
263        let _ = writeln!(s);
264        let _ = writeln!(s, "latest barrier injection failures");
265        Self::write_event_logs_impl(
266            s,
267            event_logs.iter(),
268            |e| {
269                let Event::InjectBarrierFail(info) = e else {
270                    return None;
271                };
272                Some(json!(info).to_string())
273            },
274            Some(3),
275        );
276
277        let _ = writeln!(s);
278        let _ = writeln!(s, "latest worker node panics");
279        Self::write_event_logs_impl(
280            s,
281            event_logs.iter(),
282            |e| {
283                let Event::WorkerNodePanic(info) = e else {
284                    return None;
285                };
286                Some(json!(info).to_string())
287            },
288            Some(10),
289        );
290
291        let _ = writeln!(s);
292        let _ = writeln!(s, "latest create stream job failures");
293        Self::write_event_logs_impl(
294            s,
295            event_logs.iter(),
296            |e| {
297                let Event::CreateStreamJobFail(info) = e else {
298                    return None;
299                };
300                Some(json!(info).to_string())
301            },
302            Some(3),
303        );
304
305        let _ = writeln!(s);
306        let _ = writeln!(s, "latest dirty stream job clear-ups");
307        Self::write_event_logs_impl(
308            s,
309            event_logs.iter(),
310            |e| {
311                let Event::DirtyStreamJobClear(info) = e else {
312                    return None;
313                };
314                Some(json!(info).to_string())
315            },
316            Some(3),
317        );
318    }
319
320    #[cfg_attr(coverage, coverage(off))]
321    fn write_event_logs_impl<'a, F>(
322        s: &mut String,
323        event_logs: impl Iterator<Item = &'a EventLog>,
324        get_event_info: F,
325        limit: Option<usize>,
326    ) where
327        F: Fn(&Event) -> Option<String>,
328    {
329        use comfy_table::{Row, Table};
330        let mut table = Table::new();
331        table.set_header({
332            let mut row = Row::new();
333            row.add_cell("created_at".into());
334            row.add_cell("info".into());
335            row
336        });
337        let mut row_count = 0;
338        for event_log in event_logs {
339            let Some(ref inner) = event_log.event else {
340                continue;
341            };
342            if let Some(limit) = limit
343                && row_count >= limit
344            {
345                break;
346            }
347            let mut row = Row::new();
348            let ts = event_log
349                .timestamp
350                .and_then(|ts| Timestamptz::from_millis(ts as _).map(|ts| ts.to_string()));
351            try_add_cell(&mut row, ts);
352            if let Some(info) = get_event_info(inner) {
353                row.add_cell(info.into());
354                row_count += 1;
355            } else {
356                continue;
357            }
358            table.add_row(row);
359        }
360        let _ = writeln!(s, "{table}");
361    }
362
    /// Writes the storage section of the report.
    ///
    /// In order, this writes: the ids of compaction groups whose write limit
    /// lists at least one table (only if there are any), the number of
    /// SSTables, their total size, the number of compaction groups, a table of
    /// the (up to) 10 SSTables with the highest tombstone delete ratio, and
    /// finally the Prometheus-backed storage metrics.
    #[cfg_attr(coverage, coverage(off))]
    async fn write_storage(&self, s: &mut String) {
        // Accumulated by the closure passed to `on_current_version` below.
        let mut sst_num = 0;
        let mut sst_total_file_size = 0;
        // Comma-separated keys of write-limit entries with non-empty `table_ids`.
        let back_pressured_compaction_groups = self
            .hummock_manger
            .write_limits()
            .await
            .into_iter()
            .filter_map(|(k, v)| {
                if v.table_ids.is_empty() {
                    None
                } else {
                    Some(k)
                }
            })
            .join(",");
        if !back_pressured_compaction_groups.is_empty() {
            let _ = writeln!(
                s,
                "back pressured compaction groups: {back_pressured_compaction_groups}"
            );
        }

        // Heap entry describing one SSTable. `delete_ratio` is stored as
        // `stale_key_count * 10000 / total_key_count`, i.e. hundredths of a
        // percent; it is divided by 100 when printed.
        #[derive(PartialEq, Eq)]
        struct SstableSort {
            compaction_group_id: u64,
            sst_id: HummockSstableId,
            delete_ratio: u64,
        }
        impl PartialOrd for SstableSort {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }
        // Ordering considers `delete_ratio` only, whereas the derived equality
        // compares every field.
        impl Ord for SstableSort {
            fn cmp(&self, other: &Self) -> Ordering {
                self.delete_ratio.cmp(&other.delete_ratio)
            }
        }
        // Keeps at most `top_k` entries with the largest `delete_ratio`.
        // Because entries are wrapped in `Reverse`, the heap's top is the
        // smallest ratio kept so far; it is replaced only by a strictly larger one.
        fn top_k_sstables(
            top_k: usize,
            heap: &mut BinaryHeap<Reverse<SstableSort>>,
            e: SstableSort,
        ) {
            if heap.len() < top_k {
                heap.push(Reverse(e));
            } else if let Some(mut p) = heap.peek_mut() {
                if e.delete_ratio > p.0.delete_ratio {
                    *p = Reverse(e);
                }
            }
        }

        let top_k = 10;
        let mut top_tombstone_delete_sst = BinaryHeap::with_capacity(top_k);
        // Walk every level of every compaction group in the current version,
        // updating the counters and the top-k heap; the closure returns the
        // number of compaction groups.
        let compaction_group_num = self
            .hummock_manger
            .on_current_version(|version| {
                for compaction_group in version.levels.values() {
                    let mut visit_level = |level: &Level| {
                        sst_num += level.table_infos.len();
                        sst_total_file_size +=
                            level.table_infos.iter().map(|t| t.sst_size).sum::<u64>();
                        for sst in &level.table_infos {
                            // Skip SSTables without keys to avoid dividing by zero.
                            if sst.total_key_count == 0 {
                                continue;
                            }
                            let tombstone_delete_ratio =
                                sst.stale_key_count * 10000 / sst.total_key_count;
                            let e = SstableSort {
                                compaction_group_id: compaction_group.group_id,
                                sst_id: sst.sst_id,
                                delete_ratio: tombstone_delete_ratio,
                            };
                            top_k_sstables(top_k, &mut top_tombstone_delete_sst, e);
                        }
                    };
                    let l0 = &compaction_group.l0;
                    // FIXME: why chaining levels iter leads to segmentation fault?
                    for level in &l0.sub_levels {
                        visit_level(level);
                    }
                    for level in &compaction_group.levels {
                        visit_level(level);
                    }
                }
                version.levels.len()
            })
            .await;

        let _ = writeln!(s, "number of SSTables: {sst_num}");
        let _ = writeln!(s, "total size of SSTables (byte): {sst_total_file_size}");
        let _ = writeln!(s, "number of compaction groups: {compaction_group_num}");
        use comfy_table::{Row, Table};
        // Renders the heap as a table. `into_sorted_vec` sorts the `Reverse`
        // wrappers ascending, which lists the highest delete ratio first.
        fn format_table(heap: BinaryHeap<Reverse<SstableSort>>) -> Table {
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("compaction group id".into());
                row.add_cell("sstable id".into());
                row.add_cell("delete ratio".into());
                row
            });
            for sst in &heap.into_sorted_vec() {
                let mut row = Row::new();
                row.add_cell(sst.0.compaction_group_id.into());
                row.add_cell(sst.0.sst_id.into());
                // Convert hundredths of a percent to a percentage.
                row.add_cell(format!("{:.2}%", sst.0.delete_ratio as f64 / 100.0).into());
                table.add_row(row);
            }
            table
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "top tombstone delete ratio");
        let _ = writeln!(s, "{}", format_table(top_tombstone_delete_sst));
        let _ = writeln!(s);

        let _ = writeln!(s);
        self.write_storage_prometheus(s).await;
    }
484
485    #[cfg_attr(coverage, coverage(off))]
486    async fn write_streaming_prometheus(&self, s: &mut String) {
487        let _ = writeln!(s, "top sources by throughput (rows/s)");
488        let query = format!(
489            "topk(3, sum(rate(stream_source_output_rows_counts{{{}}}[10m]))by (source_name))",
490            self.prometheus_selector
491        );
492        self.write_instant_vector_impl(s, &query, vec!["source_name"])
493            .await;
494
495        let _ = writeln!(s);
496        let _ = writeln!(s, "top materialized views by throughput (rows/s)");
497        let query = format!(
498            "topk(3, sum(rate(stream_mview_input_row_count{{{}}}[10m]))by (table_id))",
499            self.prometheus_selector
500        );
501        self.write_instant_vector_impl(s, &query, vec!["table_id"])
502            .await;
503
504        let _ = writeln!(s);
505        let _ = writeln!(s, "top join executor by matched rows");
506        let query = format!(
507            "topk(3, histogram_quantile(0.9, sum(rate(stream_join_matched_join_keys_bucket{{{}}}[10m])) by (le, fragment_id, table_id)))",
508            self.prometheus_selector
509        );
510        self.write_instant_vector_impl(s, &query, vec!["table_id", "fragment_id"])
511            .await;
512    }
513
514    #[cfg_attr(coverage, coverage(off))]
515    async fn write_storage_prometheus(&self, s: &mut String) {
516        let _ = writeln!(s, "top Hummock Get by duration (second)");
517        let query = format!(
518            "topk(3, histogram_quantile(0.9, sum(rate(state_store_get_duration_bucket{{{}}}[10m])) by (le, table_id)))",
519            self.prometheus_selector
520        );
521        self.write_instant_vector_impl(s, &query, vec!["table_id"])
522            .await;
523
524        let _ = writeln!(s);
525        let _ = writeln!(s, "top Hummock Iter Init by duration (second)");
526        let query = format!(
527            "topk(3, histogram_quantile(0.9, sum(rate(state_store_iter_init_duration_bucket{{{}}}[10m])) by (le, table_id)))",
528            self.prometheus_selector
529        );
530        self.write_instant_vector_impl(s, &query, vec!["table_id"])
531            .await;
532
533        let _ = writeln!(s);
534        let _ = writeln!(s, "top table commit flush by size (byte)");
535        let query = format!(
536            "topk(3, sum(rate(storage_commit_write_throughput{{{}}}[10m])) by (table_id))",
537            self.prometheus_selector
538        );
539        self.write_instant_vector_impl(s, &query, vec!["table_id"])
540            .await;
541
542        let _ = writeln!(s);
543        let _ = writeln!(s, "object store read throughput (bytes/s)");
544        let query = format!(
545            "sum(rate(object_store_read_bytes{{{}}}[10m])) by (job, instance)",
546            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
547        );
548        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
549            .await;
550
551        let _ = writeln!(s);
552        let _ = writeln!(s, "object store write throughput (bytes/s)");
553        let query = format!(
554            "sum(rate(object_store_write_bytes{{{}}}[10m])) by (job, instance)",
555            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
556        );
557        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
558            .await;
559
560        let _ = writeln!(s);
561        let _ = writeln!(s, "object store operation rate");
562        let query = format!(
563            "sum(rate(object_store_operation_latency_count{{{}}}[10m])) by (le, type, job, instance)",
564            merge_prometheus_selector([
565                &self.prometheus_selector,
566                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read_read_bytes|streaming_read\""
567            ])
568        );
569        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
570            .await;
571
572        let _ = writeln!(s);
573        let _ = writeln!(s, "object store operation duration (second)");
574        let query = format!(
575            "histogram_quantile(0.9, sum(rate(object_store_operation_latency_bucket{{{}}}[10m])) by (le, type, job, instance))",
576            merge_prometheus_selector([
577                &self.prometheus_selector,
578                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read\""
579            ])
580        );
581        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
582            .await;
583    }
584
585    #[cfg_attr(coverage, coverage(off))]
586    async fn write_instant_vector_impl(&self, s: &mut String, query: &str, labels: Vec<&str>) {
587        let Some(ref client) = self.prometheus_client else {
588            return;
589        };
590        if let Ok(Vector(instant_vec)) = client
591            .query(query)
592            .get()
593            .await
594            .map(|result| result.into_inner().0)
595        {
596            for i in instant_vec {
597                let l = labels
598                    .iter()
599                    .map(|label| {
600                        format!(
601                            "{}={}",
602                            *label,
603                            i.metric()
604                                .get(*label)
605                                .map(|s| s.as_str())
606                                .unwrap_or_default()
607                        )
608                    })
609                    .join(",");
610                let _ = writeln!(s, "{}: {:.3}", l, i.sample().value());
611            }
612        }
613    }
614
615    #[cfg_attr(coverage, coverage(off))]
616    async fn write_await_tree(&self, s: &mut String, actor_traces_format: ActorTracesFormat) {
617        // Most lines of code are copied from dashboard::handlers::dump_await_tree_all, because the latter cannot be called directly from here.
618        let Ok(worker_nodes) = self
619            .metadata_manager
620            .list_worker_node(Some(WorkerType::ComputeNode), None)
621            .await
622        else {
623            tracing::warn!("failed to get worker nodes");
624            return;
625        };
626
627        let mut all = StackTraceResponse::default();
628
629        let compute_clients = ComputeClientPool::adhoc();
630        for worker_node in &worker_nodes {
631            if let Ok(client) = compute_clients.get(worker_node).await
632                && let Ok(result) = client
633                    .stack_trace(StackTraceRequest {
634                        actor_traces_format: actor_traces_format as i32,
635                    })
636                    .await
637            {
638                all.merge_other(result);
639            }
640        }
641
642        write!(s, "{}", all.output()).unwrap();
643    }
644
645    async fn write_table_definition(&self, s: &mut String) -> MetaResult<()> {
646        let sources = self
647            .metadata_manager
648            .catalog_controller
649            .list_sources()
650            .await?
651            .into_iter()
652            .map(|s| (s.id, (s.name, s.schema_id, s.definition)))
653            .collect::<BTreeMap<_, _>>();
654        let tables = self
655            .metadata_manager
656            .catalog_controller
657            .list_tables_by_type(TableType::Table)
658            .await?
659            .into_iter()
660            .map(|t| (t.id, (t.name, t.schema_id, t.definition)))
661            .collect::<BTreeMap<_, _>>();
662        let mvs = self
663            .metadata_manager
664            .catalog_controller
665            .list_tables_by_type(TableType::MaterializedView)
666            .await?
667            .into_iter()
668            .map(|t| (t.id, (t.name, t.schema_id, t.definition)))
669            .collect::<BTreeMap<_, _>>();
670        let indexes = self
671            .metadata_manager
672            .catalog_controller
673            .list_tables_by_type(TableType::Index)
674            .await?
675            .into_iter()
676            .map(|t| (t.id, (t.name, t.schema_id, t.definition)))
677            .collect::<BTreeMap<_, _>>();
678        let sinks = self
679            .metadata_manager
680            .catalog_controller
681            .list_sinks()
682            .await?
683            .into_iter()
684            .map(|s| (s.id, (s.name, s.schema_id, s.definition)))
685            .collect::<BTreeMap<_, _>>();
686        let catalogs = [
687            ("SOURCE", sources),
688            ("TABLE", tables),
689            ("MATERIALIZED VIEW", mvs),
690            ("INDEX", indexes),
691            ("SINK", sinks),
692        ];
693        let mut obj_id_to_name = HashMap::new();
694        for (title, items) in catalogs {
695            use comfy_table::{Row, Table};
696            let mut table = Table::new();
697            table.set_header({
698                let mut row = Row::new();
699                row.add_cell("id".into());
700                row.add_cell("name".into());
701                row.add_cell("schema_id".into());
702                row.add_cell("definition".into());
703                row
704            });
705            for (id, (name, schema_id, definition)) in items {
706                obj_id_to_name.insert(id, name.clone());
707                let mut row = Row::new();
708                let may_redact =
709                    redact_all_sql_options(&definition).unwrap_or_else(|| "[REDACTED]".into());
710                row.add_cell(id.into());
711                row.add_cell(name.into());
712                row.add_cell(schema_id.into());
713                row.add_cell(may_redact.into());
714                table.add_row(row);
715            }
716            let _ = writeln!(s);
717            let _ = writeln!(s, "{title}");
718            let _ = writeln!(s, "{table}");
719        }
720
721        let actors = self
722            .metadata_manager
723            .catalog_controller
724            .list_actor_info()
725            .await?
726            .into_iter()
727            .map(|(actor_id, fragment_id, job_id, schema_id, obj_type)| {
728                (
729                    actor_id,
730                    (
731                        fragment_id,
732                        job_id,
733                        schema_id,
734                        obj_type,
735                        obj_id_to_name
736                            .get(&(job_id as _))
737                            .cloned()
738                            .unwrap_or_default(),
739                    ),
740                )
741            })
742            .collect::<BTreeMap<_, _>>();
743
744        use comfy_table::{Row, Table};
745        let mut table = Table::new();
746        table.set_header({
747            let mut row = Row::new();
748            row.add_cell("id".into());
749            row.add_cell("fragment_id".into());
750            row.add_cell("job_id".into());
751            row.add_cell("schema_id".into());
752            row.add_cell("type".into());
753            row.add_cell("name".into());
754            row
755        });
756        for (actor_id, (fragment_id, job_id, schema_id, ddl_type, name)) in actors {
757            let mut row = Row::new();
758            row.add_cell(actor_id.into());
759            row.add_cell(fragment_id.into());
760            row.add_cell(job_id.into());
761            row.add_cell(schema_id.into());
762            row.add_cell(ddl_type.as_str().into());
763            row.add_cell(name.into());
764            table.add_row(row);
765        }
766        let _ = writeln!(s);
767        let _ = writeln!(s, "ACTOR");
768        let _ = writeln!(s, "{table}");
769        Ok(())
770    }
771}
772
773#[cfg_attr(coverage, coverage(off))]
774fn try_add_cell<T: Into<comfy_table::Cell>>(row: &mut comfy_table::Row, t: Option<T>) {
775    match t {
776        Some(t) => {
777            row.add_cell(t.into());
778        }
779        None => {
780            row.add_cell("".into());
781        }
782    }
783}
784
/// Joins PromQL label-matcher fragments with commas, for use inside the
/// `{...}` of a metric selector.
///
/// Each fragment is trimmed of surrounding whitespace, and fragments that are
/// empty after trimming are dropped. Without this, a blank (e.g.
/// whitespace-only) configured selector would leave a leading comma in the
/// merged selector. Returns an empty string when no fragment remains.
#[cfg_attr(coverage, coverage(off))]
fn merge_prometheus_selector<'a>(selectors: impl IntoIterator<Item = &'a str>) -> String {
    selectors
        .into_iter()
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .collect::<Vec<_>>()
        .join(",")
}
789
790fn redact_all_sql_options(sql: &str) -> Option<String> {
791    let Ok(mut statements) = Parser::parse_sql(sql) else {
792        return None;
793    };
794    let mut redacted = String::new();
795    for statement in &mut statements {
796        let options = match statement {
797            Statement::CreateTable {
798                with_options,
799                format_encode,
800                ..
801            } => {
802                let format_encode = match format_encode {
803                    Some(CompatibleFormatEncode::V2(cs)) => Some(&mut cs.row_options),
804                    _ => None,
805                };
806                (Some(with_options), format_encode)
807            }
808            Statement::CreateSource { stmt } => {
809                let format_encode = match &mut stmt.format_encode {
810                    CompatibleFormatEncode::V2(cs) => Some(&mut cs.row_options),
811                    _ => None,
812                };
813                (Some(&mut stmt.with_properties.0), format_encode)
814            }
815            Statement::CreateSink { stmt } => {
816                let format_encode = match &mut stmt.sink_schema {
817                    Some(cs) => Some(&mut cs.row_options),
818                    _ => None,
819                };
820                (Some(&mut stmt.with_properties.0), format_encode)
821            }
822            _ => (None, None),
823        };
824        if let Some(options) = options.0 {
825            for option in options {
826                option.value = Value::SingleQuotedString("[REDACTED]".into()).into();
827            }
828        }
829        if let Some(options) = options.1 {
830            for option in options {
831                option.value = Value::SingleQuotedString("[REDACTED]".into()).into();
832            }
833        }
834        writeln!(&mut redacted, "{statement}").unwrap();
835    }
836    Some(redacted)
837}