risingwave_meta/manager/
diagnose.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::{Ordering, Reverse};
16use std::collections::{BTreeMap, BinaryHeap, HashMap};
17use std::fmt::Write;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use prometheus_http_query::response::Data::Vector;
22use risingwave_common::types::Timestamptz;
23use risingwave_common::util::StackTraceResponseExt;
24use risingwave_hummock_sdk::level::Level;
25use risingwave_meta_model::table::TableType;
26use risingwave_pb::common::WorkerType;
27use risingwave_pb::meta::EventLog;
28use risingwave_pb::meta::event_log::Event;
29use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
30use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
31use risingwave_rpc_client::ComputeClientPool;
32use risingwave_sqlparser::ast::{CompatibleFormatEncode, Statement, Value};
33use risingwave_sqlparser::parser::Parser;
34use serde_json::json;
35use thiserror_ext::AsReport;
36
37use crate::MetaResult;
38use crate::hummock::HummockManagerRef;
39use crate::manager::MetadataManager;
40use crate::manager::event_log::EventLogManagerRef;
41
42pub type DiagnoseCommandRef = Arc<DiagnoseCommand>;
43
44pub struct DiagnoseCommand {
45    metadata_manager: MetadataManager,
46    hummock_manger: HummockManagerRef,
47    event_log_manager: EventLogManagerRef,
48    prometheus_client: Option<prometheus_http_query::Client>,
49    prometheus_selector: String,
50}
51
52impl DiagnoseCommand {
53    pub fn new(
54        metadata_manager: MetadataManager,
55        hummock_manger: HummockManagerRef,
56        event_log_manager: EventLogManagerRef,
57        prometheus_client: Option<prometheus_http_query::Client>,
58        prometheus_selector: String,
59    ) -> Self {
60        Self {
61            metadata_manager,
62            hummock_manger,
63            event_log_manager,
64            prometheus_client,
65            prometheus_selector,
66        }
67    }
68
69    #[cfg_attr(coverage, coverage(off))]
70    pub async fn report(&self, actor_traces_format: ActorTracesFormat) -> String {
71        let mut report = String::new();
72        let _ = writeln!(
73            report,
74            "report created at: {}\nversion: {}",
75            chrono::DateTime::<chrono::offset::Utc>::from(std::time::SystemTime::now()),
76            risingwave_common::current_cluster_version(),
77        );
78        let _ = writeln!(report);
79        self.write_catalog(&mut report).await;
80        let _ = writeln!(report);
81        self.write_worker_nodes(&mut report).await;
82        let _ = writeln!(report);
83        self.write_streaming_prometheus(&mut report).await;
84        let _ = writeln!(report);
85        self.write_storage(&mut report).await;
86        let _ = writeln!(report);
87        self.write_await_tree(&mut report, actor_traces_format)
88            .await;
89        let _ = writeln!(report);
90        self.write_event_logs(&mut report);
91        report
92    }
93
94    #[cfg_attr(coverage, coverage(off))]
95    async fn write_catalog(&self, s: &mut String) {
96        self.write_catalog_inner(s).await;
97        let _ = self.write_table_definition(s).await.inspect_err(|e| {
98            tracing::warn!(
99                error = e.to_report_string(),
100                "failed to display table definition"
101            )
102        });
103    }
104
105    #[cfg_attr(coverage, coverage(off))]
106    async fn write_catalog_inner(&self, s: &mut String) {
107        let guard = self
108            .metadata_manager
109            .catalog_controller
110            .get_inner_read_guard()
111            .await;
112        let stat = match guard.stats().await {
113            Ok(stat) => stat,
114            Err(err) => {
115                tracing::warn!(error=?err.as_report(), "failed to get catalog stats");
116                return;
117            }
118        };
119        let _ = writeln!(s, "number of fragment: {}", stat.streaming_job_num);
120        let _ = writeln!(s, "number of actor: {}", stat.actor_num);
121        let _ = writeln!(s, "number of source: {}", stat.source_num);
122        let _ = writeln!(s, "number of table: {}", stat.table_num);
123        let _ = writeln!(s, "number of materialized view: {}", stat.mview_num);
124        let _ = writeln!(s, "number of sink: {}", stat.sink_num);
125        let _ = writeln!(s, "number of index: {}", stat.index_num);
126        let _ = writeln!(s, "number of function: {}", stat.function_num);
127    }
128
129    #[cfg_attr(coverage, coverage(off))]
130    async fn write_worker_nodes(&self, s: &mut String) {
131        let Ok(worker_actor_count) = self.metadata_manager.worker_actor_count().await else {
132            tracing::warn!("failed to get worker actor count");
133            return;
134        };
135
136        use comfy_table::{Row, Table};
137        let Ok(worker_nodes) = self.metadata_manager.list_worker_node(None, None).await else {
138            tracing::warn!("failed to get worker nodes");
139            return;
140        };
141        let mut table = Table::new();
142        table.set_header({
143            let mut row = Row::new();
144            row.add_cell("id".into());
145            row.add_cell("host".into());
146            row.add_cell("type".into());
147            row.add_cell("state".into());
148            row.add_cell("parallelism".into());
149            row.add_cell("is_streaming".into());
150            row.add_cell("is_serving".into());
151            row.add_cell("rw_version".into());
152            row.add_cell("total_memory_bytes".into());
153            row.add_cell("total_cpu_cores".into());
154            row.add_cell("started_at".into());
155            row.add_cell("actor_count".into());
156            row
157        });
158        for worker_node in worker_nodes {
159            let mut row = Row::new();
160            row.add_cell(worker_node.id.into());
161            try_add_cell(
162                &mut row,
163                worker_node
164                    .host
165                    .as_ref()
166                    .map(|h| format!("{}:{}", h.host, h.port)),
167            );
168            try_add_cell(
169                &mut row,
170                worker_node.get_type().ok().map(|t| t.as_str_name()),
171            );
172            try_add_cell(
173                &mut row,
174                worker_node.get_state().ok().map(|s| s.as_str_name()),
175            );
176            try_add_cell(&mut row, worker_node.parallelism());
177            try_add_cell(
178                &mut row,
179                worker_node.property.as_ref().map(|p| p.is_streaming),
180            );
181            try_add_cell(
182                &mut row,
183                worker_node.property.as_ref().map(|p| p.is_serving),
184            );
185            try_add_cell(
186                &mut row,
187                worker_node.resource.as_ref().map(|r| r.rw_version.clone()),
188            );
189            try_add_cell(
190                &mut row,
191                worker_node.resource.as_ref().map(|r| r.total_memory_bytes),
192            );
193            try_add_cell(
194                &mut row,
195                worker_node.resource.as_ref().map(|r| r.total_cpu_cores),
196            );
197            try_add_cell(
198                &mut row,
199                worker_node
200                    .started_at
201                    .and_then(|ts| Timestamptz::from_secs(ts as _).map(|t| t.to_string())),
202            );
203            let actor_count = {
204                if let Ok(t) = worker_node.get_type()
205                    && t != WorkerType::ComputeNode
206                {
207                    None
208                } else {
209                    match worker_actor_count.get(&(worker_node.id as _)) {
210                        None => Some(0),
211                        Some(c) => Some(*c),
212                    }
213                }
214            };
215            try_add_cell(&mut row, actor_count);
216            table.add_row(row);
217        }
218        let _ = writeln!(s, "{table}");
219    }
220
221    #[cfg_attr(coverage, coverage(off))]
222    fn write_event_logs(&self, s: &mut String) {
223        let event_logs = self
224            .event_log_manager
225            .list_event_logs()
226            .into_iter()
227            .sorted_by(|a, b| {
228                a.timestamp
229                    .unwrap_or(0)
230                    .cmp(&b.timestamp.unwrap_or(0))
231                    .reverse()
232            })
233            .collect_vec();
234
235        let _ = writeln!(s, "latest barrier completions");
236        Self::write_event_logs_impl(
237            s,
238            event_logs.iter(),
239            |e| {
240                let Event::BarrierComplete(info) = e else {
241                    return None;
242                };
243                Some(json!(info).to_string())
244            },
245            Some(10),
246        );
247
248        let _ = writeln!(s);
249        let _ = writeln!(s, "latest barrier collection failures");
250        Self::write_event_logs_impl(
251            s,
252            event_logs.iter(),
253            |e| {
254                let Event::CollectBarrierFail(info) = e else {
255                    return None;
256                };
257                Some(json!(info).to_string())
258            },
259            Some(3),
260        );
261
262        let _ = writeln!(s);
263        let _ = writeln!(s, "latest barrier injection failures");
264        Self::write_event_logs_impl(
265            s,
266            event_logs.iter(),
267            |e| {
268                let Event::InjectBarrierFail(info) = e else {
269                    return None;
270                };
271                Some(json!(info).to_string())
272            },
273            Some(3),
274        );
275
276        let _ = writeln!(s);
277        let _ = writeln!(s, "latest worker node panics");
278        Self::write_event_logs_impl(
279            s,
280            event_logs.iter(),
281            |e| {
282                let Event::WorkerNodePanic(info) = e else {
283                    return None;
284                };
285                Some(json!(info).to_string())
286            },
287            Some(10),
288        );
289
290        let _ = writeln!(s);
291        let _ = writeln!(s, "latest create stream job failures");
292        Self::write_event_logs_impl(
293            s,
294            event_logs.iter(),
295            |e| {
296                let Event::CreateStreamJobFail(info) = e else {
297                    return None;
298                };
299                Some(json!(info).to_string())
300            },
301            Some(3),
302        );
303
304        let _ = writeln!(s);
305        let _ = writeln!(s, "latest dirty stream job clear-ups");
306        Self::write_event_logs_impl(
307            s,
308            event_logs.iter(),
309            |e| {
310                let Event::DirtyStreamJobClear(info) = e else {
311                    return None;
312                };
313                Some(json!(info).to_string())
314            },
315            Some(3),
316        );
317    }
318
319    #[cfg_attr(coverage, coverage(off))]
320    fn write_event_logs_impl<'a, F>(
321        s: &mut String,
322        event_logs: impl Iterator<Item = &'a EventLog>,
323        get_event_info: F,
324        limit: Option<usize>,
325    ) where
326        F: Fn(&Event) -> Option<String>,
327    {
328        use comfy_table::{Row, Table};
329        let mut table = Table::new();
330        table.set_header({
331            let mut row = Row::new();
332            row.add_cell("created_at".into());
333            row.add_cell("info".into());
334            row
335        });
336        let mut row_count = 0;
337        for event_log in event_logs {
338            let Some(ref inner) = event_log.event else {
339                continue;
340            };
341            if let Some(limit) = limit
342                && row_count >= limit
343            {
344                break;
345            }
346            let mut row = Row::new();
347            let ts = event_log
348                .timestamp
349                .and_then(|ts| Timestamptz::from_millis(ts as _).map(|ts| ts.to_string()));
350            try_add_cell(&mut row, ts);
351            if let Some(info) = get_event_info(inner) {
352                row.add_cell(info.into());
353                row_count += 1;
354            } else {
355                continue;
356            }
357            table.add_row(row);
358        }
359        let _ = writeln!(s, "{table}");
360    }
361
362    #[cfg_attr(coverage, coverage(off))]
363    async fn write_storage(&self, s: &mut String) {
364        let mut sst_num = 0;
365        let mut sst_total_file_size = 0;
366        let back_pressured_compaction_groups = self
367            .hummock_manger
368            .write_limits()
369            .await
370            .into_iter()
371            .filter_map(|(k, v)| {
372                if v.table_ids.is_empty() {
373                    None
374                } else {
375                    Some(k)
376                }
377            })
378            .join(",");
379        if !back_pressured_compaction_groups.is_empty() {
380            let _ = writeln!(
381                s,
382                "back pressured compaction groups: {back_pressured_compaction_groups}"
383            );
384        }
385
386        #[derive(PartialEq, Eq)]
387        struct SstableSort {
388            compaction_group_id: u64,
389            sst_id: u64,
390            delete_ratio: u64,
391        }
392        impl PartialOrd for SstableSort {
393            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
394                Some(self.cmp(other))
395            }
396        }
397        impl Ord for SstableSort {
398            fn cmp(&self, other: &Self) -> Ordering {
399                self.delete_ratio.cmp(&other.delete_ratio)
400            }
401        }
402        fn top_k_sstables(
403            top_k: usize,
404            heap: &mut BinaryHeap<Reverse<SstableSort>>,
405            e: SstableSort,
406        ) {
407            if heap.len() < top_k {
408                heap.push(Reverse(e));
409            } else if let Some(mut p) = heap.peek_mut() {
410                if e.delete_ratio > p.0.delete_ratio {
411                    *p = Reverse(e);
412                }
413            }
414        }
415
416        let top_k = 10;
417        let mut top_tombstone_delete_sst = BinaryHeap::with_capacity(top_k);
418        let compaction_group_num = self
419            .hummock_manger
420            .on_current_version(|version| {
421                for compaction_group in version.levels.values() {
422                    let mut visit_level = |level: &Level| {
423                        sst_num += level.table_infos.len();
424                        sst_total_file_size +=
425                            level.table_infos.iter().map(|t| t.sst_size).sum::<u64>();
426                        for sst in &level.table_infos {
427                            if sst.total_key_count == 0 {
428                                continue;
429                            }
430                            let tombstone_delete_ratio =
431                                sst.stale_key_count * 10000 / sst.total_key_count;
432                            let e = SstableSort {
433                                compaction_group_id: compaction_group.group_id,
434                                sst_id: sst.sst_id,
435                                delete_ratio: tombstone_delete_ratio,
436                            };
437                            top_k_sstables(top_k, &mut top_tombstone_delete_sst, e);
438                        }
439                    };
440                    let l0 = &compaction_group.l0;
441                    // FIXME: why chaining levels iter leads to segmentation fault?
442                    for level in &l0.sub_levels {
443                        visit_level(level);
444                    }
445                    for level in &compaction_group.levels {
446                        visit_level(level);
447                    }
448                }
449                version.levels.len()
450            })
451            .await;
452
453        let _ = writeln!(s, "number of SSTables: {sst_num}");
454        let _ = writeln!(s, "total size of SSTables (byte): {sst_total_file_size}");
455        let _ = writeln!(s, "number of compaction groups: {compaction_group_num}");
456        use comfy_table::{Row, Table};
457        fn format_table(heap: BinaryHeap<Reverse<SstableSort>>) -> Table {
458            let mut table = Table::new();
459            table.set_header({
460                let mut row = Row::new();
461                row.add_cell("compaction group id".into());
462                row.add_cell("sstable id".into());
463                row.add_cell("delete ratio".into());
464                row
465            });
466            for sst in &heap.into_sorted_vec() {
467                let mut row = Row::new();
468                row.add_cell(sst.0.compaction_group_id.into());
469                row.add_cell(sst.0.sst_id.into());
470                row.add_cell(format!("{:.2}%", sst.0.delete_ratio as f64 / 100.0).into());
471                table.add_row(row);
472            }
473            table
474        }
475        let _ = writeln!(s);
476        let _ = writeln!(s, "top tombstone delete ratio");
477        let _ = writeln!(s, "{}", format_table(top_tombstone_delete_sst));
478        let _ = writeln!(s);
479
480        let _ = writeln!(s);
481        self.write_storage_prometheus(s).await;
482    }
483
484    #[cfg_attr(coverage, coverage(off))]
485    async fn write_streaming_prometheus(&self, s: &mut String) {
486        let _ = writeln!(s, "top sources by throughput (rows/s)");
487        let query = format!(
488            "topk(3, sum(rate(stream_source_output_rows_counts{{{}}}[10m]))by (source_name))",
489            self.prometheus_selector
490        );
491        self.write_instant_vector_impl(s, &query, vec!["source_name"])
492            .await;
493
494        let _ = writeln!(s);
495        let _ = writeln!(s, "top materialized views by throughput (rows/s)");
496        let query = format!(
497            "topk(3, sum(rate(stream_mview_input_row_count{{{}}}[10m]))by (table_id))",
498            self.prometheus_selector
499        );
500        self.write_instant_vector_impl(s, &query, vec!["table_id"])
501            .await;
502
503        let _ = writeln!(s);
504        let _ = writeln!(s, "top join executor by matched rows");
505        let query = format!(
506            "topk(3, histogram_quantile(0.9, sum(rate(stream_join_matched_join_keys_bucket{{{}}}[10m])) by (le, fragment_id, table_id)))",
507            self.prometheus_selector
508        );
509        self.write_instant_vector_impl(s, &query, vec!["table_id", "fragment_id"])
510            .await;
511    }
512
513    #[cfg_attr(coverage, coverage(off))]
514    async fn write_storage_prometheus(&self, s: &mut String) {
515        let _ = writeln!(s, "top Hummock Get by duration (second)");
516        let query = format!(
517            "topk(3, histogram_quantile(0.9, sum(rate(state_store_get_duration_bucket{{{}}}[10m])) by (le, table_id)))",
518            self.prometheus_selector
519        );
520        self.write_instant_vector_impl(s, &query, vec!["table_id"])
521            .await;
522
523        let _ = writeln!(s);
524        let _ = writeln!(s, "top Hummock Iter Init by duration (second)");
525        let query = format!(
526            "topk(3, histogram_quantile(0.9, sum(rate(state_store_iter_init_duration_bucket{{{}}}[10m])) by (le, table_id)))",
527            self.prometheus_selector
528        );
529        self.write_instant_vector_impl(s, &query, vec!["table_id"])
530            .await;
531
532        let _ = writeln!(s);
533        let _ = writeln!(s, "top table commit flush by size (byte)");
534        let query = format!(
535            "topk(3, sum(rate(storage_commit_write_throughput{{{}}}[10m])) by (table_id))",
536            self.prometheus_selector
537        );
538        self.write_instant_vector_impl(s, &query, vec!["table_id"])
539            .await;
540
541        let _ = writeln!(s);
542        let _ = writeln!(s, "object store read throughput (bytes/s)");
543        let query = format!(
544            "sum(rate(object_store_read_bytes{{{}}}[10m])) by (job, instance)",
545            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
546        );
547        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
548            .await;
549
550        let _ = writeln!(s);
551        let _ = writeln!(s, "object store write throughput (bytes/s)");
552        let query = format!(
553            "sum(rate(object_store_write_bytes{{{}}}[10m])) by (job, instance)",
554            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
555        );
556        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
557            .await;
558
559        let _ = writeln!(s);
560        let _ = writeln!(s, "object store operation rate");
561        let query = format!(
562            "sum(rate(object_store_operation_latency_count{{{}}}[10m])) by (le, type, job, instance)",
563            merge_prometheus_selector([
564                &self.prometheus_selector,
565                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read_read_bytes|streaming_read\""
566            ])
567        );
568        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
569            .await;
570
571        let _ = writeln!(s);
572        let _ = writeln!(s, "object store operation duration (second)");
573        let query = format!(
574            "histogram_quantile(0.9, sum(rate(object_store_operation_latency_bucket{{{}}}[10m])) by (le, type, job, instance))",
575            merge_prometheus_selector([
576                &self.prometheus_selector,
577                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read\""
578            ])
579        );
580        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
581            .await;
582    }
583
584    #[cfg_attr(coverage, coverage(off))]
585    async fn write_instant_vector_impl(&self, s: &mut String, query: &str, labels: Vec<&str>) {
586        let Some(ref client) = self.prometheus_client else {
587            return;
588        };
589        if let Ok(Vector(instant_vec)) = client
590            .query(query)
591            .get()
592            .await
593            .map(|result| result.into_inner().0)
594        {
595            for i in instant_vec {
596                let l = labels
597                    .iter()
598                    .map(|label| {
599                        format!(
600                            "{}={}",
601                            *label,
602                            i.metric()
603                                .get(*label)
604                                .map(|s| s.as_str())
605                                .unwrap_or_default()
606                        )
607                    })
608                    .join(",");
609                let _ = writeln!(s, "{}: {:.3}", l, i.sample().value());
610            }
611        }
612    }
613
614    #[cfg_attr(coverage, coverage(off))]
615    async fn write_await_tree(&self, s: &mut String, actor_traces_format: ActorTracesFormat) {
616        // Most lines of code are copied from dashboard::handlers::dump_await_tree_all, because the latter cannot be called directly from here.
617        let Ok(worker_nodes) = self
618            .metadata_manager
619            .list_worker_node(Some(WorkerType::ComputeNode), None)
620            .await
621        else {
622            tracing::warn!("failed to get worker nodes");
623            return;
624        };
625
626        let mut all = StackTraceResponse::default();
627
628        let compute_clients = ComputeClientPool::adhoc();
629        for worker_node in &worker_nodes {
630            if let Ok(client) = compute_clients.get(worker_node).await
631                && let Ok(result) = client
632                    .stack_trace(StackTraceRequest {
633                        actor_traces_format: actor_traces_format as i32,
634                    })
635                    .await
636            {
637                all.merge_other(result);
638            }
639        }
640
641        write!(s, "{}", all.output()).unwrap();
642    }
643
644    async fn write_table_definition(&self, s: &mut String) -> MetaResult<()> {
645        let sources = self
646            .metadata_manager
647            .catalog_controller
648            .list_sources()
649            .await?
650            .into_iter()
651            .map(|s| (s.id, (s.name, s.schema_id, s.definition)))
652            .collect::<BTreeMap<_, _>>();
653        let tables = self
654            .metadata_manager
655            .catalog_controller
656            .list_tables_by_type(TableType::Table)
657            .await?
658            .into_iter()
659            .map(|t| (t.id, (t.name, t.schema_id, t.definition)))
660            .collect::<BTreeMap<_, _>>();
661        let mvs = self
662            .metadata_manager
663            .catalog_controller
664            .list_tables_by_type(TableType::MaterializedView)
665            .await?
666            .into_iter()
667            .map(|t| (t.id, (t.name, t.schema_id, t.definition)))
668            .collect::<BTreeMap<_, _>>();
669        let indexes = self
670            .metadata_manager
671            .catalog_controller
672            .list_tables_by_type(TableType::Index)
673            .await?
674            .into_iter()
675            .map(|t| (t.id, (t.name, t.schema_id, t.definition)))
676            .collect::<BTreeMap<_, _>>();
677        let sinks = self
678            .metadata_manager
679            .catalog_controller
680            .list_sinks()
681            .await?
682            .into_iter()
683            .map(|s| (s.id, (s.name, s.schema_id, s.definition)))
684            .collect::<BTreeMap<_, _>>();
685        let catalogs = [
686            ("SOURCE", sources),
687            ("TABLE", tables),
688            ("MATERIALIZED VIEW", mvs),
689            ("INDEX", indexes),
690            ("SINK", sinks),
691        ];
692        let mut obj_id_to_name = HashMap::new();
693        for (title, items) in catalogs {
694            use comfy_table::{Row, Table};
695            let mut table = Table::new();
696            table.set_header({
697                let mut row = Row::new();
698                row.add_cell("id".into());
699                row.add_cell("name".into());
700                row.add_cell("schema_id".into());
701                row.add_cell("definition".into());
702                row
703            });
704            for (id, (name, schema_id, definition)) in items {
705                obj_id_to_name.insert(id, name.clone());
706                let mut row = Row::new();
707                let may_redact =
708                    redact_all_sql_options(&definition).unwrap_or_else(|| "[REDACTED]".into());
709                row.add_cell(id.into());
710                row.add_cell(name.into());
711                row.add_cell(schema_id.into());
712                row.add_cell(may_redact.into());
713                table.add_row(row);
714            }
715            let _ = writeln!(s);
716            let _ = writeln!(s, "{title}");
717            let _ = writeln!(s, "{table}");
718        }
719
720        let actors = self
721            .metadata_manager
722            .catalog_controller
723            .list_actor_info()
724            .await?
725            .into_iter()
726            .map(|(actor_id, fragment_id, job_id, schema_id, obj_type)| {
727                (
728                    actor_id,
729                    (
730                        fragment_id,
731                        job_id,
732                        schema_id,
733                        obj_type,
734                        obj_id_to_name
735                            .get(&(job_id as _))
736                            .cloned()
737                            .unwrap_or_default(),
738                    ),
739                )
740            })
741            .collect::<BTreeMap<_, _>>();
742
743        use comfy_table::{Row, Table};
744        let mut table = Table::new();
745        table.set_header({
746            let mut row = Row::new();
747            row.add_cell("id".into());
748            row.add_cell("fragment_id".into());
749            row.add_cell("job_id".into());
750            row.add_cell("schema_id".into());
751            row.add_cell("type".into());
752            row.add_cell("name".into());
753            row
754        });
755        for (actor_id, (fragment_id, job_id, schema_id, ddl_type, name)) in actors {
756            let mut row = Row::new();
757            row.add_cell(actor_id.into());
758            row.add_cell(fragment_id.into());
759            row.add_cell(job_id.into());
760            row.add_cell(schema_id.into());
761            row.add_cell(ddl_type.as_str().into());
762            row.add_cell(name.into());
763            table.add_row(row);
764        }
765        let _ = writeln!(s);
766        let _ = writeln!(s, "ACTOR");
767        let _ = writeln!(s, "{table}");
768        Ok(())
769    }
770}
771
772#[cfg_attr(coverage, coverage(off))]
773fn try_add_cell<T: Into<comfy_table::Cell>>(row: &mut comfy_table::Row, t: Option<T>) {
774    match t {
775        Some(t) => {
776            row.add_cell(t.into());
777        }
778        None => {
779            row.add_cell("".into());
780        }
781    }
782}
783
/// Joins the non-empty `selectors` with `,`, producing the content of a PromQL selector.
///
/// Empty selectors are dropped so that no dangling comma is produced.
#[cfg_attr(coverage, coverage(off))]
fn merge_prometheus_selector<'a>(selectors: impl IntoIterator<Item = &'a str>) -> String {
    let non_empty: Vec<&str> = selectors
        .into_iter()
        .filter(|selector| !selector.is_empty())
        .collect();
    non_empty.join(",")
}
788
789fn redact_all_sql_options(sql: &str) -> Option<String> {
790    let Ok(mut statements) = Parser::parse_sql(sql) else {
791        return None;
792    };
793    let mut redacted = String::new();
794    for statement in &mut statements {
795        let options = match statement {
796            Statement::CreateTable {
797                with_options,
798                format_encode,
799                ..
800            } => {
801                let format_encode = match format_encode {
802                    Some(CompatibleFormatEncode::V2(cs)) => Some(&mut cs.row_options),
803                    _ => None,
804                };
805                (Some(with_options), format_encode)
806            }
807            Statement::CreateSource { stmt } => {
808                let format_encode = match &mut stmt.format_encode {
809                    CompatibleFormatEncode::V2(cs) => Some(&mut cs.row_options),
810                    _ => None,
811                };
812                (Some(&mut stmt.with_properties.0), format_encode)
813            }
814            Statement::CreateSink { stmt } => {
815                let format_encode = match &mut stmt.sink_schema {
816                    Some(cs) => Some(&mut cs.row_options),
817                    _ => None,
818                };
819                (Some(&mut stmt.with_properties.0), format_encode)
820            }
821            _ => (None, None),
822        };
823        if let Some(options) = options.0 {
824            for option in options {
825                option.value = Value::SingleQuotedString("[REDACTED]".into()).into();
826            }
827        }
828        if let Some(options) = options.1 {
829            for option in options {
830                option.value = Value::SingleQuotedString("[REDACTED]".into()).into();
831            }
832        }
833        writeln!(&mut redacted, "{statement}").unwrap();
834    }
835    Some(redacted)
836}