risingwave_meta/manager/
diagnose.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::{Ordering, Reverse};
16use std::collections::{BTreeMap, BinaryHeap, HashMap};
17use std::fmt::Write;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use prometheus_http_query::response::Data::Vector;
22use risingwave_common::types::Timestamptz;
23use risingwave_common::util::StackTraceResponseExt;
24use risingwave_common::util::epoch::Epoch;
25use risingwave_hummock_sdk::HummockSstableId;
26use risingwave_hummock_sdk::level::Level;
27use risingwave_pb::catalog::table::PbTableType;
28use risingwave_pb::common::WorkerType;
29use risingwave_pb::meta::EventLog;
30use risingwave_pb::meta::event_log::Event;
31use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
32use risingwave_sqlparser::ast::RedactSqlOptionKeywordsRef;
33use risingwave_sqlparser::parser::Parser;
34use serde_json::json;
35use thiserror_ext::AsReport;
36
37use crate::MetaResult;
38use crate::hummock::HummockManagerRef;
39use crate::manager::MetadataManager;
40use crate::manager::event_log::EventLogManagerRef;
41use crate::rpc::await_tree::dump_cluster_await_tree;
42
/// Shared, reference-counted handle to a [`DiagnoseCommand`].
pub type DiagnoseCommandRef = Arc<DiagnoseCommand>;
44
/// Builds a plain-text diagnostic report from catalog, worker, storage, metrics,
/// await-tree and event-log information.
pub struct DiagnoseCommand {
    /// Source of catalog statistics, worker nodes and actor information.
    metadata_manager: MetadataManager,
    /// Registry handed to the cluster await-tree dump.
    await_tree_reg: await_tree::Registry,
    /// Source of write limits and of the current Hummock version.
    hummock_manger: HummockManagerRef,
    /// Source of the event logs listed at the end of the report.
    event_log_manager: EventLogManagerRef,
    /// Optional Prometheus client; metric sections are skipped when it is `None`.
    prometheus_client: Option<prometheus_http_query::Client>,
    /// Label matchers interpolated inside the `{...}` selector of every query.
    prometheus_selector: String,
    /// Keywords passed to the SQL redaction applied to object definitions.
    redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
}
54
55impl DiagnoseCommand {
56    pub fn new(
57        metadata_manager: MetadataManager,
58        await_tree_reg: await_tree::Registry,
59        hummock_manger: HummockManagerRef,
60        event_log_manager: EventLogManagerRef,
61        prometheus_client: Option<prometheus_http_query::Client>,
62        prometheus_selector: String,
63        redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
64    ) -> Self {
65        Self {
66            metadata_manager,
67            await_tree_reg,
68            hummock_manger,
69            event_log_manager,
70            prometheus_client,
71            prometheus_selector,
72            redact_sql_option_keywords,
73        }
74    }
75
76    pub async fn report(&self, actor_traces_format: ActorTracesFormat) -> String {
77        let mut report = String::new();
78        let _ = writeln!(
79            report,
80            "report created at: {}\nversion: {}",
81            chrono::DateTime::<chrono::offset::Utc>::from(std::time::SystemTime::now()),
82            risingwave_common::current_cluster_version(),
83        );
84        let _ = writeln!(report);
85        self.write_catalog(&mut report).await;
86        let _ = writeln!(report);
87        self.write_worker_nodes(&mut report).await;
88        let _ = writeln!(report);
89        self.write_streaming_prometheus(&mut report).await;
90        let _ = writeln!(report);
91        self.write_storage(&mut report).await;
92        let _ = writeln!(report);
93        self.write_await_tree(&mut report, actor_traces_format)
94            .await;
95        let _ = writeln!(report);
96        self.write_event_logs(&mut report);
97        report
98    }
99
100    async fn write_catalog(&self, s: &mut String) {
101        self.write_catalog_inner(s).await;
102        let _ = self.write_table_definition(s).await.inspect_err(|e| {
103            tracing::warn!(
104                error = e.to_report_string(),
105                "failed to display table definition"
106            )
107        });
108    }
109
110    async fn write_catalog_inner(&self, s: &mut String) {
111        let guard = self
112            .metadata_manager
113            .catalog_controller
114            .get_inner_read_guard()
115            .await;
116        let stat = match guard.stats().await {
117            Ok(stat) => stat,
118            Err(err) => {
119                tracing::warn!(error=?err.as_report(), "failed to get catalog stats");
120                return;
121            }
122        };
123        let _ = writeln!(s, "number of fragment: {}", stat.streaming_job_num);
124        let _ = writeln!(s, "number of actor: {}", stat.actor_num);
125        let _ = writeln!(s, "number of source: {}", stat.source_num);
126        let _ = writeln!(s, "number of table: {}", stat.table_num);
127        let _ = writeln!(s, "number of materialized view: {}", stat.mview_num);
128        let _ = writeln!(s, "number of sink: {}", stat.sink_num);
129        let _ = writeln!(s, "number of index: {}", stat.index_num);
130        let _ = writeln!(s, "number of function: {}", stat.function_num);
131    }
132
133    async fn write_worker_nodes(&self, s: &mut String) {
134        let Ok(worker_actor_count) = self.metadata_manager.worker_actor_count().await else {
135            tracing::warn!("failed to get worker actor count");
136            return;
137        };
138
139        use comfy_table::{Row, Table};
140        let Ok(worker_nodes) = self.metadata_manager.list_worker_node(None, None).await else {
141            tracing::warn!("failed to get worker nodes");
142            return;
143        };
144        let mut table = Table::new();
145        table.set_header({
146            let mut row = Row::new();
147            row.add_cell("id".into());
148            row.add_cell("host".into());
149            row.add_cell("type".into());
150            row.add_cell("state".into());
151            row.add_cell("parallelism".into());
152            row.add_cell("is_streaming".into());
153            row.add_cell("is_serving".into());
154            row.add_cell("rw_version".into());
155            row.add_cell("total_memory_bytes".into());
156            row.add_cell("total_cpu_cores".into());
157            row.add_cell("started_at".into());
158            row.add_cell("actor_count".into());
159            row
160        });
161        for worker_node in worker_nodes {
162            let mut row = Row::new();
163            row.add_cell(worker_node.id.into());
164            try_add_cell(
165                &mut row,
166                worker_node
167                    .host
168                    .as_ref()
169                    .map(|h| format!("{}:{}", h.host, h.port)),
170            );
171            try_add_cell(
172                &mut row,
173                worker_node.get_type().ok().map(|t| t.as_str_name()),
174            );
175            try_add_cell(
176                &mut row,
177                worker_node.get_state().ok().map(|s| s.as_str_name()),
178            );
179            try_add_cell(&mut row, worker_node.parallelism());
180            try_add_cell(
181                &mut row,
182                worker_node.property.as_ref().map(|p| p.is_streaming),
183            );
184            try_add_cell(
185                &mut row,
186                worker_node.property.as_ref().map(|p| p.is_serving),
187            );
188            try_add_cell(
189                &mut row,
190                worker_node.resource.as_ref().map(|r| r.rw_version.clone()),
191            );
192            try_add_cell(
193                &mut row,
194                worker_node.resource.as_ref().map(|r| r.total_memory_bytes),
195            );
196            try_add_cell(
197                &mut row,
198                worker_node.resource.as_ref().map(|r| r.total_cpu_cores),
199            );
200            try_add_cell(
201                &mut row,
202                worker_node
203                    .started_at
204                    .and_then(|ts| Timestamptz::from_secs(ts as _).map(|t| t.to_string())),
205            );
206            let actor_count = {
207                if let Ok(t) = worker_node.get_type()
208                    && t != WorkerType::ComputeNode
209                {
210                    None
211                } else {
212                    match worker_actor_count.get(&(worker_node.id as _)) {
213                        None => Some(0),
214                        Some(c) => Some(*c),
215                    }
216                }
217            };
218            try_add_cell(&mut row, actor_count);
219            table.add_row(row);
220        }
221        let _ = writeln!(s, "{table}");
222    }
223
224    fn write_event_logs(&self, s: &mut String) {
225        let event_logs = self
226            .event_log_manager
227            .list_event_logs()
228            .into_iter()
229            .sorted_by(|a, b| {
230                a.timestamp
231                    .unwrap_or(0)
232                    .cmp(&b.timestamp.unwrap_or(0))
233                    .reverse()
234            })
235            .collect_vec();
236
237        let _ = writeln!(s, "latest barrier completions");
238        Self::write_event_logs_impl(
239            s,
240            event_logs.iter(),
241            |e| {
242                let Event::BarrierComplete(info) = e else {
243                    return None;
244                };
245                Some(json!(info).to_string())
246            },
247            Some(10),
248        );
249
250        let _ = writeln!(s);
251        let _ = writeln!(s, "latest barrier collection failures");
252        Self::write_event_logs_impl(
253            s,
254            event_logs.iter(),
255            |e| {
256                let Event::CollectBarrierFail(info) = e else {
257                    return None;
258                };
259                Some(json!(info).to_string())
260            },
261            Some(3),
262        );
263
264        let _ = writeln!(s);
265        let _ = writeln!(s, "latest barrier injection failures");
266        Self::write_event_logs_impl(
267            s,
268            event_logs.iter(),
269            |e| {
270                let Event::InjectBarrierFail(info) = e else {
271                    return None;
272                };
273                Some(json!(info).to_string())
274            },
275            Some(3),
276        );
277
278        let _ = writeln!(s);
279        let _ = writeln!(s, "latest worker node panics");
280        Self::write_event_logs_impl(
281            s,
282            event_logs.iter(),
283            |e| {
284                let Event::WorkerNodePanic(info) = e else {
285                    return None;
286                };
287                Some(json!(info).to_string())
288            },
289            Some(10),
290        );
291
292        let _ = writeln!(s);
293        let _ = writeln!(s, "latest create stream job failures");
294        Self::write_event_logs_impl(
295            s,
296            event_logs.iter(),
297            |e| {
298                let Event::CreateStreamJobFail(info) = e else {
299                    return None;
300                };
301                Some(json!(info).to_string())
302            },
303            Some(3),
304        );
305
306        let _ = writeln!(s);
307        let _ = writeln!(s, "latest dirty stream job clear-ups");
308        Self::write_event_logs_impl(
309            s,
310            event_logs.iter(),
311            |e| {
312                let Event::DirtyStreamJobClear(info) = e else {
313                    return None;
314                };
315                Some(json!(info).to_string())
316            },
317            Some(3),
318        );
319    }
320
321    fn write_event_logs_impl<'a, F>(
322        s: &mut String,
323        event_logs: impl Iterator<Item = &'a EventLog>,
324        get_event_info: F,
325        limit: Option<usize>,
326    ) where
327        F: Fn(&Event) -> Option<String>,
328    {
329        use comfy_table::{Row, Table};
330        let mut table = Table::new();
331        table.set_header({
332            let mut row = Row::new();
333            row.add_cell("created_at".into());
334            row.add_cell("info".into());
335            row
336        });
337        let mut row_count = 0;
338        for event_log in event_logs {
339            let Some(ref inner) = event_log.event else {
340                continue;
341            };
342            if let Some(limit) = limit
343                && row_count >= limit
344            {
345                break;
346            }
347            let mut row = Row::new();
348            let ts = event_log
349                .timestamp
350                .and_then(|ts| Timestamptz::from_millis(ts as _).map(|ts| ts.to_string()));
351            try_add_cell(&mut row, ts);
352            if let Some(info) = get_event_info(inner) {
353                row.add_cell(info.into());
354                row_count += 1;
355            } else {
356                continue;
357            }
358            table.add_row(row);
359        }
360        let _ = writeln!(s, "{table}");
361    }
362
363    async fn write_storage(&self, s: &mut String) {
364        let mut sst_num = 0;
365        let mut sst_total_file_size = 0;
366        let back_pressured_compaction_groups = self
367            .hummock_manger
368            .write_limits()
369            .await
370            .into_iter()
371            .filter_map(|(k, v)| {
372                if v.table_ids.is_empty() {
373                    None
374                } else {
375                    Some(k)
376                }
377            })
378            .join(",");
379        if !back_pressured_compaction_groups.is_empty() {
380            let _ = writeln!(
381                s,
382                "back pressured compaction groups: {back_pressured_compaction_groups}"
383            );
384        }
385
386        #[derive(PartialEq, Eq)]
387        struct SstableSort {
388            compaction_group_id: u64,
389            sst_id: HummockSstableId,
390            delete_ratio: u64,
391        }
392        impl PartialOrd for SstableSort {
393            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
394                Some(self.cmp(other))
395            }
396        }
397        impl Ord for SstableSort {
398            fn cmp(&self, other: &Self) -> Ordering {
399                self.delete_ratio.cmp(&other.delete_ratio)
400            }
401        }
402        fn top_k_sstables(
403            top_k: usize,
404            heap: &mut BinaryHeap<Reverse<SstableSort>>,
405            e: SstableSort,
406        ) {
407            if heap.len() < top_k {
408                heap.push(Reverse(e));
409            } else if let Some(mut p) = heap.peek_mut()
410                && e.delete_ratio > p.0.delete_ratio
411            {
412                *p = Reverse(e);
413            }
414        }
415
416        let top_k = 10;
417        let mut top_tombstone_delete_sst = BinaryHeap::with_capacity(top_k);
418        let compaction_group_num = self
419            .hummock_manger
420            .on_current_version(|version| {
421                for compaction_group in version.levels.values() {
422                    let mut visit_level = |level: &Level| {
423                        sst_num += level.table_infos.len();
424                        sst_total_file_size +=
425                            level.table_infos.iter().map(|t| t.sst_size).sum::<u64>();
426                        for sst in &level.table_infos {
427                            if sst.total_key_count == 0 {
428                                continue;
429                            }
430                            let tombstone_delete_ratio =
431                                sst.stale_key_count * 10000 / sst.total_key_count;
432                            let e = SstableSort {
433                                compaction_group_id: compaction_group.group_id,
434                                sst_id: sst.sst_id,
435                                delete_ratio: tombstone_delete_ratio,
436                            };
437                            top_k_sstables(top_k, &mut top_tombstone_delete_sst, e);
438                        }
439                    };
440                    let l0 = &compaction_group.l0;
441                    // FIXME: why chaining levels iter leads to segmentation fault?
442                    for level in &l0.sub_levels {
443                        visit_level(level);
444                    }
445                    for level in &compaction_group.levels {
446                        visit_level(level);
447                    }
448                }
449                version.levels.len()
450            })
451            .await;
452
453        let _ = writeln!(s, "number of SSTables: {sst_num}");
454        let _ = writeln!(s, "total size of SSTables (byte): {sst_total_file_size}");
455        let _ = writeln!(s, "number of compaction groups: {compaction_group_num}");
456        use comfy_table::{Row, Table};
457        fn format_table(heap: BinaryHeap<Reverse<SstableSort>>) -> Table {
458            let mut table = Table::new();
459            table.set_header({
460                let mut row = Row::new();
461                row.add_cell("compaction group id".into());
462                row.add_cell("sstable id".into());
463                row.add_cell("delete ratio".into());
464                row
465            });
466            for sst in &heap.into_sorted_vec() {
467                let mut row = Row::new();
468                row.add_cell(sst.0.compaction_group_id.into());
469                row.add_cell(sst.0.sst_id.into());
470                row.add_cell(format!("{:.2}%", sst.0.delete_ratio as f64 / 100.0).into());
471                table.add_row(row);
472            }
473            table
474        }
475        let _ = writeln!(s);
476        let _ = writeln!(s, "top tombstone delete ratio");
477        let _ = writeln!(s, "{}", format_table(top_tombstone_delete_sst));
478        let _ = writeln!(s);
479
480        let _ = writeln!(s);
481        self.write_storage_prometheus(s).await;
482    }
483
484    async fn write_streaming_prometheus(&self, s: &mut String) {
485        let _ = writeln!(s, "top sources by throughput (rows/s)");
486        let query = format!(
487            "topk(3, sum(rate(stream_source_output_rows_counts{{{}}}[10m]))by (source_name))",
488            self.prometheus_selector
489        );
490        self.write_instant_vector_impl(s, &query, vec!["source_name"])
491            .await;
492
493        let _ = writeln!(s);
494        let _ = writeln!(s, "top materialized views by throughput (rows/s)");
495        let query = format!(
496            "topk(3, sum(rate(stream_mview_input_row_count{{{}}}[10m]))by (table_id))",
497            self.prometheus_selector
498        );
499        self.write_instant_vector_impl(s, &query, vec!["table_id"])
500            .await;
501
502        let _ = writeln!(s);
503        let _ = writeln!(s, "top join executor by matched rows");
504        let query = format!(
505            "topk(3, histogram_quantile(0.9, sum(rate(stream_join_matched_join_keys_bucket{{{}}}[10m])) by (le, fragment_id, table_id)))",
506            self.prometheus_selector
507        );
508        self.write_instant_vector_impl(s, &query, vec!["table_id", "fragment_id"])
509            .await;
510    }
511
512    async fn write_storage_prometheus(&self, s: &mut String) {
513        let _ = writeln!(s, "top Hummock Get by duration (second)");
514        let query = format!(
515            "topk(3, histogram_quantile(0.9, sum(rate(state_store_get_duration_bucket{{{}}}[10m])) by (le, table_id)))",
516            self.prometheus_selector
517        );
518        self.write_instant_vector_impl(s, &query, vec!["table_id"])
519            .await;
520
521        let _ = writeln!(s);
522        let _ = writeln!(s, "top Hummock Iter Init by duration (second)");
523        let query = format!(
524            "topk(3, histogram_quantile(0.9, sum(rate(state_store_iter_init_duration_bucket{{{}}}[10m])) by (le, table_id)))",
525            self.prometheus_selector
526        );
527        self.write_instant_vector_impl(s, &query, vec!["table_id"])
528            .await;
529
530        let _ = writeln!(s);
531        let _ = writeln!(s, "top table commit flush by size (byte)");
532        let query = format!(
533            "topk(3, sum(rate(storage_commit_write_throughput{{{}}}[10m])) by (table_id))",
534            self.prometheus_selector
535        );
536        self.write_instant_vector_impl(s, &query, vec!["table_id"])
537            .await;
538
539        let _ = writeln!(s);
540        let _ = writeln!(s, "object store read throughput (bytes/s)");
541        let query = format!(
542            "sum(rate(object_store_read_bytes{{{}}}[10m])) by (job, instance)",
543            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
544        );
545        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
546            .await;
547
548        let _ = writeln!(s);
549        let _ = writeln!(s, "object store write throughput (bytes/s)");
550        let query = format!(
551            "sum(rate(object_store_write_bytes{{{}}}[10m])) by (job, instance)",
552            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
553        );
554        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
555            .await;
556
557        let _ = writeln!(s);
558        let _ = writeln!(s, "object store operation rate");
559        let query = format!(
560            "sum(rate(object_store_operation_latency_count{{{}}}[10m])) by (le, type, job, instance)",
561            merge_prometheus_selector([
562                &self.prometheus_selector,
563                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read_read_bytes|streaming_read\""
564            ])
565        );
566        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
567            .await;
568
569        let _ = writeln!(s);
570        let _ = writeln!(s, "object store operation duration (second)");
571        let query = format!(
572            "histogram_quantile(0.9, sum(rate(object_store_operation_latency_bucket{{{}}}[10m])) by (le, type, job, instance))",
573            merge_prometheus_selector([
574                &self.prometheus_selector,
575                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read\""
576            ])
577        );
578        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
579            .await;
580    }
581
582    async fn write_instant_vector_impl(&self, s: &mut String, query: &str, labels: Vec<&str>) {
583        let Some(ref client) = self.prometheus_client else {
584            return;
585        };
586        if let Ok(Vector(instant_vec)) = client
587            .query(query)
588            .get()
589            .await
590            .map(|result| result.into_inner().0)
591        {
592            for i in instant_vec {
593                let l = labels
594                    .iter()
595                    .map(|label| {
596                        format!(
597                            "{}={}",
598                            *label,
599                            i.metric()
600                                .get(*label)
601                                .map(|s| s.as_str())
602                                .unwrap_or_default()
603                        )
604                    })
605                    .join(",");
606                let _ = writeln!(s, "{}: {:.3}", l, i.sample().value());
607            }
608        }
609    }
610
611    async fn write_await_tree(&self, s: &mut String, actor_traces_format: ActorTracesFormat) {
612        let all = dump_cluster_await_tree(
613            &self.metadata_manager,
614            &self.await_tree_reg,
615            actor_traces_format,
616        )
617        .await;
618
619        if let Ok(all) = all {
620            write!(s, "{}", all.output()).unwrap();
621        } else {
622            tracing::warn!("failed to dump await tree");
623        }
624    }
625
626    async fn write_table_definition(&self, s: &mut String) -> MetaResult<()> {
627        let sources = self
628            .metadata_manager
629            .catalog_controller
630            .list_sources()
631            .await?
632            .into_iter()
633            .map(|s| {
634                (
635                    s.id,
636                    (s.name, s.schema_id, s.definition, s.created_at_epoch),
637                )
638            })
639            .collect::<BTreeMap<_, _>>();
640        let mut user_tables = BTreeMap::new();
641        let mut mvs = BTreeMap::new();
642        let mut indexes = BTreeMap::new();
643        let mut internal_tables = BTreeMap::new();
644        {
645            let grouped = self
646                .metadata_manager
647                .catalog_controller
648                .list_all_state_tables()
649                .await?
650                .into_iter()
651                .chunk_by(|t| t.table_type());
652            for (table_type, tables) in &grouped {
653                let tables = tables.into_iter().map(|t| {
654                    (
655                        t.id,
656                        (t.name, t.schema_id, t.definition, t.created_at_epoch),
657                    )
658                });
659                match table_type {
660                    PbTableType::Table => user_tables.extend(tables),
661                    PbTableType::MaterializedView => mvs.extend(tables),
662                    PbTableType::Index | PbTableType::VectorIndex => indexes.extend(tables),
663                    PbTableType::Internal => internal_tables.extend(tables),
664                    PbTableType::Unspecified => {
665                        tracing::error!("unspecified table type: {:?}", tables.collect_vec());
666                    }
667                }
668            }
669        }
670        let sinks = self
671            .metadata_manager
672            .catalog_controller
673            .list_sinks()
674            .await?
675            .into_iter()
676            .map(|s| {
677                (
678                    s.id,
679                    (s.name, s.schema_id, s.definition, s.created_at_epoch),
680                )
681            })
682            .collect::<BTreeMap<_, _>>();
683        let catalogs = [
684            ("SOURCE", sources),
685            ("TABLE", user_tables),
686            ("MATERIALIZED VIEW", mvs),
687            ("INDEX", indexes),
688            ("SINK", sinks),
689            ("INTERNAL TABLE", internal_tables),
690        ];
691        let mut obj_id_to_name = HashMap::new();
692        for (title, items) in catalogs {
693            use comfy_table::{Row, Table};
694            let mut table = Table::new();
695            table.set_header({
696                let mut row = Row::new();
697                row.add_cell("id".into());
698                row.add_cell("name".into());
699                row.add_cell("schema_id".into());
700                row.add_cell("created_at".into());
701                row.add_cell("definition".into());
702                row
703            });
704            for (id, (name, schema_id, definition, created_at_epoch)) in items {
705                obj_id_to_name.insert(id, name.clone());
706                let mut row = Row::new();
707                let may_redact = redact_sql(&definition, self.redact_sql_option_keywords.clone())
708                    .unwrap_or_else(|| "[REDACTED]".into());
709                let created_at = if let Some(created_at_epoch) = created_at_epoch {
710                    format!("{}", Epoch::from(created_at_epoch).as_timestamptz())
711                } else {
712                    "".into()
713                };
714                row.add_cell(id.into());
715                row.add_cell(name.into());
716                row.add_cell(schema_id.into());
717                row.add_cell(created_at.into());
718                row.add_cell(may_redact.into());
719                table.add_row(row);
720            }
721            let _ = writeln!(s);
722            let _ = writeln!(s, "{title}");
723            let _ = writeln!(s, "{table}");
724        }
725
726        let actors = self
727            .metadata_manager
728            .catalog_controller
729            .list_actor_info()
730            .await?
731            .into_iter()
732            .map(|(actor_id, fragment_id, job_id, schema_id, obj_type)| {
733                (
734                    actor_id,
735                    (
736                        fragment_id,
737                        job_id,
738                        schema_id,
739                        obj_type,
740                        obj_id_to_name
741                            .get(&(job_id as _))
742                            .cloned()
743                            .unwrap_or_default(),
744                    ),
745                )
746            })
747            .collect::<BTreeMap<_, _>>();
748
749        use comfy_table::{Row, Table};
750        let mut table = Table::new();
751        table.set_header({
752            let mut row = Row::new();
753            row.add_cell("id".into());
754            row.add_cell("fragment_id".into());
755            row.add_cell("job_id".into());
756            row.add_cell("schema_id".into());
757            row.add_cell("type".into());
758            row.add_cell("name".into());
759            row
760        });
761        for (actor_id, (fragment_id, job_id, schema_id, ddl_type, name)) in actors {
762            let mut row = Row::new();
763            row.add_cell(actor_id.into());
764            row.add_cell(fragment_id.into());
765            row.add_cell(job_id.into());
766            row.add_cell(schema_id.into());
767            row.add_cell(ddl_type.as_str().into());
768            row.add_cell(name.into());
769            table.add_row(row);
770        }
771        let _ = writeln!(s);
772        let _ = writeln!(s, "ACTOR");
773        let _ = writeln!(s, "{table}");
774        Ok(())
775    }
776}
777
778fn try_add_cell<T: Into<comfy_table::Cell>>(row: &mut comfy_table::Row, t: Option<T>) {
779    match t {
780        Some(t) => {
781            row.add_cell(t.into());
782        }
783        None => {
784            row.add_cell("".into());
785        }
786    }
787}
788
/// Joins the non-empty `selectors` with `,` into one Prometheus label selector.
///
/// Empty selectors are skipped, so no leading, trailing or doubled commas are
/// produced. Returns an empty string when every selector is empty.
fn merge_prometheus_selector<'a>(selectors: impl IntoIterator<Item = &'a str>) -> String {
    let non_empty: Vec<&str> = selectors
        .into_iter()
        .filter(|selector| !selector.is_empty())
        .collect();
    non_empty.join(",")
}
792
793fn redact_sql(sql: &str, keywords: RedactSqlOptionKeywordsRef) -> Option<String> {
794    match Parser::parse_sql(sql) {
795        Ok(sqls) => Some(
796            sqls.into_iter()
797                .map(|sql| sql.to_redacted_string(keywords.clone()))
798                .join(";"),
799        ),
800        Err(_) => None,
801    }
802}