risingwave_meta/manager/diagnose.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

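//! Builds a plain-text diagnostic report for the cluster, covering catalog statistics and
//! definitions, worker nodes, streaming and storage Prometheus metrics, Hummock storage
//! state, await-tree dumps, and recent event logs.
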
use std::cmp::{Ordering, Reverse};
use std::collections::{BTreeMap, BinaryHeap, HashMap};
use std::fmt::Write;
use std::sync::Arc;

use itertools::Itertools;
use prometheus_http_query::response::Data::Vector;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::StackTraceResponseExt;
use risingwave_hummock_sdk::HummockSstableId;
use risingwave_hummock_sdk::level::Level;
use risingwave_meta_model::table::TableType;
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::EventLog;
use risingwave_pb::meta::event_log::Event;
use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
use risingwave_sqlparser::ast::RedactSqlOptionKeywordsRef;
use risingwave_sqlparser::parser::Parser;
use serde_json::json;
use thiserror_ext::AsReport;

use crate::MetaResult;
use crate::hummock::HummockManagerRef;
use crate::manager::MetadataManager;
use crate::manager::event_log::EventLogManagerRef;
use crate::rpc::await_tree::dump_cluster_await_tree;

pub type DiagnoseCommandRef = Arc<DiagnoseCommand>;

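/// Collects diagnostic information from the meta node's managers and, optionally, from
/// Prometheus, and renders it as a human-readable report.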
pub struct DiagnoseCommand {
    metadata_manager: MetadataManager,
    await_tree_reg: await_tree::Registry,
    hummock_manger: HummockManagerRef,
    event_log_manager: EventLogManagerRef,
    prometheus_client: Option<prometheus_http_query::Client>,
    prometheus_selector: String,
    redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
}

impl DiagnoseCommand {
    pub fn new(
        metadata_manager: MetadataManager,
        await_tree_reg: await_tree::Registry,
        hummock_manger: HummockManagerRef,
        event_log_manager: EventLogManagerRef,
        prometheus_client: Option<prometheus_http_query::Client>,
        prometheus_selector: String,
        redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
    ) -> Self {
        Self {
            metadata_manager,
            await_tree_reg,
            hummock_manger,
            event_log_manager,
            prometheus_client,
            prometheus_selector,
            redact_sql_option_keywords,
        }
    }

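    /// Builds the full diagnostic report as a single string. It starts with the creation time
    /// and cluster version, followed by sections (separated by blank lines) for the catalog,
    /// worker nodes, streaming metrics, storage, await trees, and recent event logs.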
    #[cfg_attr(coverage, coverage(off))]
    pub async fn report(&self, actor_traces_format: ActorTracesFormat) -> String {
        let mut report = String::new();
        let _ = writeln!(
            report,
            "report created at: {}\nversion: {}",
            chrono::DateTime::<chrono::offset::Utc>::from(std::time::SystemTime::now()),
            risingwave_common::current_cluster_version(),
        );
        let _ = writeln!(report);
        self.write_catalog(&mut report).await;
        let _ = writeln!(report);
        self.write_worker_nodes(&mut report).await;
        let _ = writeln!(report);
        self.write_streaming_prometheus(&mut report).await;
        let _ = writeln!(report);
        self.write_storage(&mut report).await;
        let _ = writeln!(report);
        self.write_await_tree(&mut report, actor_traces_format)
            .await;
        let _ = writeln!(report);
        self.write_event_logs(&mut report);
        report
    }

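    /// Writes catalog statistics followed by redacted object definitions. A failure to render
    /// the definitions is logged and does not abort the report.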
    #[cfg_attr(coverage, coverage(off))]
    async fn write_catalog(&self, s: &mut String) {
        self.write_catalog_inner(s).await;
        let _ = self.write_table_definition(s).await.inspect_err(|e| {
            tracing::warn!(
                error = e.to_report_string(),
                "failed to display table definition"
            )
        });
    }

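    /// Writes per-object counts (streaming jobs, actors, sources, tables, materialized views,
    /// sinks, indexes, functions) taken from the catalog controller's stats.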
    #[cfg_attr(coverage, coverage(off))]
    async fn write_catalog_inner(&self, s: &mut String) {
        let guard = self
            .metadata_manager
            .catalog_controller
            .get_inner_read_guard()
            .await;
        let stat = match guard.stats().await {
            Ok(stat) => stat,
            Err(err) => {
                tracing::warn!(error=?err.as_report(), "failed to get catalog stats");
                return;
            }
        };
125        let _ = writeln!(s, "number of fragment: {}", stat.streaming_job_num);
126        let _ = writeln!(s, "number of actor: {}", stat.actor_num);
127        let _ = writeln!(s, "number of source: {}", stat.source_num);
128        let _ = writeln!(s, "number of table: {}", stat.table_num);
129        let _ = writeln!(s, "number of materialized view: {}", stat.mview_num);
130        let _ = writeln!(s, "number of sink: {}", stat.sink_num);
131        let _ = writeln!(s, "number of index: {}", stat.index_num);
132        let _ = writeln!(s, "number of function: {}", stat.function_num);
133    }
134
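    /// Renders a table of all worker nodes with their host, type, state, parallelism, resource
    /// usage, and (for compute nodes) the number of actors they run.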
    #[cfg_attr(coverage, coverage(off))]
    async fn write_worker_nodes(&self, s: &mut String) {
        let Ok(worker_actor_count) = self.metadata_manager.worker_actor_count().await else {
            tracing::warn!("failed to get worker actor count");
            return;
        };

        use comfy_table::{Row, Table};
        let Ok(worker_nodes) = self.metadata_manager.list_worker_node(None, None).await else {
            tracing::warn!("failed to get worker nodes");
            return;
        };
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("host".into());
            row.add_cell("type".into());
            row.add_cell("state".into());
            row.add_cell("parallelism".into());
            row.add_cell("is_streaming".into());
            row.add_cell("is_serving".into());
            row.add_cell("rw_version".into());
            row.add_cell("total_memory_bytes".into());
            row.add_cell("total_cpu_cores".into());
            row.add_cell("started_at".into());
            row.add_cell("actor_count".into());
            row
        });
        for worker_node in worker_nodes {
            let mut row = Row::new();
            row.add_cell(worker_node.id.into());
            try_add_cell(
                &mut row,
                worker_node
                    .host
                    .as_ref()
                    .map(|h| format!("{}:{}", h.host, h.port)),
            );
            try_add_cell(
                &mut row,
                worker_node.get_type().ok().map(|t| t.as_str_name()),
            );
            try_add_cell(
                &mut row,
                worker_node.get_state().ok().map(|s| s.as_str_name()),
            );
            try_add_cell(&mut row, worker_node.parallelism());
            try_add_cell(
                &mut row,
                worker_node.property.as_ref().map(|p| p.is_streaming),
            );
            try_add_cell(
                &mut row,
                worker_node.property.as_ref().map(|p| p.is_serving),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.rw_version.clone()),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.total_memory_bytes),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.total_cpu_cores),
            );
            try_add_cell(
                &mut row,
                worker_node
                    .started_at
                    .and_then(|ts| Timestamptz::from_secs(ts as _).map(|t| t.to_string())),
            );
            let actor_count = {
                if let Ok(t) = worker_node.get_type()
                    && t != WorkerType::ComputeNode
                {
                    None
                } else {
                    match worker_actor_count.get(&(worker_node.id as _)) {
                        None => Some(0),
                        Some(c) => Some(*c),
                    }
                }
            };
            try_add_cell(&mut row, actor_count);
            table.add_row(row);
        }
        let _ = writeln!(s, "{table}");
    }

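    /// Writes the most recent event logs, newest first, grouped into tables for barrier
    /// completions, barrier collection/injection failures, worker node panics, create-stream-job
    /// failures, and dirty stream job clean-ups.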
    #[cfg_attr(coverage, coverage(off))]
    fn write_event_logs(&self, s: &mut String) {
        let event_logs = self
            .event_log_manager
            .list_event_logs()
            .into_iter()
            .sorted_by(|a, b| {
                a.timestamp
                    .unwrap_or(0)
                    .cmp(&b.timestamp.unwrap_or(0))
                    .reverse()
            })
            .collect_vec();

        let _ = writeln!(s, "latest barrier completions");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::BarrierComplete(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(10),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest barrier collection failures");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::CollectBarrierFail(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest barrier injection failures");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::InjectBarrierFail(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest worker node panics");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::WorkerNodePanic(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(10),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest create stream job failures");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::CreateStreamJobFail(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest dirty stream job clear-ups");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::DirtyStreamJobClear(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );
    }

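    /// Renders matching event logs as a two-column table (`created_at`, `info`), keeping only
    /// events for which `get_event_info` returns `Some` and stopping after `limit` rows if a
    /// limit is given.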
    #[cfg_attr(coverage, coverage(off))]
    fn write_event_logs_impl<'a, F>(
        s: &mut String,
        event_logs: impl Iterator<Item = &'a EventLog>,
        get_event_info: F,
        limit: Option<usize>,
    ) where
        F: Fn(&Event) -> Option<String>,
    {
        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("created_at".into());
            row.add_cell("info".into());
            row
        });
        let mut row_count = 0;
        for event_log in event_logs {
            let Some(ref inner) = event_log.event else {
                continue;
            };
            if let Some(limit) = limit
                && row_count >= limit
            {
                break;
            }
            let mut row = Row::new();
            let ts = event_log
                .timestamp
                .and_then(|ts| Timestamptz::from_millis(ts as _).map(|ts| ts.to_string()));
            try_add_cell(&mut row, ts);
            if let Some(info) = get_event_info(inner) {
                row.add_cell(info.into());
                row_count += 1;
            } else {
                continue;
            }
            table.add_row(row);
        }
        let _ = writeln!(s, "{table}");
    }

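    /// Writes Hummock storage state: back-pressured compaction groups, SSTable count and total
    /// size, the SSTables with the highest stale-key (tombstone) ratios, and storage-related
    /// Prometheus metrics.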
    #[cfg_attr(coverage, coverage(off))]
    async fn write_storage(&self, s: &mut String) {
        let mut sst_num = 0;
        let mut sst_total_file_size = 0;
        let back_pressured_compaction_groups = self
            .hummock_manger
            .write_limits()
            .await
            .into_iter()
            .filter_map(|(k, v)| {
                if v.table_ids.is_empty() {
                    None
                } else {
                    Some(k)
                }
            })
            .join(",");
        if !back_pressured_compaction_groups.is_empty() {
            let _ = writeln!(
                s,
                "back pressured compaction groups: {back_pressured_compaction_groups}"
            );
        }

        #[derive(PartialEq, Eq)]
        struct SstableSort {
            compaction_group_id: u64,
            sst_id: HummockSstableId,
            delete_ratio: u64,
        }
        impl PartialOrd for SstableSort {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }
        impl Ord for SstableSort {
            fn cmp(&self, other: &Self) -> Ordering {
                self.delete_ratio.cmp(&other.delete_ratio)
            }
        }
        fn top_k_sstables(
            top_k: usize,
            heap: &mut BinaryHeap<Reverse<SstableSort>>,
            e: SstableSort,
        ) {
            if heap.len() < top_k {
                heap.push(Reverse(e));
            } else if let Some(mut p) = heap.peek_mut() {
                if e.delete_ratio > p.0.delete_ratio {
                    *p = Reverse(e);
                }
            }
        }

        let top_k = 10;
        let mut top_tombstone_delete_sst = BinaryHeap::with_capacity(top_k);
        let compaction_group_num = self
            .hummock_manger
            .on_current_version(|version| {
                for compaction_group in version.levels.values() {
                    let mut visit_level = |level: &Level| {
                        sst_num += level.table_infos.len();
                        sst_total_file_size +=
                            level.table_infos.iter().map(|t| t.sst_size).sum::<u64>();
                        for sst in &level.table_infos {
                            if sst.total_key_count == 0 {
                                continue;
                            }
                            // Ratio of stale keys in basis points (1/100 of a percent).
                            let tombstone_delete_ratio =
                                sst.stale_key_count * 10000 / sst.total_key_count;
                            let e = SstableSort {
                                compaction_group_id: compaction_group.group_id,
                                sst_id: sst.sst_id,
                                delete_ratio: tombstone_delete_ratio,
                            };
                            top_k_sstables(top_k, &mut top_tombstone_delete_sst, e);
                        }
                    };
                    let l0 = &compaction_group.l0;
                    // FIXME: why does chaining the level iterators lead to a segmentation fault?
                    for level in &l0.sub_levels {
                        visit_level(level);
                    }
                    for level in &compaction_group.levels {
                        visit_level(level);
                    }
                }
                version.levels.len()
            })
            .await;

        let _ = writeln!(s, "number of SSTables: {sst_num}");
        let _ = writeln!(s, "total size of SSTables (byte): {sst_total_file_size}");
        let _ = writeln!(s, "number of compaction groups: {compaction_group_num}");
        use comfy_table::{Row, Table};
        fn format_table(heap: BinaryHeap<Reverse<SstableSort>>) -> Table {
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("compaction group id".into());
                row.add_cell("sstable id".into());
                row.add_cell("delete ratio".into());
                row
            });
            for sst in &heap.into_sorted_vec() {
                let mut row = Row::new();
                row.add_cell(sst.0.compaction_group_id.into());
                row.add_cell(sst.0.sst_id.into());
                row.add_cell(format!("{:.2}%", sst.0.delete_ratio as f64 / 100.0).into());
                table.add_row(row);
            }
            table
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "top tombstone delete ratio");
        let _ = writeln!(s, "{}", format_table(top_tombstone_delete_sst));
        let _ = writeln!(s);

        let _ = writeln!(s);
        self.write_storage_prometheus(s).await;
    }

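    /// Queries Prometheus for streaming hot spots: the top sources and materialized views by
    /// throughput and the top join executors by matched rows.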
    #[cfg_attr(coverage, coverage(off))]
    async fn write_streaming_prometheus(&self, s: &mut String) {
        let _ = writeln!(s, "top sources by throughput (rows/s)");
        let query = format!(
            "topk(3, sum(rate(stream_source_output_rows_counts{{{}}}[10m]))by (source_name))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["source_name"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top materialized views by throughput (rows/s)");
        let query = format!(
            "topk(3, sum(rate(stream_mview_input_row_count{{{}}}[10m]))by (table_id))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top join executor by matched rows");
        let query = format!(
            "topk(3, histogram_quantile(0.9, sum(rate(stream_join_matched_join_keys_bucket{{{}}}[10m])) by (le, fragment_id, table_id)))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id", "fragment_id"])
            .await;
    }

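    /// Queries Prometheus for storage hot spots: slow Hummock gets and iterator inits, large
    /// commit flushes, and object store throughput, operation rate, and latency per
    /// job/instance.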
    #[cfg_attr(coverage, coverage(off))]
    async fn write_storage_prometheus(&self, s: &mut String) {
        let _ = writeln!(s, "top Hummock Get by duration (second)");
        let query = format!(
            "topk(3, histogram_quantile(0.9, sum(rate(state_store_get_duration_bucket{{{}}}[10m])) by (le, table_id)))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top Hummock Iter Init by duration (second)");
        let query = format!(
            "topk(3, histogram_quantile(0.9, sum(rate(state_store_iter_init_duration_bucket{{{}}}[10m])) by (le, table_id)))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top table commit flush by size (byte)");
        let query = format!(
            "topk(3, sum(rate(storage_commit_write_throughput{{{}}}[10m])) by (table_id))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store read throughput (bytes/s)");
        let query = format!(
            "sum(rate(object_store_read_bytes{{{}}}[10m])) by (job, instance)",
            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
        );
        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store write throughput (bytes/s)");
        let query = format!(
            "sum(rate(object_store_write_bytes{{{}}}[10m])) by (job, instance)",
            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
        );
        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store operation rate");
        let query = format!(
            "sum(rate(object_store_operation_latency_count{{{}}}[10m])) by (le, type, job, instance)",
            merge_prometheus_selector([
                &self.prometheus_selector,
                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read_read_bytes|streaming_read\""
            ])
        );
        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store operation duration (second)");
        let query = format!(
            "histogram_quantile(0.9, sum(rate(object_store_operation_latency_bucket{{{}}}[10m])) by (le, type, job, instance))",
            merge_prometheus_selector([
                &self.prometheus_selector,
                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read\""
            ])
        );
        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
            .await;
    }

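    /// Runs an instant-vector Prometheus query and writes one line per sample, labelled with the
    /// requested label values. Silently returns if no Prometheus client is configured or the
    /// query fails.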
    #[cfg_attr(coverage, coverage(off))]
    async fn write_instant_vector_impl(&self, s: &mut String, query: &str, labels: Vec<&str>) {
        let Some(ref client) = self.prometheus_client else {
            return;
        };
        if let Ok(Vector(instant_vec)) = client
            .query(query)
            .get()
            .await
            .map(|result| result.into_inner().0)
        {
            for i in instant_vec {
                let l = labels
                    .iter()
                    .map(|label| {
                        format!(
                            "{}={}",
                            *label,
                            i.metric()
                                .get(*label)
                                .map(|s| s.as_str())
                                .unwrap_or_default()
                        )
                    })
                    .join(",");
                let _ = writeln!(s, "{}: {:.3}", l, i.sample().value());
            }
        }
    }

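    /// Dumps await trees from across the cluster in the requested actor trace format; a failure
    /// to collect them is logged and leaves this report section empty.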
    #[cfg_attr(coverage, coverage(off))]
    async fn write_await_tree(&self, s: &mut String, actor_traces_format: ActorTracesFormat) {
        let all = dump_cluster_await_tree(
            &self.metadata_manager,
            &self.await_tree_reg,
            actor_traces_format,
        )
        .await;

        if let Ok(all) = all {
            write!(s, "{}", all.output()).unwrap();
        } else {
            tracing::warn!("failed to dump await tree");
        }
    }

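    /// Writes one table per catalog object type (source, table, materialized view, index, sink)
    /// with redacted SQL definitions, followed by a table of all actors mapped back to the job
    /// they belong to.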
    async fn write_table_definition(&self, s: &mut String) -> MetaResult<()> {
        let sources = self
            .metadata_manager
            .catalog_controller
            .list_sources()
            .await?
            .into_iter()
            .map(|s| (s.id, (s.name, s.schema_id, s.definition)))
            .collect::<BTreeMap<_, _>>();
        let tables = self
            .metadata_manager
            .catalog_controller
            .list_tables_by_type(TableType::Table)
            .await?
            .into_iter()
            .map(|t| (t.id, (t.name, t.schema_id, t.definition)))
            .collect::<BTreeMap<_, _>>();
        let mvs = self
            .metadata_manager
            .catalog_controller
            .list_tables_by_type(TableType::MaterializedView)
            .await?
            .into_iter()
            .map(|t| (t.id, (t.name, t.schema_id, t.definition)))
            .collect::<BTreeMap<_, _>>();
        let indexes = self
            .metadata_manager
            .catalog_controller
            .list_tables_by_type(TableType::Index)
            .await?
            .into_iter()
            .map(|t| (t.id, (t.name, t.schema_id, t.definition)))
            .collect::<BTreeMap<_, _>>();
        let sinks = self
            .metadata_manager
            .catalog_controller
            .list_sinks()
            .await?
            .into_iter()
            .map(|s| (s.id, (s.name, s.schema_id, s.definition)))
            .collect::<BTreeMap<_, _>>();
        let catalogs = [
            ("SOURCE", sources),
            ("TABLE", tables),
            ("MATERIALIZED VIEW", mvs),
            ("INDEX", indexes),
            ("SINK", sinks),
        ];
        let mut obj_id_to_name = HashMap::new();
        for (title, items) in catalogs {
            use comfy_table::{Row, Table};
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("id".into());
                row.add_cell("name".into());
                row.add_cell("schema_id".into());
                row.add_cell("definition".into());
                row
            });
            for (id, (name, schema_id, definition)) in items {
                obj_id_to_name.insert(id, name.clone());
                let mut row = Row::new();
                let may_redact = redact_sql(&definition, self.redact_sql_option_keywords.clone())
                    .unwrap_or_else(|| "[REDACTED]".into());
                row.add_cell(id.into());
                row.add_cell(name.into());
                row.add_cell(schema_id.into());
                row.add_cell(may_redact.into());
                table.add_row(row);
            }
            let _ = writeln!(s);
            let _ = writeln!(s, "{title}");
            let _ = writeln!(s, "{table}");
        }

        let actors = self
            .metadata_manager
            .catalog_controller
            .list_actor_info()
            .await?
            .into_iter()
            .map(|(actor_id, fragment_id, job_id, schema_id, obj_type)| {
                (
                    actor_id,
                    (
                        fragment_id,
                        job_id,
                        schema_id,
                        obj_type,
                        obj_id_to_name
                            .get(&(job_id as _))
                            .cloned()
                            .unwrap_or_default(),
                    ),
                )
            })
            .collect::<BTreeMap<_, _>>();

        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("fragment_id".into());
            row.add_cell("job_id".into());
            row.add_cell("schema_id".into());
            row.add_cell("type".into());
            row.add_cell("name".into());
            row
        });
        for (actor_id, (fragment_id, job_id, schema_id, ddl_type, name)) in actors {
            let mut row = Row::new();
            row.add_cell(actor_id.into());
            row.add_cell(fragment_id.into());
            row.add_cell(job_id.into());
            row.add_cell(schema_id.into());
            row.add_cell(ddl_type.as_str().into());
            row.add_cell(name.into());
            table.add_row(row);
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "ACTOR");
        let _ = writeln!(s, "{table}");
        Ok(())
    }
}

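/// Adds `t` to the row if present, or an empty cell otherwise, so missing values keep the table
/// columns aligned.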
#[cfg_attr(coverage, coverage(off))]
fn try_add_cell<T: Into<comfy_table::Cell>>(row: &mut comfy_table::Row, t: Option<T>) {
    match t {
        Some(t) => {
            row.add_cell(t.into());
        }
        None => {
            row.add_cell("".into());
        }
    }
}

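/// Joins non-empty Prometheus label selectors with commas so they can be spliced into a single
/// `{...}` matcher.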
#[cfg_attr(coverage, coverage(off))]
fn merge_prometheus_selector<'a>(selectors: impl IntoIterator<Item = &'a str>) -> String {
    selectors.into_iter().filter(|s| !s.is_empty()).join(",")
}

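/// Re-renders the SQL with sensitive option values redacted, or returns `None` if it cannot be
/// parsed (in which case the caller falls back to "[REDACTED]").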
fn redact_sql(sql: &str, keywords: RedactSqlOptionKeywordsRef) -> Option<String> {
    match Parser::parse_sql(sql) {
        Ok(sqls) => Some(
            sqls.into_iter()
                .map(|sql| sql.to_redacted_string(keywords.clone()))
                .join(";"),
        ),
        Err(_) => None,
    }
}