risingwave_meta/manager/
diagnose.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::{Ordering, Reverse};
16use std::collections::{BTreeMap, BinaryHeap, HashMap};
17use std::fmt::Write;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use prometheus_http_query::response::Data::Vector;
22use risingwave_common::types::Timestamptz;
23use risingwave_common::util::StackTraceResponseExt;
24use risingwave_hummock_sdk::HummockSstableId;
25use risingwave_hummock_sdk::level::Level;
26use risingwave_meta_model::table::TableType;
27use risingwave_pb::common::WorkerType;
28use risingwave_pb::meta::EventLog;
29use risingwave_pb::meta::event_log::Event;
30use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
31use risingwave_sqlparser::ast::RedactSqlOptionKeywordsRef;
32use risingwave_sqlparser::parser::Parser;
33use serde_json::json;
34use thiserror_ext::AsReport;
35
36use crate::MetaResult;
37use crate::hummock::HummockManagerRef;
38use crate::manager::MetadataManager;
39use crate::manager::event_log::EventLogManagerRef;
40use crate::rpc::await_tree::dump_cluster_await_tree;
41
42pub type DiagnoseCommandRef = Arc<DiagnoseCommand>;
43
44pub struct DiagnoseCommand {
45    metadata_manager: MetadataManager,
46    await_tree_reg: await_tree::Registry,
47    hummock_manger: HummockManagerRef,
48    event_log_manager: EventLogManagerRef,
49    prometheus_client: Option<prometheus_http_query::Client>,
50    prometheus_selector: String,
51    redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
52}
53
54impl DiagnoseCommand {
55    pub fn new(
56        metadata_manager: MetadataManager,
57        await_tree_reg: await_tree::Registry,
58        hummock_manger: HummockManagerRef,
59        event_log_manager: EventLogManagerRef,
60        prometheus_client: Option<prometheus_http_query::Client>,
61        prometheus_selector: String,
62        redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
63    ) -> Self {
64        Self {
65            metadata_manager,
66            await_tree_reg,
67            hummock_manger,
68            event_log_manager,
69            prometheus_client,
70            prometheus_selector,
71            redact_sql_option_keywords,
72        }
73    }
74
75    pub async fn report(&self, actor_traces_format: ActorTracesFormat) -> String {
76        let mut report = String::new();
77        let _ = writeln!(
78            report,
79            "report created at: {}\nversion: {}",
80            chrono::DateTime::<chrono::offset::Utc>::from(std::time::SystemTime::now()),
81            risingwave_common::current_cluster_version(),
82        );
83        let _ = writeln!(report);
84        self.write_catalog(&mut report).await;
85        let _ = writeln!(report);
86        self.write_worker_nodes(&mut report).await;
87        let _ = writeln!(report);
88        self.write_streaming_prometheus(&mut report).await;
89        let _ = writeln!(report);
90        self.write_storage(&mut report).await;
91        let _ = writeln!(report);
92        self.write_await_tree(&mut report, actor_traces_format)
93            .await;
94        let _ = writeln!(report);
95        self.write_event_logs(&mut report);
96        report
97    }
98
99    async fn write_catalog(&self, s: &mut String) {
100        self.write_catalog_inner(s).await;
101        let _ = self.write_table_definition(s).await.inspect_err(|e| {
102            tracing::warn!(
103                error = e.to_report_string(),
104                "failed to display table definition"
105            )
106        });
107    }
108
109    async fn write_catalog_inner(&self, s: &mut String) {
110        let guard = self
111            .metadata_manager
112            .catalog_controller
113            .get_inner_read_guard()
114            .await;
115        let stat = match guard.stats().await {
116            Ok(stat) => stat,
117            Err(err) => {
118                tracing::warn!(error=?err.as_report(), "failed to get catalog stats");
119                return;
120            }
121        };
122        let _ = writeln!(s, "number of fragment: {}", stat.streaming_job_num);
123        let _ = writeln!(s, "number of actor: {}", stat.actor_num);
124        let _ = writeln!(s, "number of source: {}", stat.source_num);
125        let _ = writeln!(s, "number of table: {}", stat.table_num);
126        let _ = writeln!(s, "number of materialized view: {}", stat.mview_num);
127        let _ = writeln!(s, "number of sink: {}", stat.sink_num);
128        let _ = writeln!(s, "number of index: {}", stat.index_num);
129        let _ = writeln!(s, "number of function: {}", stat.function_num);
130    }
131
132    async fn write_worker_nodes(&self, s: &mut String) {
133        let Ok(worker_actor_count) = self.metadata_manager.worker_actor_count().await else {
134            tracing::warn!("failed to get worker actor count");
135            return;
136        };
137
138        use comfy_table::{Row, Table};
139        let Ok(worker_nodes) = self.metadata_manager.list_worker_node(None, None).await else {
140            tracing::warn!("failed to get worker nodes");
141            return;
142        };
143        let mut table = Table::new();
144        table.set_header({
145            let mut row = Row::new();
146            row.add_cell("id".into());
147            row.add_cell("host".into());
148            row.add_cell("type".into());
149            row.add_cell("state".into());
150            row.add_cell("parallelism".into());
151            row.add_cell("is_streaming".into());
152            row.add_cell("is_serving".into());
153            row.add_cell("rw_version".into());
154            row.add_cell("total_memory_bytes".into());
155            row.add_cell("total_cpu_cores".into());
156            row.add_cell("started_at".into());
157            row.add_cell("actor_count".into());
158            row
159        });
160        for worker_node in worker_nodes {
161            let mut row = Row::new();
162            row.add_cell(worker_node.id.into());
163            try_add_cell(
164                &mut row,
165                worker_node
166                    .host
167                    .as_ref()
168                    .map(|h| format!("{}:{}", h.host, h.port)),
169            );
170            try_add_cell(
171                &mut row,
172                worker_node.get_type().ok().map(|t| t.as_str_name()),
173            );
174            try_add_cell(
175                &mut row,
176                worker_node.get_state().ok().map(|s| s.as_str_name()),
177            );
178            try_add_cell(&mut row, worker_node.parallelism());
179            try_add_cell(
180                &mut row,
181                worker_node.property.as_ref().map(|p| p.is_streaming),
182            );
183            try_add_cell(
184                &mut row,
185                worker_node.property.as_ref().map(|p| p.is_serving),
186            );
187            try_add_cell(
188                &mut row,
189                worker_node.resource.as_ref().map(|r| r.rw_version.clone()),
190            );
191            try_add_cell(
192                &mut row,
193                worker_node.resource.as_ref().map(|r| r.total_memory_bytes),
194            );
195            try_add_cell(
196                &mut row,
197                worker_node.resource.as_ref().map(|r| r.total_cpu_cores),
198            );
199            try_add_cell(
200                &mut row,
201                worker_node
202                    .started_at
203                    .and_then(|ts| Timestamptz::from_secs(ts as _).map(|t| t.to_string())),
204            );
205            let actor_count = {
206                if let Ok(t) = worker_node.get_type()
207                    && t != WorkerType::ComputeNode
208                {
209                    None
210                } else {
211                    match worker_actor_count.get(&(worker_node.id as _)) {
212                        None => Some(0),
213                        Some(c) => Some(*c),
214                    }
215                }
216            };
217            try_add_cell(&mut row, actor_count);
218            table.add_row(row);
219        }
220        let _ = writeln!(s, "{table}");
221    }
222
223    fn write_event_logs(&self, s: &mut String) {
224        let event_logs = self
225            .event_log_manager
226            .list_event_logs()
227            .into_iter()
228            .sorted_by(|a, b| {
229                a.timestamp
230                    .unwrap_or(0)
231                    .cmp(&b.timestamp.unwrap_or(0))
232                    .reverse()
233            })
234            .collect_vec();
235
236        let _ = writeln!(s, "latest barrier completions");
237        Self::write_event_logs_impl(
238            s,
239            event_logs.iter(),
240            |e| {
241                let Event::BarrierComplete(info) = e else {
242                    return None;
243                };
244                Some(json!(info).to_string())
245            },
246            Some(10),
247        );
248
249        let _ = writeln!(s);
250        let _ = writeln!(s, "latest barrier collection failures");
251        Self::write_event_logs_impl(
252            s,
253            event_logs.iter(),
254            |e| {
255                let Event::CollectBarrierFail(info) = e else {
256                    return None;
257                };
258                Some(json!(info).to_string())
259            },
260            Some(3),
261        );
262
263        let _ = writeln!(s);
264        let _ = writeln!(s, "latest barrier injection failures");
265        Self::write_event_logs_impl(
266            s,
267            event_logs.iter(),
268            |e| {
269                let Event::InjectBarrierFail(info) = e else {
270                    return None;
271                };
272                Some(json!(info).to_string())
273            },
274            Some(3),
275        );
276
277        let _ = writeln!(s);
278        let _ = writeln!(s, "latest worker node panics");
279        Self::write_event_logs_impl(
280            s,
281            event_logs.iter(),
282            |e| {
283                let Event::WorkerNodePanic(info) = e else {
284                    return None;
285                };
286                Some(json!(info).to_string())
287            },
288            Some(10),
289        );
290
291        let _ = writeln!(s);
292        let _ = writeln!(s, "latest create stream job failures");
293        Self::write_event_logs_impl(
294            s,
295            event_logs.iter(),
296            |e| {
297                let Event::CreateStreamJobFail(info) = e else {
298                    return None;
299                };
300                Some(json!(info).to_string())
301            },
302            Some(3),
303        );
304
305        let _ = writeln!(s);
306        let _ = writeln!(s, "latest dirty stream job clear-ups");
307        Self::write_event_logs_impl(
308            s,
309            event_logs.iter(),
310            |e| {
311                let Event::DirtyStreamJobClear(info) = e else {
312                    return None;
313                };
314                Some(json!(info).to_string())
315            },
316            Some(3),
317        );
318    }
319
320    fn write_event_logs_impl<'a, F>(
321        s: &mut String,
322        event_logs: impl Iterator<Item = &'a EventLog>,
323        get_event_info: F,
324        limit: Option<usize>,
325    ) where
326        F: Fn(&Event) -> Option<String>,
327    {
328        use comfy_table::{Row, Table};
329        let mut table = Table::new();
330        table.set_header({
331            let mut row = Row::new();
332            row.add_cell("created_at".into());
333            row.add_cell("info".into());
334            row
335        });
336        let mut row_count = 0;
337        for event_log in event_logs {
338            let Some(ref inner) = event_log.event else {
339                continue;
340            };
341            if let Some(limit) = limit
342                && row_count >= limit
343            {
344                break;
345            }
346            let mut row = Row::new();
347            let ts = event_log
348                .timestamp
349                .and_then(|ts| Timestamptz::from_millis(ts as _).map(|ts| ts.to_string()));
350            try_add_cell(&mut row, ts);
351            if let Some(info) = get_event_info(inner) {
352                row.add_cell(info.into());
353                row_count += 1;
354            } else {
355                continue;
356            }
357            table.add_row(row);
358        }
359        let _ = writeln!(s, "{table}");
360    }
361
362    async fn write_storage(&self, s: &mut String) {
363        let mut sst_num = 0;
364        let mut sst_total_file_size = 0;
365        let back_pressured_compaction_groups = self
366            .hummock_manger
367            .write_limits()
368            .await
369            .into_iter()
370            .filter_map(|(k, v)| {
371                if v.table_ids.is_empty() {
372                    None
373                } else {
374                    Some(k)
375                }
376            })
377            .join(",");
378        if !back_pressured_compaction_groups.is_empty() {
379            let _ = writeln!(
380                s,
381                "back pressured compaction groups: {back_pressured_compaction_groups}"
382            );
383        }
384
385        #[derive(PartialEq, Eq)]
386        struct SstableSort {
387            compaction_group_id: u64,
388            sst_id: HummockSstableId,
389            delete_ratio: u64,
390        }
391        impl PartialOrd for SstableSort {
392            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
393                Some(self.cmp(other))
394            }
395        }
396        impl Ord for SstableSort {
397            fn cmp(&self, other: &Self) -> Ordering {
398                self.delete_ratio.cmp(&other.delete_ratio)
399            }
400        }
401        fn top_k_sstables(
402            top_k: usize,
403            heap: &mut BinaryHeap<Reverse<SstableSort>>,
404            e: SstableSort,
405        ) {
406            if heap.len() < top_k {
407                heap.push(Reverse(e));
408            } else if let Some(mut p) = heap.peek_mut()
409                && e.delete_ratio > p.0.delete_ratio
410            {
411                *p = Reverse(e);
412            }
413        }
414
415        let top_k = 10;
416        let mut top_tombstone_delete_sst = BinaryHeap::with_capacity(top_k);
417        let compaction_group_num = self
418            .hummock_manger
419            .on_current_version(|version| {
420                for compaction_group in version.levels.values() {
421                    let mut visit_level = |level: &Level| {
422                        sst_num += level.table_infos.len();
423                        sst_total_file_size +=
424                            level.table_infos.iter().map(|t| t.sst_size).sum::<u64>();
425                        for sst in &level.table_infos {
426                            if sst.total_key_count == 0 {
427                                continue;
428                            }
429                            let tombstone_delete_ratio =
430                                sst.stale_key_count * 10000 / sst.total_key_count;
431                            let e = SstableSort {
432                                compaction_group_id: compaction_group.group_id,
433                                sst_id: sst.sst_id,
434                                delete_ratio: tombstone_delete_ratio,
435                            };
436                            top_k_sstables(top_k, &mut top_tombstone_delete_sst, e);
437                        }
438                    };
439                    let l0 = &compaction_group.l0;
440                    // FIXME: why chaining levels iter leads to segmentation fault?
441                    for level in &l0.sub_levels {
442                        visit_level(level);
443                    }
444                    for level in &compaction_group.levels {
445                        visit_level(level);
446                    }
447                }
448                version.levels.len()
449            })
450            .await;
451
452        let _ = writeln!(s, "number of SSTables: {sst_num}");
453        let _ = writeln!(s, "total size of SSTables (byte): {sst_total_file_size}");
454        let _ = writeln!(s, "number of compaction groups: {compaction_group_num}");
455        use comfy_table::{Row, Table};
456        fn format_table(heap: BinaryHeap<Reverse<SstableSort>>) -> Table {
457            let mut table = Table::new();
458            table.set_header({
459                let mut row = Row::new();
460                row.add_cell("compaction group id".into());
461                row.add_cell("sstable id".into());
462                row.add_cell("delete ratio".into());
463                row
464            });
465            for sst in &heap.into_sorted_vec() {
466                let mut row = Row::new();
467                row.add_cell(sst.0.compaction_group_id.into());
468                row.add_cell(sst.0.sst_id.into());
469                row.add_cell(format!("{:.2}%", sst.0.delete_ratio as f64 / 100.0).into());
470                table.add_row(row);
471            }
472            table
473        }
474        let _ = writeln!(s);
475        let _ = writeln!(s, "top tombstone delete ratio");
476        let _ = writeln!(s, "{}", format_table(top_tombstone_delete_sst));
477        let _ = writeln!(s);
478
479        let _ = writeln!(s);
480        self.write_storage_prometheus(s).await;
481    }
482
483    async fn write_streaming_prometheus(&self, s: &mut String) {
484        let _ = writeln!(s, "top sources by throughput (rows/s)");
485        let query = format!(
486            "topk(3, sum(rate(stream_source_output_rows_counts{{{}}}[10m]))by (source_name))",
487            self.prometheus_selector
488        );
489        self.write_instant_vector_impl(s, &query, vec!["source_name"])
490            .await;
491
492        let _ = writeln!(s);
493        let _ = writeln!(s, "top materialized views by throughput (rows/s)");
494        let query = format!(
495            "topk(3, sum(rate(stream_mview_input_row_count{{{}}}[10m]))by (table_id))",
496            self.prometheus_selector
497        );
498        self.write_instant_vector_impl(s, &query, vec!["table_id"])
499            .await;
500
501        let _ = writeln!(s);
502        let _ = writeln!(s, "top join executor by matched rows");
503        let query = format!(
504            "topk(3, histogram_quantile(0.9, sum(rate(stream_join_matched_join_keys_bucket{{{}}}[10m])) by (le, fragment_id, table_id)))",
505            self.prometheus_selector
506        );
507        self.write_instant_vector_impl(s, &query, vec!["table_id", "fragment_id"])
508            .await;
509    }
510
511    async fn write_storage_prometheus(&self, s: &mut String) {
512        let _ = writeln!(s, "top Hummock Get by duration (second)");
513        let query = format!(
514            "topk(3, histogram_quantile(0.9, sum(rate(state_store_get_duration_bucket{{{}}}[10m])) by (le, table_id)))",
515            self.prometheus_selector
516        );
517        self.write_instant_vector_impl(s, &query, vec!["table_id"])
518            .await;
519
520        let _ = writeln!(s);
521        let _ = writeln!(s, "top Hummock Iter Init by duration (second)");
522        let query = format!(
523            "topk(3, histogram_quantile(0.9, sum(rate(state_store_iter_init_duration_bucket{{{}}}[10m])) by (le, table_id)))",
524            self.prometheus_selector
525        );
526        self.write_instant_vector_impl(s, &query, vec!["table_id"])
527            .await;
528
529        let _ = writeln!(s);
530        let _ = writeln!(s, "top table commit flush by size (byte)");
531        let query = format!(
532            "topk(3, sum(rate(storage_commit_write_throughput{{{}}}[10m])) by (table_id))",
533            self.prometheus_selector
534        );
535        self.write_instant_vector_impl(s, &query, vec!["table_id"])
536            .await;
537
538        let _ = writeln!(s);
539        let _ = writeln!(s, "object store read throughput (bytes/s)");
540        let query = format!(
541            "sum(rate(object_store_read_bytes{{{}}}[10m])) by (job, instance)",
542            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
543        );
544        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
545            .await;
546
547        let _ = writeln!(s);
548        let _ = writeln!(s, "object store write throughput (bytes/s)");
549        let query = format!(
550            "sum(rate(object_store_write_bytes{{{}}}[10m])) by (job, instance)",
551            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
552        );
553        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
554            .await;
555
556        let _ = writeln!(s);
557        let _ = writeln!(s, "object store operation rate");
558        let query = format!(
559            "sum(rate(object_store_operation_latency_count{{{}}}[10m])) by (le, type, job, instance)",
560            merge_prometheus_selector([
561                &self.prometheus_selector,
562                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read_read_bytes|streaming_read\""
563            ])
564        );
565        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
566            .await;
567
568        let _ = writeln!(s);
569        let _ = writeln!(s, "object store operation duration (second)");
570        let query = format!(
571            "histogram_quantile(0.9, sum(rate(object_store_operation_latency_bucket{{{}}}[10m])) by (le, type, job, instance))",
572            merge_prometheus_selector([
573                &self.prometheus_selector,
574                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read\""
575            ])
576        );
577        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
578            .await;
579    }
580
581    async fn write_instant_vector_impl(&self, s: &mut String, query: &str, labels: Vec<&str>) {
582        let Some(ref client) = self.prometheus_client else {
583            return;
584        };
585        if let Ok(Vector(instant_vec)) = client
586            .query(query)
587            .get()
588            .await
589            .map(|result| result.into_inner().0)
590        {
591            for i in instant_vec {
592                let l = labels
593                    .iter()
594                    .map(|label| {
595                        format!(
596                            "{}={}",
597                            *label,
598                            i.metric()
599                                .get(*label)
600                                .map(|s| s.as_str())
601                                .unwrap_or_default()
602                        )
603                    })
604                    .join(",");
605                let _ = writeln!(s, "{}: {:.3}", l, i.sample().value());
606            }
607        }
608    }
609
610    async fn write_await_tree(&self, s: &mut String, actor_traces_format: ActorTracesFormat) {
611        let all = dump_cluster_await_tree(
612            &self.metadata_manager,
613            &self.await_tree_reg,
614            actor_traces_format,
615        )
616        .await;
617
618        if let Ok(all) = all {
619            write!(s, "{}", all.output()).unwrap();
620        } else {
621            tracing::warn!("failed to dump await tree");
622        }
623    }
624
625    async fn write_table_definition(&self, s: &mut String) -> MetaResult<()> {
626        let sources = self
627            .metadata_manager
628            .catalog_controller
629            .list_sources()
630            .await?
631            .into_iter()
632            .map(|s| (s.id, (s.name, s.schema_id, s.definition)))
633            .collect::<BTreeMap<_, _>>();
634        let tables = self
635            .metadata_manager
636            .catalog_controller
637            .list_tables_by_type(TableType::Table)
638            .await?
639            .into_iter()
640            .map(|t| (t.id, (t.name, t.schema_id, t.definition)))
641            .collect::<BTreeMap<_, _>>();
642        let mvs = self
643            .metadata_manager
644            .catalog_controller
645            .list_tables_by_type(TableType::MaterializedView)
646            .await?
647            .into_iter()
648            .map(|t| (t.id, (t.name, t.schema_id, t.definition)))
649            .collect::<BTreeMap<_, _>>();
650        let indexes = self
651            .metadata_manager
652            .catalog_controller
653            .list_tables_by_type(TableType::Index)
654            .await?
655            .into_iter()
656            .map(|t| (t.id, (t.name, t.schema_id, t.definition)))
657            .collect::<BTreeMap<_, _>>();
658        let sinks = self
659            .metadata_manager
660            .catalog_controller
661            .list_sinks()
662            .await?
663            .into_iter()
664            .map(|s| (s.id, (s.name, s.schema_id, s.definition)))
665            .collect::<BTreeMap<_, _>>();
666        let catalogs = [
667            ("SOURCE", sources),
668            ("TABLE", tables),
669            ("MATERIALIZED VIEW", mvs),
670            ("INDEX", indexes),
671            ("SINK", sinks),
672        ];
673        let mut obj_id_to_name = HashMap::new();
674        for (title, items) in catalogs {
675            use comfy_table::{Row, Table};
676            let mut table = Table::new();
677            table.set_header({
678                let mut row = Row::new();
679                row.add_cell("id".into());
680                row.add_cell("name".into());
681                row.add_cell("schema_id".into());
682                row.add_cell("definition".into());
683                row
684            });
685            for (id, (name, schema_id, definition)) in items {
686                obj_id_to_name.insert(id, name.clone());
687                let mut row = Row::new();
688                let may_redact = redact_sql(&definition, self.redact_sql_option_keywords.clone())
689                    .unwrap_or_else(|| "[REDACTED]".into());
690                row.add_cell(id.into());
691                row.add_cell(name.into());
692                row.add_cell(schema_id.into());
693                row.add_cell(may_redact.into());
694                table.add_row(row);
695            }
696            let _ = writeln!(s);
697            let _ = writeln!(s, "{title}");
698            let _ = writeln!(s, "{table}");
699        }
700
701        let actors = self
702            .metadata_manager
703            .catalog_controller
704            .list_actor_info()
705            .await?
706            .into_iter()
707            .map(|(actor_id, fragment_id, job_id, schema_id, obj_type)| {
708                (
709                    actor_id,
710                    (
711                        fragment_id,
712                        job_id,
713                        schema_id,
714                        obj_type,
715                        obj_id_to_name
716                            .get(&(job_id as _))
717                            .cloned()
718                            .unwrap_or_default(),
719                    ),
720                )
721            })
722            .collect::<BTreeMap<_, _>>();
723
724        use comfy_table::{Row, Table};
725        let mut table = Table::new();
726        table.set_header({
727            let mut row = Row::new();
728            row.add_cell("id".into());
729            row.add_cell("fragment_id".into());
730            row.add_cell("job_id".into());
731            row.add_cell("schema_id".into());
732            row.add_cell("type".into());
733            row.add_cell("name".into());
734            row
735        });
736        for (actor_id, (fragment_id, job_id, schema_id, ddl_type, name)) in actors {
737            let mut row = Row::new();
738            row.add_cell(actor_id.into());
739            row.add_cell(fragment_id.into());
740            row.add_cell(job_id.into());
741            row.add_cell(schema_id.into());
742            row.add_cell(ddl_type.as_str().into());
743            row.add_cell(name.into());
744            table.add_row(row);
745        }
746        let _ = writeln!(s);
747        let _ = writeln!(s, "ACTOR");
748        let _ = writeln!(s, "{table}");
749        Ok(())
750    }
751}
752
753fn try_add_cell<T: Into<comfy_table::Cell>>(row: &mut comfy_table::Row, t: Option<T>) {
754    match t {
755        Some(t) => {
756            row.add_cell(t.into());
757        }
758        None => {
759            row.add_cell("".into());
760        }
761    }
762}
763
764fn merge_prometheus_selector<'a>(selectors: impl IntoIterator<Item = &'a str>) -> String {
765    selectors.into_iter().filter(|s| !s.is_empty()).join(",")
766}
767
768fn redact_sql(sql: &str, keywords: RedactSqlOptionKeywordsRef) -> Option<String> {
769    match Parser::parse_sql(sql) {
770        Ok(sqls) => Some(
771            sqls.into_iter()
772                .map(|sql| sql.to_redacted_string(keywords.clone()))
773                .join(";"),
774        ),
775        Err(_) => None,
776    }
777}