risingwave_meta/manager/
diagnose.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::{Ordering, Reverse};
16use std::collections::{BTreeMap, BinaryHeap, HashMap};
17use std::fmt::Write;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use prometheus_http_query::response::Data::Vector;
22use risingwave_common::types::Timestamptz;
23use risingwave_common::util::StackTraceResponseExt;
24use risingwave_hummock_sdk::level::Level;
25use risingwave_meta_model::table::TableType;
26use risingwave_pb::common::WorkerType;
27use risingwave_pb::meta::EventLog;
28use risingwave_pb::meta::event_log::Event;
29use risingwave_pb::monitor_service::StackTraceResponse;
30use risingwave_rpc_client::ComputeClientPool;
31use risingwave_sqlparser::ast::{CompatibleFormatEncode, Statement, Value};
32use risingwave_sqlparser::parser::Parser;
33use serde_json::json;
34use thiserror_ext::AsReport;
35
36use crate::MetaResult;
37use crate::hummock::HummockManagerRef;
38use crate::manager::MetadataManager;
39use crate::manager::event_log::EventLogManagerRef;
40
41pub type DiagnoseCommandRef = Arc<DiagnoseCommand>;
42
43pub struct DiagnoseCommand {
44    metadata_manager: MetadataManager,
45    hummock_manger: HummockManagerRef,
46    event_log_manager: EventLogManagerRef,
47    prometheus_client: Option<prometheus_http_query::Client>,
48    prometheus_selector: String,
49}
50
51impl DiagnoseCommand {
52    pub fn new(
53        metadata_manager: MetadataManager,
54        hummock_manger: HummockManagerRef,
55        event_log_manager: EventLogManagerRef,
56        prometheus_client: Option<prometheus_http_query::Client>,
57        prometheus_selector: String,
58    ) -> Self {
59        Self {
60            metadata_manager,
61            hummock_manger,
62            event_log_manager,
63            prometheus_client,
64            prometheus_selector,
65        }
66    }
67
68    #[cfg_attr(coverage, coverage(off))]
69    pub async fn report(&self) -> String {
70        let mut report = String::new();
71        let _ = writeln!(
72            report,
73            "report created at: {}\nversion: {}",
74            chrono::DateTime::<chrono::offset::Utc>::from(std::time::SystemTime::now()),
75            risingwave_common::current_cluster_version(),
76        );
77        let _ = writeln!(report);
78        self.write_catalog(&mut report).await;
79        let _ = writeln!(report);
80        self.write_worker_nodes(&mut report).await;
81        let _ = writeln!(report);
82        self.write_streaming_prometheus(&mut report).await;
83        let _ = writeln!(report);
84        self.write_storage(&mut report).await;
85        let _ = writeln!(report);
86        self.write_await_tree(&mut report).await;
87        let _ = writeln!(report);
88        self.write_event_logs(&mut report);
89        report
90    }
91
92    #[cfg_attr(coverage, coverage(off))]
93    async fn write_catalog(&self, s: &mut String) {
94        self.write_catalog_inner(s).await;
95        let _ = self.write_table_definition(s).await.inspect_err(|e| {
96            tracing::warn!(
97                error = e.to_report_string(),
98                "failed to display table definition"
99            )
100        });
101    }
102
103    #[cfg_attr(coverage, coverage(off))]
104    async fn write_catalog_inner(&self, s: &mut String) {
105        let guard = self
106            .metadata_manager
107            .catalog_controller
108            .get_inner_read_guard()
109            .await;
110        let stat = match guard.stats().await {
111            Ok(stat) => stat,
112            Err(err) => {
113                tracing::warn!(error=?err.as_report(), "failed to get catalog stats");
114                return;
115            }
116        };
117        let _ = writeln!(s, "number of fragment: {}", stat.streaming_job_num);
118        let _ = writeln!(s, "number of actor: {}", stat.actor_num);
119        let _ = writeln!(s, "number of source: {}", stat.source_num);
120        let _ = writeln!(s, "number of table: {}", stat.table_num);
121        let _ = writeln!(s, "number of materialized view: {}", stat.mview_num);
122        let _ = writeln!(s, "number of sink: {}", stat.sink_num);
123        let _ = writeln!(s, "number of index: {}", stat.index_num);
124        let _ = writeln!(s, "number of function: {}", stat.function_num);
125    }
126
127    #[cfg_attr(coverage, coverage(off))]
128    async fn write_worker_nodes(&self, s: &mut String) {
129        let Ok(worker_actor_count) = self.metadata_manager.worker_actor_count().await else {
130            tracing::warn!("failed to get worker actor count");
131            return;
132        };
133
134        use comfy_table::{Row, Table};
135        let Ok(worker_nodes) = self.metadata_manager.list_worker_node(None, None).await else {
136            tracing::warn!("failed to get worker nodes");
137            return;
138        };
139        let mut table = Table::new();
140        table.set_header({
141            let mut row = Row::new();
142            row.add_cell("id".into());
143            row.add_cell("host".into());
144            row.add_cell("type".into());
145            row.add_cell("state".into());
146            row.add_cell("parallelism".into());
147            row.add_cell("is_streaming".into());
148            row.add_cell("is_serving".into());
149            row.add_cell("rw_version".into());
150            row.add_cell("total_memory_bytes".into());
151            row.add_cell("total_cpu_cores".into());
152            row.add_cell("started_at".into());
153            row.add_cell("actor_count".into());
154            row
155        });
156        for worker_node in worker_nodes {
157            let mut row = Row::new();
158            row.add_cell(worker_node.id.into());
159            try_add_cell(
160                &mut row,
161                worker_node
162                    .host
163                    .as_ref()
164                    .map(|h| format!("{}:{}", h.host, h.port)),
165            );
166            try_add_cell(
167                &mut row,
168                worker_node.get_type().ok().map(|t| t.as_str_name()),
169            );
170            try_add_cell(
171                &mut row,
172                worker_node.get_state().ok().map(|s| s.as_str_name()),
173            );
174            try_add_cell(&mut row, worker_node.parallelism());
175            try_add_cell(
176                &mut row,
177                worker_node.property.as_ref().map(|p| p.is_streaming),
178            );
179            try_add_cell(
180                &mut row,
181                worker_node.property.as_ref().map(|p| p.is_serving),
182            );
183            try_add_cell(
184                &mut row,
185                worker_node.resource.as_ref().map(|r| r.rw_version.clone()),
186            );
187            try_add_cell(
188                &mut row,
189                worker_node.resource.as_ref().map(|r| r.total_memory_bytes),
190            );
191            try_add_cell(
192                &mut row,
193                worker_node.resource.as_ref().map(|r| r.total_cpu_cores),
194            );
195            try_add_cell(
196                &mut row,
197                worker_node
198                    .started_at
199                    .and_then(|ts| Timestamptz::from_secs(ts as _).map(|t| t.to_string())),
200            );
201            let actor_count = {
202                if let Ok(t) = worker_node.get_type()
203                    && t != WorkerType::ComputeNode
204                {
205                    None
206                } else {
207                    match worker_actor_count.get(&(worker_node.id as _)) {
208                        None => Some(0),
209                        Some(c) => Some(*c),
210                    }
211                }
212            };
213            try_add_cell(&mut row, actor_count);
214            table.add_row(row);
215        }
216        let _ = writeln!(s, "{table}");
217    }
218
219    #[cfg_attr(coverage, coverage(off))]
220    fn write_event_logs(&self, s: &mut String) {
221        let event_logs = self
222            .event_log_manager
223            .list_event_logs()
224            .into_iter()
225            .sorted_by(|a, b| {
226                a.timestamp
227                    .unwrap_or(0)
228                    .cmp(&b.timestamp.unwrap_or(0))
229                    .reverse()
230            })
231            .collect_vec();
232
233        let _ = writeln!(s, "latest barrier completions");
234        Self::write_event_logs_impl(
235            s,
236            event_logs.iter(),
237            |e| {
238                let Event::BarrierComplete(info) = e else {
239                    return None;
240                };
241                Some(json!(info).to_string())
242            },
243            Some(10),
244        );
245
246        let _ = writeln!(s);
247        let _ = writeln!(s, "latest barrier collection failures");
248        Self::write_event_logs_impl(
249            s,
250            event_logs.iter(),
251            |e| {
252                let Event::CollectBarrierFail(info) = e else {
253                    return None;
254                };
255                Some(json!(info).to_string())
256            },
257            Some(3),
258        );
259
260        let _ = writeln!(s);
261        let _ = writeln!(s, "latest barrier injection failures");
262        Self::write_event_logs_impl(
263            s,
264            event_logs.iter(),
265            |e| {
266                let Event::InjectBarrierFail(info) = e else {
267                    return None;
268                };
269                Some(json!(info).to_string())
270            },
271            Some(3),
272        );
273
274        let _ = writeln!(s);
275        let _ = writeln!(s, "latest worker node panics");
276        Self::write_event_logs_impl(
277            s,
278            event_logs.iter(),
279            |e| {
280                let Event::WorkerNodePanic(info) = e else {
281                    return None;
282                };
283                Some(json!(info).to_string())
284            },
285            Some(10),
286        );
287
288        let _ = writeln!(s);
289        let _ = writeln!(s, "latest create stream job failures");
290        Self::write_event_logs_impl(
291            s,
292            event_logs.iter(),
293            |e| {
294                let Event::CreateStreamJobFail(info) = e else {
295                    return None;
296                };
297                Some(json!(info).to_string())
298            },
299            Some(3),
300        );
301
302        let _ = writeln!(s);
303        let _ = writeln!(s, "latest dirty stream job clear-ups");
304        Self::write_event_logs_impl(
305            s,
306            event_logs.iter(),
307            |e| {
308                let Event::DirtyStreamJobClear(info) = e else {
309                    return None;
310                };
311                Some(json!(info).to_string())
312            },
313            Some(3),
314        );
315    }
316
317    #[cfg_attr(coverage, coverage(off))]
318    fn write_event_logs_impl<'a, F>(
319        s: &mut String,
320        event_logs: impl Iterator<Item = &'a EventLog>,
321        get_event_info: F,
322        limit: Option<usize>,
323    ) where
324        F: Fn(&Event) -> Option<String>,
325    {
326        use comfy_table::{Row, Table};
327        let mut table = Table::new();
328        table.set_header({
329            let mut row = Row::new();
330            row.add_cell("created_at".into());
331            row.add_cell("info".into());
332            row
333        });
334        let mut row_count = 0;
335        for event_log in event_logs {
336            let Some(ref inner) = event_log.event else {
337                continue;
338            };
339            if let Some(limit) = limit
340                && row_count >= limit
341            {
342                break;
343            }
344            let mut row = Row::new();
345            let ts = event_log
346                .timestamp
347                .and_then(|ts| Timestamptz::from_millis(ts as _).map(|ts| ts.to_string()));
348            try_add_cell(&mut row, ts);
349            if let Some(info) = get_event_info(inner) {
350                row.add_cell(info.into());
351                row_count += 1;
352            } else {
353                continue;
354            }
355            table.add_row(row);
356        }
357        let _ = writeln!(s, "{table}");
358    }
359
360    #[cfg_attr(coverage, coverage(off))]
361    async fn write_storage(&self, s: &mut String) {
362        let mut sst_num = 0;
363        let mut sst_total_file_size = 0;
364        let back_pressured_compaction_groups = self
365            .hummock_manger
366            .write_limits()
367            .await
368            .into_iter()
369            .filter_map(|(k, v)| {
370                if v.table_ids.is_empty() {
371                    None
372                } else {
373                    Some(k)
374                }
375            })
376            .join(",");
377        if !back_pressured_compaction_groups.is_empty() {
378            let _ = writeln!(
379                s,
380                "back pressured compaction groups: {back_pressured_compaction_groups}"
381            );
382        }
383
384        #[derive(PartialEq, Eq)]
385        struct SstableSort {
386            compaction_group_id: u64,
387            sst_id: u64,
388            delete_ratio: u64,
389        }
390        impl PartialOrd for SstableSort {
391            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
392                Some(self.cmp(other))
393            }
394        }
395        impl Ord for SstableSort {
396            fn cmp(&self, other: &Self) -> Ordering {
397                self.delete_ratio.cmp(&other.delete_ratio)
398            }
399        }
400        fn top_k_sstables(
401            top_k: usize,
402            heap: &mut BinaryHeap<Reverse<SstableSort>>,
403            e: SstableSort,
404        ) {
405            if heap.len() < top_k {
406                heap.push(Reverse(e));
407            } else if let Some(mut p) = heap.peek_mut() {
408                if e.delete_ratio > p.0.delete_ratio {
409                    *p = Reverse(e);
410                }
411            }
412        }
413
414        let top_k = 10;
415        let mut top_tombstone_delete_sst = BinaryHeap::with_capacity(top_k);
416        let compaction_group_num = self
417            .hummock_manger
418            .on_current_version(|version| {
419                for compaction_group in version.levels.values() {
420                    let mut visit_level = |level: &Level| {
421                        sst_num += level.table_infos.len();
422                        sst_total_file_size +=
423                            level.table_infos.iter().map(|t| t.sst_size).sum::<u64>();
424                        for sst in &level.table_infos {
425                            if sst.total_key_count == 0 {
426                                continue;
427                            }
428                            let tombstone_delete_ratio =
429                                sst.stale_key_count * 10000 / sst.total_key_count;
430                            let e = SstableSort {
431                                compaction_group_id: compaction_group.group_id,
432                                sst_id: sst.sst_id,
433                                delete_ratio: tombstone_delete_ratio,
434                            };
435                            top_k_sstables(top_k, &mut top_tombstone_delete_sst, e);
436                        }
437                    };
438                    let l0 = &compaction_group.l0;
439                    // FIXME: why chaining levels iter leads to segmentation fault?
440                    for level in &l0.sub_levels {
441                        visit_level(level);
442                    }
443                    for level in &compaction_group.levels {
444                        visit_level(level);
445                    }
446                }
447                version.levels.len()
448            })
449            .await;
450
451        let _ = writeln!(s, "number of SSTables: {sst_num}");
452        let _ = writeln!(s, "total size of SSTables (byte): {sst_total_file_size}");
453        let _ = writeln!(s, "number of compaction groups: {compaction_group_num}");
454        use comfy_table::{Row, Table};
455        fn format_table(heap: BinaryHeap<Reverse<SstableSort>>) -> Table {
456            let mut table = Table::new();
457            table.set_header({
458                let mut row = Row::new();
459                row.add_cell("compaction group id".into());
460                row.add_cell("sstable id".into());
461                row.add_cell("delete ratio".into());
462                row
463            });
464            for sst in &heap.into_sorted_vec() {
465                let mut row = Row::new();
466                row.add_cell(sst.0.compaction_group_id.into());
467                row.add_cell(sst.0.sst_id.into());
468                row.add_cell(format!("{:.2}%", sst.0.delete_ratio as f64 / 100.0).into());
469                table.add_row(row);
470            }
471            table
472        }
473        let _ = writeln!(s);
474        let _ = writeln!(s, "top tombstone delete ratio");
475        let _ = writeln!(s, "{}", format_table(top_tombstone_delete_sst));
476        let _ = writeln!(s);
477
478        let _ = writeln!(s);
479        self.write_storage_prometheus(s).await;
480    }
481
482    #[cfg_attr(coverage, coverage(off))]
483    async fn write_streaming_prometheus(&self, s: &mut String) {
484        let _ = writeln!(s, "top sources by throughput (rows/s)");
485        let query = format!(
486            "topk(3, sum(rate(stream_source_output_rows_counts{{{}}}[10m]))by (source_name))",
487            self.prometheus_selector
488        );
489        self.write_instant_vector_impl(s, &query, vec!["source_name"])
490            .await;
491
492        let _ = writeln!(s);
493        let _ = writeln!(s, "top materialized views by throughput (rows/s)");
494        let query = format!(
495            "topk(3, sum(rate(stream_mview_input_row_count{{{}}}[10m]))by (table_id))",
496            self.prometheus_selector
497        );
498        self.write_instant_vector_impl(s, &query, vec!["table_id"])
499            .await;
500
501        let _ = writeln!(s);
502        let _ = writeln!(s, "top join executor by matched rows");
503        let query = format!(
504            "topk(3, histogram_quantile(0.9, sum(rate(stream_join_matched_join_keys_bucket{{{}}}[10m])) by (le, fragment_id, table_id)))",
505            self.prometheus_selector
506        );
507        self.write_instant_vector_impl(s, &query, vec!["table_id", "fragment_id"])
508            .await;
509    }
510
511    #[cfg_attr(coverage, coverage(off))]
512    async fn write_storage_prometheus(&self, s: &mut String) {
513        let _ = writeln!(s, "top Hummock Get by duration (second)");
514        let query = format!(
515            "topk(3, histogram_quantile(0.9, sum(rate(state_store_get_duration_bucket{{{}}}[10m])) by (le, table_id)))",
516            self.prometheus_selector
517        );
518        self.write_instant_vector_impl(s, &query, vec!["table_id"])
519            .await;
520
521        let _ = writeln!(s);
522        let _ = writeln!(s, "top Hummock Iter Init by duration (second)");
523        let query = format!(
524            "topk(3, histogram_quantile(0.9, sum(rate(state_store_iter_init_duration_bucket{{{}}}[10m])) by (le, table_id)))",
525            self.prometheus_selector
526        );
527        self.write_instant_vector_impl(s, &query, vec!["table_id"])
528            .await;
529
530        let _ = writeln!(s);
531        let _ = writeln!(s, "top table commit flush by size (byte)");
532        let query = format!(
533            "topk(3, sum(rate(storage_commit_write_throughput{{{}}}[10m])) by (table_id))",
534            self.prometheus_selector
535        );
536        self.write_instant_vector_impl(s, &query, vec!["table_id"])
537            .await;
538
539        let _ = writeln!(s);
540        let _ = writeln!(s, "object store read throughput (bytes/s)");
541        let query = format!(
542            "sum(rate(object_store_read_bytes{{{}}}[10m])) by (job, instance)",
543            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
544        );
545        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
546            .await;
547
548        let _ = writeln!(s);
549        let _ = writeln!(s, "object store write throughput (bytes/s)");
550        let query = format!(
551            "sum(rate(object_store_write_bytes{{{}}}[10m])) by (job, instance)",
552            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
553        );
554        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
555            .await;
556
557        let _ = writeln!(s);
558        let _ = writeln!(s, "object store operation rate");
559        let query = format!(
560            "sum(rate(object_store_operation_latency_count{{{}}}[10m])) by (le, type, job, instance)",
561            merge_prometheus_selector([
562                &self.prometheus_selector,
563                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read_read_bytes|streaming_read\""
564            ])
565        );
566        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
567            .await;
568
569        let _ = writeln!(s);
570        let _ = writeln!(s, "object store operation duration (second)");
571        let query = format!(
572            "histogram_quantile(0.9, sum(rate(object_store_operation_latency_bucket{{{}}}[10m])) by (le, type, job, instance))",
573            merge_prometheus_selector([
574                &self.prometheus_selector,
575                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read\""
576            ])
577        );
578        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
579            .await;
580    }
581
582    #[cfg_attr(coverage, coverage(off))]
583    async fn write_instant_vector_impl(&self, s: &mut String, query: &str, labels: Vec<&str>) {
584        let Some(ref client) = self.prometheus_client else {
585            return;
586        };
587        if let Ok(Vector(instant_vec)) = client
588            .query(query)
589            .get()
590            .await
591            .map(|result| result.into_inner().0)
592        {
593            for i in instant_vec {
594                let l = labels
595                    .iter()
596                    .map(|label| {
597                        format!(
598                            "{}={}",
599                            *label,
600                            i.metric()
601                                .get(*label)
602                                .map(|s| s.as_str())
603                                .unwrap_or_default()
604                        )
605                    })
606                    .join(",");
607                let _ = writeln!(s, "{}: {:.3}", l, i.sample().value());
608            }
609        }
610    }
611
612    #[cfg_attr(coverage, coverage(off))]
613    async fn write_await_tree(&self, s: &mut String) {
614        // Most lines of code are copied from dashboard::handlers::dump_await_tree_all, because the latter cannot be called directly from here.
615        let Ok(worker_nodes) = self
616            .metadata_manager
617            .list_worker_node(Some(WorkerType::ComputeNode), None)
618            .await
619        else {
620            tracing::warn!("failed to get worker nodes");
621            return;
622        };
623
624        let mut all = StackTraceResponse::default();
625
626        let compute_clients = ComputeClientPool::adhoc();
627        for worker_node in &worker_nodes {
628            if let Ok(client) = compute_clients.get(worker_node).await
629                && let Ok(result) = client.stack_trace().await
630            {
631                all.merge_other(result);
632            }
633        }
634
635        write!(s, "{}", all.output()).unwrap();
636    }
637
638    async fn write_table_definition(&self, s: &mut String) -> MetaResult<()> {
639        let sources = self
640            .metadata_manager
641            .catalog_controller
642            .list_sources()
643            .await?
644            .into_iter()
645            .map(|s| (s.id, (s.name, s.schema_id, s.definition)))
646            .collect::<BTreeMap<_, _>>();
647        let tables = self
648            .metadata_manager
649            .catalog_controller
650            .list_tables_by_type(TableType::Table)
651            .await?
652            .into_iter()
653            .map(|t| (t.id, (t.name, t.schema_id, t.definition)))
654            .collect::<BTreeMap<_, _>>();
655        let mvs = self
656            .metadata_manager
657            .catalog_controller
658            .list_tables_by_type(TableType::MaterializedView)
659            .await?
660            .into_iter()
661            .map(|t| (t.id, (t.name, t.schema_id, t.definition)))
662            .collect::<BTreeMap<_, _>>();
663        let indexes = self
664            .metadata_manager
665            .catalog_controller
666            .list_tables_by_type(TableType::Index)
667            .await?
668            .into_iter()
669            .map(|t| (t.id, (t.name, t.schema_id, t.definition)))
670            .collect::<BTreeMap<_, _>>();
671        let sinks = self
672            .metadata_manager
673            .catalog_controller
674            .list_sinks()
675            .await?
676            .into_iter()
677            .map(|s| (s.id, (s.name, s.schema_id, s.definition)))
678            .collect::<BTreeMap<_, _>>();
679        let catalogs = [
680            ("SOURCE", sources),
681            ("TABLE", tables),
682            ("MATERIALIZED VIEW", mvs),
683            ("INDEX", indexes),
684            ("SINK", sinks),
685        ];
686        let mut obj_id_to_name = HashMap::new();
687        for (title, items) in catalogs {
688            use comfy_table::{Row, Table};
689            let mut table = Table::new();
690            table.set_header({
691                let mut row = Row::new();
692                row.add_cell("id".into());
693                row.add_cell("name".into());
694                row.add_cell("schema_id".into());
695                row.add_cell("definition".into());
696                row
697            });
698            for (id, (name, schema_id, definition)) in items {
699                obj_id_to_name.insert(id, name.clone());
700                let mut row = Row::new();
701                let may_redact =
702                    redact_all_sql_options(&definition).unwrap_or_else(|| "[REDACTED]".into());
703                row.add_cell(id.into());
704                row.add_cell(name.into());
705                row.add_cell(schema_id.into());
706                row.add_cell(may_redact.into());
707                table.add_row(row);
708            }
709            let _ = writeln!(s);
710            let _ = writeln!(s, "{title}");
711            let _ = writeln!(s, "{table}");
712        }
713
714        let actors = self
715            .metadata_manager
716            .catalog_controller
717            .list_actor_info()
718            .await?
719            .into_iter()
720            .map(|(actor_id, fragment_id, job_id, schema_id, obj_type)| {
721                (
722                    actor_id,
723                    (
724                        fragment_id,
725                        job_id,
726                        schema_id,
727                        obj_type,
728                        obj_id_to_name
729                            .get(&(job_id as _))
730                            .cloned()
731                            .unwrap_or_default(),
732                    ),
733                )
734            })
735            .collect::<BTreeMap<_, _>>();
736
737        use comfy_table::{Row, Table};
738        let mut table = Table::new();
739        table.set_header({
740            let mut row = Row::new();
741            row.add_cell("id".into());
742            row.add_cell("fragment_id".into());
743            row.add_cell("job_id".into());
744            row.add_cell("schema_id".into());
745            row.add_cell("type".into());
746            row.add_cell("name".into());
747            row
748        });
749        for (actor_id, (fragment_id, job_id, schema_id, ddl_type, name)) in actors {
750            let mut row = Row::new();
751            row.add_cell(actor_id.into());
752            row.add_cell(fragment_id.into());
753            row.add_cell(job_id.into());
754            row.add_cell(schema_id.into());
755            row.add_cell(ddl_type.as_str().into());
756            row.add_cell(name.into());
757            table.add_row(row);
758        }
759        let _ = writeln!(s);
760        let _ = writeln!(s, "ACTOR");
761        let _ = writeln!(s, "{table}");
762        Ok(())
763    }
764}
765
766#[cfg_attr(coverage, coverage(off))]
767fn try_add_cell<T: Into<comfy_table::Cell>>(row: &mut comfy_table::Row, t: Option<T>) {
768    match t {
769        Some(t) => {
770            row.add_cell(t.into());
771        }
772        None => {
773            row.add_cell("".into());
774        }
775    }
776}
777
778#[cfg_attr(coverage, coverage(off))]
779fn merge_prometheus_selector<'a>(selectors: impl IntoIterator<Item = &'a str>) -> String {
780    selectors.into_iter().filter(|s| !s.is_empty()).join(",")
781}
782
783fn redact_all_sql_options(sql: &str) -> Option<String> {
784    let Ok(mut statements) = Parser::parse_sql(sql) else {
785        return None;
786    };
787    let mut redacted = String::new();
788    for statement in &mut statements {
789        let options = match statement {
790            Statement::CreateTable {
791                with_options,
792                format_encode,
793                ..
794            } => {
795                let format_encode = match format_encode {
796                    Some(CompatibleFormatEncode::V2(cs)) => Some(&mut cs.row_options),
797                    _ => None,
798                };
799                (Some(with_options), format_encode)
800            }
801            Statement::CreateSource { stmt } => {
802                let format_encode = match &mut stmt.format_encode {
803                    CompatibleFormatEncode::V2(cs) => Some(&mut cs.row_options),
804                    _ => None,
805                };
806                (Some(&mut stmt.with_properties.0), format_encode)
807            }
808            Statement::CreateSink { stmt } => {
809                let format_encode = match &mut stmt.sink_schema {
810                    Some(cs) => Some(&mut cs.row_options),
811                    _ => None,
812                };
813                (Some(&mut stmt.with_properties.0), format_encode)
814            }
815            _ => (None, None),
816        };
817        if let Some(options) = options.0 {
818            for option in options {
819                option.value = Value::SingleQuotedString("[REDACTED]".into()).into();
820            }
821        }
822        if let Some(options) = options.1 {
823            for option in options {
824                option.value = Value::SingleQuotedString("[REDACTED]".into()).into();
825            }
826        }
827        writeln!(&mut redacted, "{statement}").unwrap();
828    }
829    Some(redacted)
830}