risingwave_meta/manager/diagnose.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::{Ordering, Reverse};
use std::collections::{BTreeMap, BinaryHeap, HashMap};
use std::fmt::Write;
use std::sync::Arc;

use itertools::Itertools;
use prometheus_http_query::response::Data::Vector;
use risingwave_common::id::ObjectId;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::StackTraceResponseExt;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::HummockSstableId;
use risingwave_hummock_sdk::level::Level;
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::EventLog;
use risingwave_pb::meta::event_log::Event;
use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
use risingwave_sqlparser::ast::RedactSqlOptionKeywordsRef;
use risingwave_sqlparser::parser::Parser;
use serde_json::json;
use thiserror_ext::AsReport;

use crate::MetaResult;
use crate::hummock::HummockManagerRef;
use crate::manager::MetadataManager;
use crate::manager::event_log::EventLogManagerRef;
use crate::rpc::await_tree::dump_cluster_await_tree;

pub type DiagnoseCommandRef = Arc<DiagnoseCommand>;

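/// Generates a human-readable diagnosis report for the cluster: catalog stats,
/// worker nodes, streaming and storage Prometheus metrics, await-tree dumps,
/// and recent event logs.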
pub struct DiagnoseCommand {
    metadata_manager: MetadataManager,
    await_tree_reg: await_tree::Registry,
    hummock_manager: HummockManagerRef,
    event_log_manager: EventLogManagerRef,
    prometheus_client: Option<prometheus_http_query::Client>,
    prometheus_selector: String,
    redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
}

impl DiagnoseCommand {
    pub fn new(
        metadata_manager: MetadataManager,
        await_tree_reg: await_tree::Registry,
        hummock_manager: HummockManagerRef,
        event_log_manager: EventLogManagerRef,
        prometheus_client: Option<prometheus_http_query::Client>,
        prometheus_selector: String,
        redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
    ) -> Self {
        Self {
            metadata_manager,
            await_tree_reg,
            hummock_manager,
            event_log_manager,
            prometheus_client,
            prometheus_selector,
            redact_sql_option_keywords,
        }
    }

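    /// Builds the full report. Sections are separated by blank lines; a failure in
    /// one section is logged and skipped instead of aborting the whole report.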
    pub async fn report(&self, actor_traces_format: ActorTracesFormat) -> String {
        let mut report = String::new();
        let _ = writeln!(
            report,
            "report created at: {}\nversion: {}",
            chrono::DateTime::<chrono::offset::Utc>::from(std::time::SystemTime::now()),
            risingwave_common::current_cluster_version(),
        );
        let _ = writeln!(report);
        self.write_catalog(&mut report).await;
        let _ = writeln!(report);
        self.write_worker_nodes(&mut report).await;
        let _ = writeln!(report);
        self.write_streaming_prometheus(&mut report).await;
        let _ = writeln!(report);
        self.write_storage(&mut report).await;
        let _ = writeln!(report);
        self.write_await_tree(&mut report, actor_traces_format)
            .await;
        let _ = writeln!(report);
        self.write_event_logs(&mut report);
        report
    }

    async fn write_catalog(&self, s: &mut String) {
        self.write_catalog_inner(s).await;
        let _ = self.write_table_definition(s).await.inspect_err(|e| {
            tracing::warn!(
                error = e.to_report_string(),
                "failed to display table definition"
            )
        });
    }

    async fn write_catalog_inner(&self, s: &mut String) {
        let stats = self.metadata_manager.catalog_controller.stats().await;

        let stat = match stats {
            Ok(stat) => stat,
            Err(err) => {
                tracing::warn!(error=?err.as_report(), "failed to get catalog stats");
                return;
            }
        };
        let _ = writeln!(s, "number of streaming jobs: {}", stat.streaming_job_num);
        let _ = writeln!(s, "number of actors: {}", stat.actor_num);
        let _ = writeln!(s, "number of sources: {}", stat.source_num);
        let _ = writeln!(s, "number of tables: {}", stat.table_num);
        let _ = writeln!(s, "number of materialized views: {}", stat.mview_num);
        let _ = writeln!(s, "number of sinks: {}", stat.sink_num);
        let _ = writeln!(s, "number of indexes: {}", stat.index_num);
        let _ = writeln!(s, "number of functions: {}", stat.function_num);
    }

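    /// Renders all worker nodes as a table, including per-node resources and, for
    /// compute nodes, the number of actors currently scheduled on each.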
    async fn write_worker_nodes(&self, s: &mut String) {
        let Ok(worker_actor_count) = self.metadata_manager.worker_actor_count() else {
            tracing::warn!("failed to get worker actor count");
            return;
        };

        use comfy_table::{Row, Table};
        let Ok(worker_nodes) = self.metadata_manager.list_worker_node(None, None).await else {
            tracing::warn!("failed to get worker nodes");
            return;
        };
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("host".into());
            row.add_cell("type".into());
            row.add_cell("state".into());
            row.add_cell("parallelism".into());
            row.add_cell("is_streaming".into());
            row.add_cell("is_serving".into());
            row.add_cell("rw_version".into());
            row.add_cell("total_memory_bytes".into());
            row.add_cell("total_cpu_cores".into());
            row.add_cell("started_at".into());
            row.add_cell("actor_count".into());
            row
        });
        for worker_node in worker_nodes {
            let mut row = Row::new();
            row.add_cell(worker_node.id.into());
            try_add_cell(
                &mut row,
                worker_node
                    .host
                    .as_ref()
                    .map(|h| format!("{}:{}", h.host, h.port)),
            );
            try_add_cell(
                &mut row,
                worker_node.get_type().ok().map(|t| t.as_str_name()),
            );
            try_add_cell(
                &mut row,
                worker_node.get_state().ok().map(|s| s.as_str_name()),
            );
            try_add_cell(&mut row, worker_node.parallelism());
            try_add_cell(
                &mut row,
                worker_node.property.as_ref().map(|p| p.is_streaming),
            );
            try_add_cell(
                &mut row,
                worker_node.property.as_ref().map(|p| p.is_serving),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.rw_version.clone()),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.total_memory_bytes),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.total_cpu_cores),
            );
            try_add_cell(
                &mut row,
                worker_node
                    .started_at
                    .and_then(|ts| Timestamptz::from_secs(ts as _).map(|t| t.to_string())),
            );
            let actor_count = {
                if let Ok(t) = worker_node.get_type()
                    && t != WorkerType::ComputeNode
                {
                    None
                } else {
                    match worker_actor_count.get(&worker_node.id) {
                        None => Some(0),
                        Some(c) => Some(*c),
                    }
                }
            };
            try_add_cell(&mut row, actor_count);
            table.add_row(row);
        }
        let _ = writeln!(s, "{table}");
    }

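    /// Dumps recent event logs, newest first, as one table per event type with a
    /// per-type row limit.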
    fn write_event_logs(&self, s: &mut String) {
        let event_logs = self
            .event_log_manager
            .list_event_logs()
            .into_iter()
            .sorted_by(|a, b| {
                a.timestamp
                    .unwrap_or(0)
                    .cmp(&b.timestamp.unwrap_or(0))
                    .reverse()
            })
            .collect_vec();

        let _ = writeln!(s, "latest barrier completions");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::BarrierComplete(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(10),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest barrier collection failures");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::CollectBarrierFail(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest barrier injection failures");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::InjectBarrierFail(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest worker node panics");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::WorkerNodePanic(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(10),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest create stream job failures");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::CreateStreamJobFail(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest dirty stream job clear-ups");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::DirtyStreamJobClear(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );
    }

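    /// Writes up to `limit` matching events as a (created_at, info) table. Events for
    /// which `get_event_info` returns `None` are skipped and do not count toward the limit.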
    fn write_event_logs_impl<'a, F>(
        s: &mut String,
        event_logs: impl Iterator<Item = &'a EventLog>,
        get_event_info: F,
        limit: Option<usize>,
    ) where
        F: Fn(&Event) -> Option<String>,
    {
        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("created_at".into());
            row.add_cell("info".into());
            row
        });
        let mut row_count = 0;
        for event_log in event_logs {
            let Some(ref inner) = event_log.event else {
                continue;
            };
            if let Some(limit) = limit
                && row_count >= limit
            {
                break;
            }
            let mut row = Row::new();
            let ts = event_log
                .timestamp
                .and_then(|ts| Timestamptz::from_millis(ts as _).map(|ts| ts.to_string()));
            try_add_cell(&mut row, ts);
            if let Some(info) = get_event_info(inner) {
                row.add_cell(info.into());
                row_count += 1;
            } else {
                continue;
            }
            table.add_row(row);
        }
        let _ = writeln!(s, "{table}");
    }

    async fn write_storage(&self, s: &mut String) {
        let mut sst_num = 0;
        let mut sst_total_file_size = 0;
        let back_pressured_compaction_groups = self
            .hummock_manager
            .write_limits()
            .await
            .into_iter()
            .filter_map(|(k, v)| {
                if v.table_ids.is_empty() {
                    None
                } else {
                    Some(k)
                }
            })
            .join(",");
        if !back_pressured_compaction_groups.is_empty() {
            let _ = writeln!(
                s,
                "back pressured compaction groups: {back_pressured_compaction_groups}"
            );
        }

        #[derive(PartialEq, Eq)]
        struct SstableSort {
            compaction_group_id: u64,
            sst_id: HummockSstableId,
            delete_ratio: u64,
        }
        impl PartialOrd for SstableSort {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }
        impl Ord for SstableSort {
            fn cmp(&self, other: &Self) -> Ordering {
                self.delete_ratio.cmp(&other.delete_ratio)
            }
        }
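        // Keep the `top_k` SSTs with the highest delete ratio in a min-heap: once the
        // heap is full, a new entry evicts the current minimum only if its ratio is larger.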
        fn top_k_sstables(
            top_k: usize,
            heap: &mut BinaryHeap<Reverse<SstableSort>>,
            e: SstableSort,
        ) {
            if heap.len() < top_k {
                heap.push(Reverse(e));
            } else if let Some(mut p) = heap.peek_mut()
                && e.delete_ratio > p.0.delete_ratio
            {
                *p = Reverse(e);
            }
        }

        let top_k = 10;
        let mut top_tombstone_delete_sst = BinaryHeap::with_capacity(top_k);
        let compaction_group_num = self
            .hummock_manager
            .on_current_version(|version| {
                for compaction_group in version.levels.values() {
                    let mut visit_level = |level: &Level| {
                        sst_num += level.table_infos.len();
                        sst_total_file_size +=
                            level.table_infos.iter().map(|t| t.sst_size).sum::<u64>();
                        for sst in &level.table_infos {
                            if sst.total_key_count == 0 {
                                continue;
                            }
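                            // Scale by 10_000 so two decimal places survive the integer
                            // division; `format_table` renders it as a percentage below.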
                            let tombstone_delete_ratio =
                                sst.stale_key_count * 10000 / sst.total_key_count;
                            let e = SstableSort {
                                compaction_group_id: compaction_group.group_id,
                                sst_id: sst.sst_id,
                                delete_ratio: tombstone_delete_ratio,
                            };
                            top_k_sstables(top_k, &mut top_tombstone_delete_sst, e);
                        }
                    };
                    let l0 = &compaction_group.l0;
                    // FIXME: why does chaining the level iterators lead to a segmentation fault?
                    for level in &l0.sub_levels {
                        visit_level(level);
                    }
                    for level in &compaction_group.levels {
                        visit_level(level);
                    }
                }
                version.levels.len()
            })
            .await;

        let _ = writeln!(s, "number of SSTables: {sst_num}");
        let _ = writeln!(s, "total size of SSTables (bytes): {sst_total_file_size}");
        let _ = writeln!(s, "number of compaction groups: {compaction_group_num}");
        use comfy_table::{Row, Table};
        fn format_table(heap: BinaryHeap<Reverse<SstableSort>>) -> Table {
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("compaction group id".into());
                row.add_cell("sstable id".into());
                row.add_cell("delete ratio".into());
                row
            });
            for sst in &heap.into_sorted_vec() {
                let mut row = Row::new();
                row.add_cell(sst.0.compaction_group_id.into());
                row.add_cell(sst.0.sst_id.into());
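                // `delete_ratio` is stored in basis points (1/10_000), so dividing by
                // 100 yields a percentage with two decimal places.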
                row.add_cell(format!("{:.2}%", sst.0.delete_ratio as f64 / 100.0).into());
                table.add_row(row);
            }
            table
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "top tombstone delete ratio");
        let _ = writeln!(s, "{}", format_table(top_tombstone_delete_sst));
        let _ = writeln!(s);

        let _ = writeln!(s);
        self.write_storage_prometheus(s).await;
    }

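    /// Queries Prometheus for the busiest streaming entities over the last 10 minutes:
    /// sources and materialized views by throughput, join executors by matched rows.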
    async fn write_streaming_prometheus(&self, s: &mut String) {
        let _ = writeln!(s, "top sources by throughput (rows/s)");
        let query = format!(
            "topk(3, sum(rate(stream_source_output_rows_counts{{{}}}[10m]))by (source_name))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["source_name"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top materialized views by throughput (rows/s)");
        let query = format!(
            "topk(3, sum(rate(stream_mview_input_row_count{{{}}}[10m]))by (table_id))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top join executors by matched rows");
        let query = format!(
            "topk(3, histogram_quantile(0.9, sum(rate(stream_join_matched_join_keys_bucket{{{}}}[10m])) by (le, fragment_id, table_id)))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id", "fragment_id"])
            .await;
    }

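    /// Queries Prometheus for storage-side hot spots: Hummock read latencies, commit
    /// flush throughput, and object store throughput, operation rate and latency.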
    async fn write_storage_prometheus(&self, s: &mut String) {
        let _ = writeln!(s, "top Hummock Get by duration (seconds)");
        let query = format!(
            "topk(3, histogram_quantile(0.9, sum(rate(state_store_get_duration_bucket{{{}}}[10m])) by (le, table_id)))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top Hummock Iter Init by duration (seconds)");
        let query = format!(
            "topk(3, histogram_quantile(0.9, sum(rate(state_store_iter_init_duration_bucket{{{}}}[10m])) by (le, table_id)))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top table commit flush by throughput (bytes/s)");
        let query = format!(
            "topk(3, sum(rate(storage_commit_write_throughput{{{}}}[10m])) by (table_id))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store read throughput (bytes/s)");
        let query = format!(
            "sum(rate(object_store_read_bytes{{{}}}[10m])) by (job, instance)",
            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
        );
        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store write throughput (bytes/s)");
        let query = format!(
            "sum(rate(object_store_write_bytes{{{}}}[10m])) by (job, instance)",
            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
        );
        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store operation rate");
        let query = format!(
            "sum(rate(object_store_operation_latency_count{{{}}}[10m])) by (le, type, job, instance)",
            merge_prometheus_selector([
                &self.prometheus_selector,
                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read_read_bytes|streaming_read\""
            ])
        );
        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store operation duration (seconds)");
        let query = format!(
            "histogram_quantile(0.9, sum(rate(object_store_operation_latency_bucket{{{}}}[10m])) by (le, type, job, instance))",
            merge_prometheus_selector([
                &self.prometheus_selector,
                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read\""
            ])
        );
        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
            .await;
    }

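    /// Runs an instant-vector Prometheus query and writes one `label=value,...: sample`
    /// line per series. Output is skipped when no client is configured or the query fails.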
    async fn write_instant_vector_impl(&self, s: &mut String, query: &str, labels: Vec<&str>) {
        let Some(ref client) = self.prometheus_client else {
            return;
        };
        if let Ok(Vector(instant_vec)) = client
            .query(query)
            .get()
            .await
            .map(|result| result.into_inner().0)
        {
            for i in instant_vec {
                let l = labels
                    .iter()
                    .map(|label| {
                        format!(
                            "{}={}",
                            *label,
                            i.metric()
                                .get(*label)
                                .map(|s| s.as_str())
                                .unwrap_or_default()
                        )
                    })
                    .join(",");
                let _ = writeln!(s, "{}: {:.3}", l, i.sample().value());
            }
        }
    }

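    /// Collects await-tree dumps across the cluster in the requested actor trace format.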
    async fn write_await_tree(&self, s: &mut String, actor_traces_format: ActorTracesFormat) {
        let all = dump_cluster_await_tree(
            &self.metadata_manager,
            &self.await_tree_reg,
            actor_traces_format,
        )
        .await;

        if let Ok(all) = all {
            write!(s, "{}", all.output()).unwrap();
        } else {
            tracing::warn!("failed to dump await tree");
        }
    }

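    /// Lists the definitions of sources, tables, materialized views, indexes, sinks,
    /// and internal tables (with sensitive SQL options redacted), followed by an
    /// actor-to-job mapping table.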
    async fn write_table_definition(&self, s: &mut String) -> MetaResult<()> {
        let sources = self
            .metadata_manager
            .catalog_controller
            .list_sources()
            .await?
            .into_iter()
            .map(|s| {
                (
                    s.id.into(),
                    (s.name, s.schema_id, s.definition, s.created_at_epoch),
                )
            })
            .collect::<BTreeMap<_, _>>();
        let mut user_tables = BTreeMap::new();
        let mut mvs = BTreeMap::new();
        let mut indexes = BTreeMap::new();
        let mut internal_tables = BTreeMap::new();
        {
            let grouped = self
                .metadata_manager
                .catalog_controller
                .list_all_state_tables()
                .await?
                .into_iter()
                .chunk_by(|t| t.table_type());
            for (table_type, tables) in &grouped {
                let tables = tables.into_iter().map(|t| {
                    (
                        t.id.into(),
                        (t.name, t.schema_id, t.definition, t.created_at_epoch),
                    )
                });
                match table_type {
                    PbTableType::Table => user_tables.extend(tables),
                    PbTableType::MaterializedView => mvs.extend(tables),
                    PbTableType::Index | PbTableType::VectorIndex => indexes.extend(tables),
                    PbTableType::Internal => internal_tables.extend(tables),
                    PbTableType::Unspecified => {
                        tracing::error!("unspecified table type: {:?}", tables.collect_vec());
                    }
                }
            }
        }
        let sinks = self
            .metadata_manager
            .catalog_controller
            .list_sinks()
            .await?
            .into_iter()
            .map(|s| {
                (
                    s.id.into(),
                    (s.name, s.schema_id, s.definition, s.created_at_epoch),
                )
            })
            .collect::<BTreeMap<_, _>>();
        let catalogs = [
            ("SOURCE", sources),
            ("TABLE", user_tables),
            ("MATERIALIZED VIEW", mvs),
            ("INDEX", indexes),
            ("SINK", sinks),
            ("INTERNAL TABLE", internal_tables),
        ];
        let mut obj_id_to_name: HashMap<ObjectId, _> = HashMap::new();
        for (title, items) in catalogs {
            use comfy_table::{Row, Table};
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("id".into());
                row.add_cell("name".into());
                row.add_cell("schema_id".into());
                row.add_cell("created_at".into());
                row.add_cell("definition".into());
                row
            });
            for (id, (name, schema_id, definition, created_at_epoch)) in items {
                obj_id_to_name.insert(id, name.clone());
                let mut row = Row::new();
                let may_redact = redact_sql(&definition, self.redact_sql_option_keywords.clone())
                    .unwrap_or_else(|| "[REDACTED]".into());
                let created_at = if let Some(created_at_epoch) = created_at_epoch {
                    format!("{}", Epoch::from(created_at_epoch).as_timestamptz())
                } else {
                    "".into()
                };
                row.add_cell(id.into());
                row.add_cell(name.into());
                row.add_cell(schema_id.into());
                row.add_cell(created_at.into());
                row.add_cell(may_redact.into());
                table.add_row(row);
            }
            let _ = writeln!(s);
            let _ = writeln!(s, "{title}");
            let _ = writeln!(s, "{table}");
        }

        let actors = self
            .metadata_manager
            .catalog_controller
            .list_actor_info()
            .await?
            .into_iter()
            .map(|(actor_id, fragment_id, job_id, schema_id, obj_type)| {
                (
                    actor_id,
                    (
                        fragment_id,
                        job_id,
                        schema_id,
                        obj_type,
                        obj_id_to_name.get(&job_id).cloned().unwrap_or_default(),
                    ),
                )
            })
            .collect::<BTreeMap<_, _>>();

        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("fragment_id".into());
            row.add_cell("job_id".into());
            row.add_cell("schema_id".into());
            row.add_cell("type".into());
            row.add_cell("name".into());
            row
        });
        for (actor_id, (fragment_id, job_id, schema_id, ddl_type, name)) in actors {
            let mut row = Row::new();
            row.add_cell(actor_id.into());
            row.add_cell(fragment_id.into());
            row.add_cell(job_id.into());
            row.add_cell(schema_id.into());
            row.add_cell(ddl_type.as_str().into());
            row.add_cell(name.into());
            table.add_row(row);
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "ACTOR");
        let _ = writeln!(s, "{table}");
        Ok(())
    }
}

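/// Adds `t` to the row when present, or an empty cell otherwise, so that columns
/// stay aligned across rows with missing fields.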
fn try_add_cell<T: Into<comfy_table::Cell>>(row: &mut comfy_table::Row, t: Option<T>) {
    match t {
        Some(t) => {
            row.add_cell(t.into());
        }
        None => {
            row.add_cell("".into());
        }
    }
}

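/// Joins non-empty label matchers into a single Prometheus selector list.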
fn merge_prometheus_selector<'a>(selectors: impl IntoIterator<Item = &'a str>) -> String {
    selectors.into_iter().filter(|s| !s.is_empty()).join(",")
}

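/// Re-renders `sql` with sensitive option values redacted. Returns `None` if the SQL
/// fails to parse, in which case the caller falls back to a "[REDACTED]" placeholder.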
fn redact_sql(sql: &str, keywords: RedactSqlOptionKeywordsRef) -> Option<String> {
    match Parser::parse_sql(sql) {
        Ok(sqls) => Some(
            sqls.into_iter()
                .map(|sql| sql.to_redacted_string(keywords.clone()))
                .join(";"),
        ),
        Err(_) => None,
    }
}