risingwave_meta/manager/diagnose.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::{Ordering, Reverse};
use std::collections::{BTreeMap, BinaryHeap, HashMap};
use std::fmt::Write;
use std::sync::Arc;

use itertools::Itertools;
use prometheus_http_query::response::Data::Vector;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::StackTraceResponseExt;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::HummockSstableId;
use risingwave_hummock_sdk::level::Level;
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::EventLog;
use risingwave_pb::meta::event_log::Event;
use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
use risingwave_sqlparser::ast::RedactSqlOptionKeywordsRef;
use risingwave_sqlparser::parser::Parser;
use serde_json::json;
use thiserror_ext::AsReport;

use crate::MetaResult;
use crate::hummock::HummockManagerRef;
use crate::manager::MetadataManager;
use crate::manager::event_log::EventLogManagerRef;
use crate::rpc::await_tree::dump_cluster_await_tree;

pub type DiagnoseCommandRef = Arc<DiagnoseCommand>;

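/// Collects diagnostic information from the meta node's managers and renders
/// it into a human-readable report for troubleshooting.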
pub struct DiagnoseCommand {
    metadata_manager: MetadataManager,
    await_tree_reg: await_tree::Registry,
    hummock_manager: HummockManagerRef,
    event_log_manager: EventLogManagerRef,
    prometheus_client: Option<prometheus_http_query::Client>,
    prometheus_selector: String,
    redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
}

impl DiagnoseCommand {
    pub fn new(
        metadata_manager: MetadataManager,
        await_tree_reg: await_tree::Registry,
        hummock_manager: HummockManagerRef,
        event_log_manager: EventLogManagerRef,
        prometheus_client: Option<prometheus_http_query::Client>,
        prometheus_selector: String,
        redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
    ) -> Self {
        Self {
            metadata_manager,
            await_tree_reg,
            hummock_manager,
            event_log_manager,
            prometheus_client,
            prometheus_selector,
            redact_sql_option_keywords,
        }
    }

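    /// Builds the full diagnosis report: catalog stats, worker nodes,
    /// streaming and storage Prometheus metrics, Hummock storage state,
    /// cluster await trees, and recent event logs.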
    pub async fn report(&self, actor_traces_format: ActorTracesFormat) -> String {
        let mut report = String::new();
        let _ = writeln!(
            report,
            "report created at: {}\nversion: {}",
            chrono::DateTime::<chrono::offset::Utc>::from(std::time::SystemTime::now()),
            risingwave_common::current_cluster_version(),
        );
        let _ = writeln!(report);
        self.write_catalog(&mut report).await;
        let _ = writeln!(report);
        self.write_worker_nodes(&mut report).await;
        let _ = writeln!(report);
        self.write_streaming_prometheus(&mut report).await;
        let _ = writeln!(report);
        self.write_storage(&mut report).await;
        let _ = writeln!(report);
        self.write_await_tree(&mut report, actor_traces_format)
            .await;
        let _ = writeln!(report);
        self.write_event_logs(&mut report);
        report
    }

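    /// Writes catalog statistics and table definitions. Failures are logged
    /// and do not abort the report.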
    async fn write_catalog(&self, s: &mut String) {
        self.write_catalog_inner(s).await;
        let _ = self.write_table_definition(s).await.inspect_err(|e| {
            tracing::warn!(
                error = e.to_report_string(),
                "failed to display table definition"
            )
        });
    }

    async fn write_catalog_inner(&self, s: &mut String) {
        let stats = self.metadata_manager.catalog_controller.stats().await;

        let stat = match stats {
            Ok(stat) => stat,
            Err(err) => {
                tracing::warn!(error=?err.as_report(), "failed to get catalog stats");
                return;
            }
        };
        let _ = writeln!(s, "number of streaming job: {}", stat.streaming_job_num);
        let _ = writeln!(s, "number of actor: {}", stat.actor_num);
        let _ = writeln!(s, "number of source: {}", stat.source_num);
        let _ = writeln!(s, "number of table: {}", stat.table_num);
        let _ = writeln!(s, "number of materialized view: {}", stat.mview_num);
        let _ = writeln!(s, "number of sink: {}", stat.sink_num);
        let _ = writeln!(s, "number of index: {}", stat.index_num);
        let _ = writeln!(s, "number of function: {}", stat.function_num);
    }

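    /// Renders all worker nodes as a table, including their resources and,
    /// for compute nodes, the number of actors running on each.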
    async fn write_worker_nodes(&self, s: &mut String) {
        let Ok(worker_actor_count) = self.metadata_manager.worker_actor_count() else {
            tracing::warn!("failed to get worker actor count");
            return;
        };

        use comfy_table::{Row, Table};
        let Ok(worker_nodes) = self.metadata_manager.list_worker_node(None, None).await else {
            tracing::warn!("failed to get worker nodes");
            return;
        };
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("host".into());
            row.add_cell("type".into());
            row.add_cell("state".into());
            row.add_cell("parallelism".into());
            row.add_cell("is_streaming".into());
            row.add_cell("is_serving".into());
            row.add_cell("rw_version".into());
            row.add_cell("total_memory_bytes".into());
            row.add_cell("total_cpu_cores".into());
            row.add_cell("started_at".into());
            row.add_cell("actor_count".into());
            row
        });
        for worker_node in worker_nodes {
            let mut row = Row::new();
            row.add_cell(worker_node.id.into());
            try_add_cell(
                &mut row,
                worker_node
                    .host
                    .as_ref()
                    .map(|h| format!("{}:{}", h.host, h.port)),
            );
            try_add_cell(
                &mut row,
                worker_node.get_type().ok().map(|t| t.as_str_name()),
            );
            try_add_cell(
                &mut row,
                worker_node.get_state().ok().map(|s| s.as_str_name()),
            );
            try_add_cell(&mut row, worker_node.parallelism());
            try_add_cell(
                &mut row,
                worker_node.property.as_ref().map(|p| p.is_streaming),
            );
            try_add_cell(
                &mut row,
                worker_node.property.as_ref().map(|p| p.is_serving),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.rw_version.clone()),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.total_memory_bytes),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.total_cpu_cores),
            );
            try_add_cell(
                &mut row,
                worker_node
                    .started_at
                    .and_then(|ts| Timestamptz::from_secs(ts as _).map(|t| t.to_string())),
            );
            let actor_count = {
                if let Ok(t) = worker_node.get_type()
                    && t != WorkerType::ComputeNode
                {
                    None
                } else {
                    match worker_actor_count.get(&(worker_node.id as _)) {
                        None => Some(0),
                        Some(c) => Some(*c),
                    }
                }
            };
            try_add_cell(&mut row, actor_count);
            table.add_row(row);
        }
        let _ = writeln!(s, "{table}");
    }

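    /// Writes the most recent event logs, newest first, grouped by event type.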
    fn write_event_logs(&self, s: &mut String) {
        let event_logs = self
            .event_log_manager
            .list_event_logs()
            .into_iter()
            .sorted_by(|a, b| {
                a.timestamp
                    .unwrap_or(0)
                    .cmp(&b.timestamp.unwrap_or(0))
                    .reverse()
            })
            .collect_vec();

        let _ = writeln!(s, "latest barrier completions");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::BarrierComplete(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(10),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest barrier collection failures");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::CollectBarrierFail(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest barrier injection failures");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::InjectBarrierFail(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest worker node panics");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::WorkerNodePanic(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(10),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest create stream job failures");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::CreateStreamJobFail(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest dirty stream job clear-ups");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::DirtyStreamJobClear(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );
    }

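    /// Renders up to `limit` event logs selected by `get_event_info` into a
    /// two-column table of timestamp and event payload.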
    fn write_event_logs_impl<'a, F>(
        s: &mut String,
        event_logs: impl Iterator<Item = &'a EventLog>,
        get_event_info: F,
        limit: Option<usize>,
    ) where
        F: Fn(&Event) -> Option<String>,
    {
        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("created_at".into());
            row.add_cell("info".into());
            row
        });
        let mut row_count = 0;
        for event_log in event_logs {
            let Some(ref inner) = event_log.event else {
                continue;
            };
            if let Some(limit) = limit
                && row_count >= limit
            {
                break;
            }
            let mut row = Row::new();
            let ts = event_log
                .timestamp
                .and_then(|ts| Timestamptz::from_millis(ts as _).map(|ts| ts.to_string()));
            try_add_cell(&mut row, ts);
            if let Some(info) = get_event_info(inner) {
                row.add_cell(info.into());
                row_count += 1;
            } else {
                continue;
            }
            table.add_row(row);
        }
        let _ = writeln!(s, "{table}");
    }

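    /// Writes Hummock storage state: back-pressured compaction groups,
    /// SSTable counts and total size, and the SSTables with the highest
    /// stale-key (tombstone) ratio, followed by storage Prometheus metrics.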
    async fn write_storage(&self, s: &mut String) {
        let mut sst_num = 0;
        let mut sst_total_file_size = 0;
        let back_pressured_compaction_groups = self
            .hummock_manager
            .write_limits()
            .await
            .into_iter()
            .filter_map(|(k, v)| {
                if v.table_ids.is_empty() {
                    None
                } else {
                    Some(k)
                }
            })
            .join(",");
        if !back_pressured_compaction_groups.is_empty() {
            let _ = writeln!(
                s,
                "back pressured compaction groups: {back_pressured_compaction_groups}"
            );
        }

        #[derive(PartialEq, Eq)]
        struct SstableSort {
            compaction_group_id: u64,
            sst_id: HummockSstableId,
            delete_ratio: u64,
        }
        impl PartialOrd for SstableSort {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }
        impl Ord for SstableSort {
            fn cmp(&self, other: &Self) -> Ordering {
                self.delete_ratio.cmp(&other.delete_ratio)
            }
        }
        fn top_k_sstables(
            top_k: usize,
            heap: &mut BinaryHeap<Reverse<SstableSort>>,
            e: SstableSort,
        ) {
            if heap.len() < top_k {
                heap.push(Reverse(e));
            } else if let Some(mut p) = heap.peek_mut()
                && e.delete_ratio > p.0.delete_ratio
            {
                *p = Reverse(e);
            }
        }

        let top_k = 10;
        let mut top_tombstone_delete_sst = BinaryHeap::with_capacity(top_k);
        let compaction_group_num = self
            .hummock_manager
            .on_current_version(|version| {
                for compaction_group in version.levels.values() {
                    let mut visit_level = |level: &Level| {
                        sst_num += level.table_infos.len();
                        sst_total_file_size +=
                            level.table_infos.iter().map(|t| t.sst_size).sum::<u64>();
                        for sst in &level.table_infos {
                            if sst.total_key_count == 0 {
                                continue;
                            }
                            // Ratio in basis points, i.e. 1/100 of a percent.
                            let tombstone_delete_ratio =
                                sst.stale_key_count * 10000 / sst.total_key_count;
                            let e = SstableSort {
                                compaction_group_id: compaction_group.group_id,
                                sst_id: sst.sst_id,
                                delete_ratio: tombstone_delete_ratio,
                            };
                            top_k_sstables(top_k, &mut top_tombstone_delete_sst, e);
                        }
                    };
                    let l0 = &compaction_group.l0;
                    // FIXME: why does chaining the level iterators lead to a segmentation fault?
                    for level in &l0.sub_levels {
                        visit_level(level);
                    }
                    for level in &compaction_group.levels {
                        visit_level(level);
                    }
                }
                version.levels.len()
            })
            .await;

        let _ = writeln!(s, "number of SSTables: {sst_num}");
        let _ = writeln!(s, "total size of SSTables (byte): {sst_total_file_size}");
        let _ = writeln!(s, "number of compaction groups: {compaction_group_num}");
        use comfy_table::{Row, Table};
        fn format_table(heap: BinaryHeap<Reverse<SstableSort>>) -> Table {
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("compaction group id".into());
                row.add_cell("sstable id".into());
                row.add_cell("delete ratio".into());
                row
            });
            for sst in &heap.into_sorted_vec() {
                let mut row = Row::new();
                row.add_cell(sst.0.compaction_group_id.into());
                row.add_cell(sst.0.sst_id.into());
                row.add_cell(format!("{:.2}%", sst.0.delete_ratio as f64 / 100.0).into());
                table.add_row(row);
            }
            table
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "top tombstone delete ratio");
        let _ = writeln!(s, "{}", format_table(top_tombstone_delete_sst));
        let _ = writeln!(s);

        let _ = writeln!(s);
        self.write_storage_prometheus(s).await;
    }

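    /// Queries Prometheus for streaming hot spots: top sources and
    /// materialized views by throughput, and top join executors by matched rows.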
    async fn write_streaming_prometheus(&self, s: &mut String) {
        let _ = writeln!(s, "top sources by throughput (rows/s)");
        let query = format!(
            "topk(3, sum(rate(stream_source_output_rows_counts{{{}}}[10m])) by (source_name))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["source_name"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top materialized views by throughput (rows/s)");
        let query = format!(
            "topk(3, sum(rate(stream_mview_input_row_count{{{}}}[10m])) by (table_id))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top join executors by matched rows");
        let query = format!(
            "topk(3, histogram_quantile(0.9, sum(rate(stream_join_matched_join_keys_bucket{{{}}}[10m])) by (le, fragment_id, table_id)))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id", "fragment_id"])
            .await;
    }

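    /// Queries Prometheus for storage hot spots: slow Hummock reads, large
    /// commit flushes, and object store throughput, operation rate, and latency.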
    async fn write_storage_prometheus(&self, s: &mut String) {
        let _ = writeln!(s, "top Hummock Get by duration (second)");
        let query = format!(
            "topk(3, histogram_quantile(0.9, sum(rate(state_store_get_duration_bucket{{{}}}[10m])) by (le, table_id)))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top Hummock Iter Init by duration (second)");
        let query = format!(
            "topk(3, histogram_quantile(0.9, sum(rate(state_store_iter_init_duration_bucket{{{}}}[10m])) by (le, table_id)))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top table commit flush by size (bytes/s)");
        let query = format!(
            "topk(3, sum(rate(storage_commit_write_throughput{{{}}}[10m])) by (table_id))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store read throughput (bytes/s)");
        let query = format!(
            "sum(rate(object_store_read_bytes{{{}}}[10m])) by (job, instance)",
            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
        );
        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store write throughput (bytes/s)");
        let query = format!(
            "sum(rate(object_store_write_bytes{{{}}}[10m])) by (job, instance)",
            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
        );
        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store operation rate");
        let query = format!(
            "sum(rate(object_store_operation_latency_count{{{}}}[10m])) by (le, type, job, instance)",
            merge_prometheus_selector([
                &self.prometheus_selector,
                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read_read_bytes|streaming_read\""
            ])
        );
        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store operation duration (second)");
        let query = format!(
            "histogram_quantile(0.9, sum(rate(object_store_operation_latency_bucket{{{}}}[10m])) by (le, type, job, instance))",
            merge_prometheus_selector([
                &self.prometheus_selector,
                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read\""
            ])
        );
        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
            .await;
    }

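    /// Runs an instant PromQL query and writes one line per sample, showing
    /// the requested labels and the sample value. Does nothing when no
    /// Prometheus client is configured.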
    async fn write_instant_vector_impl(&self, s: &mut String, query: &str, labels: Vec<&str>) {
        let Some(ref client) = self.prometheus_client else {
            return;
        };
        if let Ok(Vector(instant_vec)) = client
            .query(query)
            .get()
            .await
            .map(|result| result.into_inner().0)
        {
            for i in instant_vec {
                let l = labels
                    .iter()
                    .map(|label| {
                        format!(
                            "{}={}",
                            *label,
                            i.metric()
                                .get(*label)
                                .map(|s| s.as_str())
                                .unwrap_or_default()
                        )
                    })
                    .join(",");
                let _ = writeln!(s, "{}: {:.3}", l, i.sample().value());
            }
        }
    }

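    /// Dumps await trees (async stack traces) from all nodes in the cluster.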
    async fn write_await_tree(&self, s: &mut String, actor_traces_format: ActorTracesFormat) {
        let all = dump_cluster_await_tree(
            &self.metadata_manager,
            &self.await_tree_reg,
            actor_traces_format,
        )
        .await;

        match all {
            Ok(all) => {
                let _ = write!(s, "{}", all.output());
            }
            Err(err) => tracing::warn!(error=?err.as_report(), "failed to dump await tree"),
        }
    }

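    /// Writes the (redacted) definitions of sources, tables, materialized
    /// views, indexes, sinks, and internal tables, followed by a table mapping
    /// each actor to its fragment, job, and schema.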
    async fn write_table_definition(&self, s: &mut String) -> MetaResult<()> {
        let sources = self
            .metadata_manager
            .catalog_controller
            .list_sources()
            .await?
            .into_iter()
            .map(|s| {
                (
                    s.id,
                    (s.name, s.schema_id, s.definition, s.created_at_epoch),
                )
            })
            .collect::<BTreeMap<_, _>>();
        let mut user_tables = BTreeMap::new();
        let mut mvs = BTreeMap::new();
        let mut indexes = BTreeMap::new();
        let mut internal_tables = BTreeMap::new();
        {
            let grouped = self
                .metadata_manager
                .catalog_controller
                .list_all_state_tables()
                .await?
                .into_iter()
                .chunk_by(|t| t.table_type());
            for (table_type, tables) in &grouped {
                let tables = tables.into_iter().map(|t| {
                    (
                        t.id,
                        (t.name, t.schema_id, t.definition, t.created_at_epoch),
                    )
                });
                match table_type {
                    PbTableType::Table => user_tables.extend(tables),
                    PbTableType::MaterializedView => mvs.extend(tables),
                    PbTableType::Index | PbTableType::VectorIndex => indexes.extend(tables),
                    PbTableType::Internal => internal_tables.extend(tables),
                    PbTableType::Unspecified => {
                        tracing::error!("unspecified table type: {:?}", tables.collect_vec());
                    }
                }
            }
        }
        let sinks = self
            .metadata_manager
            .catalog_controller
            .list_sinks()
            .await?
            .into_iter()
            .map(|s| {
                (
                    s.id,
                    (s.name, s.schema_id, s.definition, s.created_at_epoch),
                )
            })
            .collect::<BTreeMap<_, _>>();
        let catalogs = [
            ("SOURCE", sources),
            ("TABLE", user_tables),
            ("MATERIALIZED VIEW", mvs),
            ("INDEX", indexes),
            ("SINK", sinks),
            ("INTERNAL TABLE", internal_tables),
        ];
        let mut obj_id_to_name = HashMap::new();
        for (title, items) in catalogs {
            use comfy_table::{Row, Table};
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("id".into());
                row.add_cell("name".into());
                row.add_cell("schema_id".into());
                row.add_cell("created_at".into());
                row.add_cell("definition".into());
                row
            });
            for (id, (name, schema_id, definition, created_at_epoch)) in items {
                obj_id_to_name.insert(id, name.clone());
                let mut row = Row::new();
                let may_redact = redact_sql(&definition, self.redact_sql_option_keywords.clone())
                    .unwrap_or_else(|| "[REDACTED]".into());
                let created_at = if let Some(created_at_epoch) = created_at_epoch {
                    format!("{}", Epoch::from(created_at_epoch).as_timestamptz())
                } else {
                    "".into()
                };
                row.add_cell(id.into());
                row.add_cell(name.into());
                row.add_cell(schema_id.into());
                row.add_cell(created_at.into());
                row.add_cell(may_redact.into());
                table.add_row(row);
            }
            let _ = writeln!(s);
            let _ = writeln!(s, "{title}");
            let _ = writeln!(s, "{table}");
        }

        let actors = self
            .metadata_manager
            .catalog_controller
            .list_actor_info()
            .await?
            .into_iter()
            .map(|(actor_id, fragment_id, job_id, schema_id, obj_type)| {
                (
                    actor_id,
                    (
                        fragment_id,
                        job_id,
                        schema_id,
                        obj_type,
                        obj_id_to_name
                            .get(&(job_id as _))
                            .cloned()
                            .unwrap_or_default(),
                    ),
                )
            })
            .collect::<BTreeMap<_, _>>();

        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("fragment_id".into());
            row.add_cell("job_id".into());
            row.add_cell("schema_id".into());
            row.add_cell("type".into());
            row.add_cell("name".into());
            row
        });
        for (actor_id, (fragment_id, job_id, schema_id, ddl_type, name)) in actors {
            let mut row = Row::new();
            row.add_cell(actor_id.into());
            row.add_cell(fragment_id.into());
            row.add_cell(job_id.into());
            row.add_cell(schema_id.into());
            row.add_cell(ddl_type.as_str().into());
            row.add_cell(name.into());
            table.add_row(row);
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "ACTOR");
        let _ = writeln!(s, "{table}");
        Ok(())
    }
}

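/// Adds `t` as a cell if present, or an empty cell otherwise, so that rows
/// keep a consistent number of columns.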
fn try_add_cell<T: Into<comfy_table::Cell>>(row: &mut comfy_table::Row, t: Option<T>) {
    match t {
        Some(t) => {
            row.add_cell(t.into());
        }
        None => {
            row.add_cell("".into());
        }
    }
}

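/// Joins non-empty Prometheus label selectors with commas so they can be
/// embedded in a single `{...}` matcher.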
fn merge_prometheus_selector<'a>(selectors: impl IntoIterator<Item = &'a str>) -> String {
    selectors.into_iter().filter(|s| !s.is_empty()).join(",")
}

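/// Redacts values of SQL options matching the configured keyword list from a
/// definition. Returns `None` if the SQL cannot be parsed.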
fn redact_sql(sql: &str, keywords: RedactSqlOptionKeywordsRef) -> Option<String> {
    match Parser::parse_sql(sql) {
        Ok(sqls) => Some(
            sqls.into_iter()
                .map(|sql| sql.to_redacted_string(keywords.clone()))
                .join(";"),
        ),
        Err(_) => None,
    }
}