1use std::cmp::{Ordering, Reverse};
16use std::collections::{BTreeMap, BinaryHeap, HashMap};
17use std::fmt::Write;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use prometheus_http_query::response::Data::Vector;
22use risingwave_common::types::Timestamptz;
23use risingwave_common::util::StackTraceResponseExt;
24use risingwave_hummock_sdk::level::Level;
25use risingwave_meta_model::table::TableType;
26use risingwave_pb::common::WorkerType;
27use risingwave_pb::meta::EventLog;
28use risingwave_pb::meta::event_log::Event;
29use risingwave_pb::monitor_service::StackTraceResponse;
30use risingwave_rpc_client::ComputeClientPool;
31use risingwave_sqlparser::ast::{CompatibleFormatEncode, Statement, Value};
32use risingwave_sqlparser::parser::Parser;
33use serde_json::json;
34use thiserror_ext::AsReport;
35
36use crate::MetaResult;
37use crate::hummock::HummockManagerRef;
38use crate::manager::MetadataManager;
39use crate::manager::event_log::EventLogManagerRef;
40
/// Shared, reference-counted handle to [`DiagnoseCommand`].
pub type DiagnoseCommandRef = Arc<DiagnoseCommand>;
42
/// Collects cluster diagnostics (catalog, worker nodes, storage, Prometheus
/// metrics, await trees and event logs) into one human-readable text report.
pub struct DiagnoseCommand {
    metadata_manager: MetadataManager,
    // NOTE(review): field name has a typo ("manger" -> "manager"); renaming
    // would touch every use site, so it is only flagged here.
    hummock_manger: HummockManagerRef,
    event_log_manager: EventLogManagerRef,
    // `None` when Prometheus is not configured; metric sections are skipped.
    prometheus_client: Option<prometheus_http_query::Client>,
    // Label selector inserted into every PromQL query.
    prometheus_selector: String,
}
50
51impl DiagnoseCommand {
    /// Creates a new `DiagnoseCommand`.
    ///
    /// `prometheus_client` may be `None` when no Prometheus endpoint is
    /// configured; metric-based report sections are then skipped.
    pub fn new(
        metadata_manager: MetadataManager,
        hummock_manger: HummockManagerRef,
        event_log_manager: EventLogManagerRef,
        prometheus_client: Option<prometheus_http_query::Client>,
        prometheus_selector: String,
    ) -> Self {
        Self {
            metadata_manager,
            hummock_manger,
            event_log_manager,
            prometheus_client,
            prometheus_selector,
        }
    }
67
    /// Builds the full diagnose report as a plain-text string.
    ///
    /// Sections are written in a fixed order, separated by blank lines:
    /// header (creation time + cluster version), catalog, worker nodes,
    /// streaming Prometheus metrics, storage, await trees, and event logs.
    /// `writeln!` into a `String` cannot fail, so results are ignored.
    #[cfg_attr(coverage, coverage(off))]
    pub async fn report(&self) -> String {
        let mut report = String::new();
        let _ = writeln!(
            report,
            "report created at: {}\nversion: {}",
            chrono::DateTime::<chrono::offset::Utc>::from(std::time::SystemTime::now()),
            risingwave_common::current_cluster_version(),
        );
        let _ = writeln!(report);
        self.write_catalog(&mut report).await;
        let _ = writeln!(report);
        self.write_worker_nodes(&mut report).await;
        let _ = writeln!(report);
        self.write_streaming_prometheus(&mut report).await;
        let _ = writeln!(report);
        self.write_storage(&mut report).await;
        let _ = writeln!(report);
        self.write_await_tree(&mut report).await;
        let _ = writeln!(report);
        self.write_event_logs(&mut report);
        report
    }
91
92 #[cfg_attr(coverage, coverage(off))]
93 async fn write_catalog(&self, s: &mut String) {
94 self.write_catalog_inner(s).await;
95 let _ = self.write_table_definition(s).await.inspect_err(|e| {
96 tracing::warn!(
97 error = e.to_report_string(),
98 "failed to display table definition"
99 )
100 });
101 }
102
103 #[cfg_attr(coverage, coverage(off))]
104 async fn write_catalog_inner(&self, s: &mut String) {
105 let guard = self
106 .metadata_manager
107 .catalog_controller
108 .get_inner_read_guard()
109 .await;
110 let stat = match guard.stats().await {
111 Ok(stat) => stat,
112 Err(err) => {
113 tracing::warn!(error=?err.as_report(), "failed to get catalog stats");
114 return;
115 }
116 };
117 let _ = writeln!(s, "number of fragment: {}", stat.streaming_job_num);
118 let _ = writeln!(s, "number of actor: {}", stat.actor_num);
119 let _ = writeln!(s, "number of source: {}", stat.source_num);
120 let _ = writeln!(s, "number of table: {}", stat.table_num);
121 let _ = writeln!(s, "number of materialized view: {}", stat.mview_num);
122 let _ = writeln!(s, "number of sink: {}", stat.sink_num);
123 let _ = writeln!(s, "number of index: {}", stat.index_num);
124 let _ = writeln!(s, "number of function: {}", stat.function_num);
125 }
126
127 #[cfg_attr(coverage, coverage(off))]
128 async fn write_worker_nodes(&self, s: &mut String) {
129 let Ok(worker_actor_count) = self.metadata_manager.worker_actor_count().await else {
130 tracing::warn!("failed to get worker actor count");
131 return;
132 };
133
134 use comfy_table::{Row, Table};
135 let Ok(worker_nodes) = self.metadata_manager.list_worker_node(None, None).await else {
136 tracing::warn!("failed to get worker nodes");
137 return;
138 };
139 let mut table = Table::new();
140 table.set_header({
141 let mut row = Row::new();
142 row.add_cell("id".into());
143 row.add_cell("host".into());
144 row.add_cell("type".into());
145 row.add_cell("state".into());
146 row.add_cell("parallelism".into());
147 row.add_cell("is_streaming".into());
148 row.add_cell("is_serving".into());
149 row.add_cell("rw_version".into());
150 row.add_cell("total_memory_bytes".into());
151 row.add_cell("total_cpu_cores".into());
152 row.add_cell("started_at".into());
153 row.add_cell("actor_count".into());
154 row
155 });
156 for worker_node in worker_nodes {
157 let mut row = Row::new();
158 row.add_cell(worker_node.id.into());
159 try_add_cell(
160 &mut row,
161 worker_node
162 .host
163 .as_ref()
164 .map(|h| format!("{}:{}", h.host, h.port)),
165 );
166 try_add_cell(
167 &mut row,
168 worker_node.get_type().ok().map(|t| t.as_str_name()),
169 );
170 try_add_cell(
171 &mut row,
172 worker_node.get_state().ok().map(|s| s.as_str_name()),
173 );
174 try_add_cell(&mut row, worker_node.parallelism());
175 try_add_cell(
176 &mut row,
177 worker_node.property.as_ref().map(|p| p.is_streaming),
178 );
179 try_add_cell(
180 &mut row,
181 worker_node.property.as_ref().map(|p| p.is_serving),
182 );
183 try_add_cell(
184 &mut row,
185 worker_node.resource.as_ref().map(|r| r.rw_version.clone()),
186 );
187 try_add_cell(
188 &mut row,
189 worker_node.resource.as_ref().map(|r| r.total_memory_bytes),
190 );
191 try_add_cell(
192 &mut row,
193 worker_node.resource.as_ref().map(|r| r.total_cpu_cores),
194 );
195 try_add_cell(
196 &mut row,
197 worker_node
198 .started_at
199 .and_then(|ts| Timestamptz::from_secs(ts as _).map(|t| t.to_string())),
200 );
201 let actor_count = {
202 if let Ok(t) = worker_node.get_type()
203 && t != WorkerType::ComputeNode
204 {
205 None
206 } else {
207 match worker_actor_count.get(&(worker_node.id as _)) {
208 None => Some(0),
209 Some(c) => Some(*c),
210 }
211 }
212 };
213 try_add_cell(&mut row, actor_count);
214 table.add_row(row);
215 }
216 let _ = writeln!(s, "{table}");
217 }
218
219 #[cfg_attr(coverage, coverage(off))]
220 fn write_event_logs(&self, s: &mut String) {
221 let event_logs = self
222 .event_log_manager
223 .list_event_logs()
224 .into_iter()
225 .sorted_by(|a, b| {
226 a.timestamp
227 .unwrap_or(0)
228 .cmp(&b.timestamp.unwrap_or(0))
229 .reverse()
230 })
231 .collect_vec();
232
233 let _ = writeln!(s, "latest barrier completions");
234 Self::write_event_logs_impl(
235 s,
236 event_logs.iter(),
237 |e| {
238 let Event::BarrierComplete(info) = e else {
239 return None;
240 };
241 Some(json!(info).to_string())
242 },
243 Some(10),
244 );
245
246 let _ = writeln!(s);
247 let _ = writeln!(s, "latest barrier collection failures");
248 Self::write_event_logs_impl(
249 s,
250 event_logs.iter(),
251 |e| {
252 let Event::CollectBarrierFail(info) = e else {
253 return None;
254 };
255 Some(json!(info).to_string())
256 },
257 Some(3),
258 );
259
260 let _ = writeln!(s);
261 let _ = writeln!(s, "latest barrier injection failures");
262 Self::write_event_logs_impl(
263 s,
264 event_logs.iter(),
265 |e| {
266 let Event::InjectBarrierFail(info) = e else {
267 return None;
268 };
269 Some(json!(info).to_string())
270 },
271 Some(3),
272 );
273
274 let _ = writeln!(s);
275 let _ = writeln!(s, "latest worker node panics");
276 Self::write_event_logs_impl(
277 s,
278 event_logs.iter(),
279 |e| {
280 let Event::WorkerNodePanic(info) = e else {
281 return None;
282 };
283 Some(json!(info).to_string())
284 },
285 Some(10),
286 );
287
288 let _ = writeln!(s);
289 let _ = writeln!(s, "latest create stream job failures");
290 Self::write_event_logs_impl(
291 s,
292 event_logs.iter(),
293 |e| {
294 let Event::CreateStreamJobFail(info) = e else {
295 return None;
296 };
297 Some(json!(info).to_string())
298 },
299 Some(3),
300 );
301
302 let _ = writeln!(s);
303 let _ = writeln!(s, "latest dirty stream job clear-ups");
304 Self::write_event_logs_impl(
305 s,
306 event_logs.iter(),
307 |e| {
308 let Event::DirtyStreamJobClear(info) = e else {
309 return None;
310 };
311 Some(json!(info).to_string())
312 },
313 Some(3),
314 );
315 }
316
317 #[cfg_attr(coverage, coverage(off))]
318 fn write_event_logs_impl<'a, F>(
319 s: &mut String,
320 event_logs: impl Iterator<Item = &'a EventLog>,
321 get_event_info: F,
322 limit: Option<usize>,
323 ) where
324 F: Fn(&Event) -> Option<String>,
325 {
326 use comfy_table::{Row, Table};
327 let mut table = Table::new();
328 table.set_header({
329 let mut row = Row::new();
330 row.add_cell("created_at".into());
331 row.add_cell("info".into());
332 row
333 });
334 let mut row_count = 0;
335 for event_log in event_logs {
336 let Some(ref inner) = event_log.event else {
337 continue;
338 };
339 if let Some(limit) = limit
340 && row_count >= limit
341 {
342 break;
343 }
344 let mut row = Row::new();
345 let ts = event_log
346 .timestamp
347 .and_then(|ts| Timestamptz::from_millis(ts as _).map(|ts| ts.to_string()));
348 try_add_cell(&mut row, ts);
349 if let Some(info) = get_event_info(inner) {
350 row.add_cell(info.into());
351 row_count += 1;
352 } else {
353 continue;
354 }
355 table.add_row(row);
356 }
357 let _ = writeln!(s, "{table}");
358 }
359
    /// Writes the storage (Hummock) section: write-limited compaction
    /// groups, SSTable totals, the SSTs with the highest tombstone delete
    /// ratio, and storage-related Prometheus metrics.
    #[cfg_attr(coverage, coverage(off))]
    async fn write_storage(&self, s: &mut String) {
        let mut sst_num = 0;
        let mut sst_total_file_size = 0;
        // Compaction groups currently under write limiting (i.e. with a
        // non-empty limited table set) are reported as back pressured.
        let back_pressured_compaction_groups = self
            .hummock_manger
            .write_limits()
            .await
            .into_iter()
            .filter_map(|(k, v)| {
                if v.table_ids.is_empty() {
                    None
                } else {
                    Some(k)
                }
            })
            .join(",");
        if !back_pressured_compaction_groups.is_empty() {
            let _ = writeln!(
                s,
                "back pressured compaction groups: {back_pressured_compaction_groups}"
            );
        }

        // Orders SSTs by tombstone delete ratio (stored in basis points).
        #[derive(PartialEq, Eq)]
        struct SstableSort {
            compaction_group_id: u64,
            sst_id: u64,
            delete_ratio: u64,
        }
        impl PartialOrd for SstableSort {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }
        impl Ord for SstableSort {
            fn cmp(&self, other: &Self) -> Ordering {
                self.delete_ratio.cmp(&other.delete_ratio)
            }
        }
        // Keeps the `top_k` largest-ratio entries using a min-heap (via
        // `Reverse`): the root is the smallest of the current top-k and is
        // replaced whenever a larger candidate arrives.
        fn top_k_sstables(
            top_k: usize,
            heap: &mut BinaryHeap<Reverse<SstableSort>>,
            e: SstableSort,
        ) {
            if heap.len() < top_k {
                heap.push(Reverse(e));
            } else if let Some(mut p) = heap.peek_mut() {
                if e.delete_ratio > p.0.delete_ratio {
                    *p = Reverse(e);
                }
            }
        }

        let top_k = 10;
        let mut top_tombstone_delete_sst = BinaryHeap::with_capacity(top_k);
        // Single pass over the current Hummock version: accumulates SST
        // count/size and the top-k tombstone ratios, returns the number of
        // compaction groups.
        let compaction_group_num = self
            .hummock_manger
            .on_current_version(|version| {
                for compaction_group in version.levels.values() {
                    let mut visit_level = |level: &Level| {
                        sst_num += level.table_infos.len();
                        sst_total_file_size +=
                            level.table_infos.iter().map(|t| t.sst_size).sum::<u64>();
                        for sst in &level.table_infos {
                            // Skip empty SSTs to avoid division by zero.
                            if sst.total_key_count == 0 {
                                continue;
                            }
                            // Ratio in basis points (1/100 of a percent);
                            // rendered as a percentage in `format_table`.
                            let tombstone_delete_ratio =
                                sst.stale_key_count * 10000 / sst.total_key_count;
                            let e = SstableSort {
                                compaction_group_id: compaction_group.group_id,
                                sst_id: sst.sst_id,
                                delete_ratio: tombstone_delete_ratio,
                            };
                            top_k_sstables(top_k, &mut top_tombstone_delete_sst, e);
                        }
                    };
                    // Visit both the L0 sub-levels and the regular levels.
                    let l0 = &compaction_group.l0;
                    for level in &l0.sub_levels {
                        visit_level(level);
                    }
                    for level in &compaction_group.levels {
                        visit_level(level);
                    }
                }
                version.levels.len()
            })
            .await;

        let _ = writeln!(s, "number of SSTables: {sst_num}");
        let _ = writeln!(s, "total size of SSTables (byte): {sst_total_file_size}");
        let _ = writeln!(s, "number of compaction groups: {compaction_group_num}");
        use comfy_table::{Row, Table};
        fn format_table(heap: BinaryHeap<Reverse<SstableSort>>) -> Table {
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("compaction group id".into());
                row.add_cell("sstable id".into());
                row.add_cell("delete ratio".into());
                row
            });
            // `into_sorted_vec` on `Reverse` keys yields descending ratio.
            for sst in &heap.into_sorted_vec() {
                let mut row = Row::new();
                row.add_cell(sst.0.compaction_group_id.into());
                row.add_cell(sst.0.sst_id.into());
                // Basis points -> percentage with two decimals.
                row.add_cell(format!("{:.2}%", sst.0.delete_ratio as f64 / 100.0).into());
                table.add_row(row);
            }
            table
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "top tombstone delete ratio");
        let _ = writeln!(s, "{}", format_table(top_tombstone_delete_sst));
        let _ = writeln!(s);

        let _ = writeln!(s);
        self.write_storage_prometheus(s).await;
    }
481
482 #[cfg_attr(coverage, coverage(off))]
483 async fn write_streaming_prometheus(&self, s: &mut String) {
484 let _ = writeln!(s, "top sources by throughput (rows/s)");
485 let query = format!(
486 "topk(3, sum(rate(stream_source_output_rows_counts{{{}}}[10m]))by (source_name))",
487 self.prometheus_selector
488 );
489 self.write_instant_vector_impl(s, &query, vec!["source_name"])
490 .await;
491
492 let _ = writeln!(s);
493 let _ = writeln!(s, "top materialized views by throughput (rows/s)");
494 let query = format!(
495 "topk(3, sum(rate(stream_mview_input_row_count{{{}}}[10m]))by (table_id))",
496 self.prometheus_selector
497 );
498 self.write_instant_vector_impl(s, &query, vec!["table_id"])
499 .await;
500
501 let _ = writeln!(s);
502 let _ = writeln!(s, "top join executor by matched rows");
503 let query = format!(
504 "topk(3, histogram_quantile(0.9, sum(rate(stream_join_matched_join_keys_bucket{{{}}}[10m])) by (le, fragment_id, table_id)))",
505 self.prometheus_selector
506 );
507 self.write_instant_vector_impl(s, &query, vec!["table_id", "fragment_id"])
508 .await;
509 }
510
511 #[cfg_attr(coverage, coverage(off))]
512 async fn write_storage_prometheus(&self, s: &mut String) {
513 let _ = writeln!(s, "top Hummock Get by duration (second)");
514 let query = format!(
515 "topk(3, histogram_quantile(0.9, sum(rate(state_store_get_duration_bucket{{{}}}[10m])) by (le, table_id)))",
516 self.prometheus_selector
517 );
518 self.write_instant_vector_impl(s, &query, vec!["table_id"])
519 .await;
520
521 let _ = writeln!(s);
522 let _ = writeln!(s, "top Hummock Iter Init by duration (second)");
523 let query = format!(
524 "topk(3, histogram_quantile(0.9, sum(rate(state_store_iter_init_duration_bucket{{{}}}[10m])) by (le, table_id)))",
525 self.prometheus_selector
526 );
527 self.write_instant_vector_impl(s, &query, vec!["table_id"])
528 .await;
529
530 let _ = writeln!(s);
531 let _ = writeln!(s, "top table commit flush by size (byte)");
532 let query = format!(
533 "topk(3, sum(rate(storage_commit_write_throughput{{{}}}[10m])) by (table_id))",
534 self.prometheus_selector
535 );
536 self.write_instant_vector_impl(s, &query, vec!["table_id"])
537 .await;
538
539 let _ = writeln!(s);
540 let _ = writeln!(s, "object store read throughput (bytes/s)");
541 let query = format!(
542 "sum(rate(object_store_read_bytes{{{}}}[10m])) by (job, instance)",
543 merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
544 );
545 self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
546 .await;
547
548 let _ = writeln!(s);
549 let _ = writeln!(s, "object store write throughput (bytes/s)");
550 let query = format!(
551 "sum(rate(object_store_write_bytes{{{}}}[10m])) by (job, instance)",
552 merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
553 );
554 self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
555 .await;
556
557 let _ = writeln!(s);
558 let _ = writeln!(s, "object store operation rate");
559 let query = format!(
560 "sum(rate(object_store_operation_latency_count{{{}}}[10m])) by (le, type, job, instance)",
561 merge_prometheus_selector([
562 &self.prometheus_selector,
563 "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read_read_bytes|streaming_read\""
564 ])
565 );
566 self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
567 .await;
568
569 let _ = writeln!(s);
570 let _ = writeln!(s, "object store operation duration (second)");
571 let query = format!(
572 "histogram_quantile(0.9, sum(rate(object_store_operation_latency_bucket{{{}}}[10m])) by (le, type, job, instance))",
573 merge_prometheus_selector([
574 &self.prometheus_selector,
575 "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read\""
576 ])
577 );
578 self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
579 .await;
580 }
581
582 #[cfg_attr(coverage, coverage(off))]
583 async fn write_instant_vector_impl(&self, s: &mut String, query: &str, labels: Vec<&str>) {
584 let Some(ref client) = self.prometheus_client else {
585 return;
586 };
587 if let Ok(Vector(instant_vec)) = client
588 .query(query)
589 .get()
590 .await
591 .map(|result| result.into_inner().0)
592 {
593 for i in instant_vec {
594 let l = labels
595 .iter()
596 .map(|label| {
597 format!(
598 "{}={}",
599 *label,
600 i.metric()
601 .get(*label)
602 .map(|s| s.as_str())
603 .unwrap_or_default()
604 )
605 })
606 .join(",");
607 let _ = writeln!(s, "{}: {:.3}", l, i.sample().value());
608 }
609 }
610 }
611
612 #[cfg_attr(coverage, coverage(off))]
613 async fn write_await_tree(&self, s: &mut String) {
614 let Ok(worker_nodes) = self
616 .metadata_manager
617 .list_worker_node(Some(WorkerType::ComputeNode), None)
618 .await
619 else {
620 tracing::warn!("failed to get worker nodes");
621 return;
622 };
623
624 let mut all = StackTraceResponse::default();
625
626 let compute_clients = ComputeClientPool::adhoc();
627 for worker_node in &worker_nodes {
628 if let Ok(client) = compute_clients.get(worker_node).await
629 && let Ok(result) = client.stack_trace().await
630 {
631 all.merge_other(result);
632 }
633 }
634
635 write!(s, "{}", all.output()).unwrap();
636 }
637
    /// Writes the (redacted) SQL definitions of sources, tables,
    /// materialized views, indexes and sinks, followed by an ACTOR table
    /// mapping each actor to its fragment, job, schema, type and name.
    ///
    /// WITH / format-encode option values are replaced with `[REDACTED]`;
    /// definitions that fail to parse are shown entirely as `[REDACTED]`.
    async fn write_table_definition(&self, s: &mut String) -> MetaResult<()> {
        // Each catalog is collected as id -> (name, schema_id, definition);
        // `BTreeMap` gives deterministic, id-ordered output.
        let sources = self
            .metadata_manager
            .catalog_controller
            .list_sources()
            .await?
            .into_iter()
            .map(|s| (s.id, (s.name, s.schema_id, s.definition)))
            .collect::<BTreeMap<_, _>>();
        let tables = self
            .metadata_manager
            .catalog_controller
            .list_tables_by_type(TableType::Table)
            .await?
            .into_iter()
            .map(|t| (t.id, (t.name, t.schema_id, t.definition)))
            .collect::<BTreeMap<_, _>>();
        let mvs = self
            .metadata_manager
            .catalog_controller
            .list_tables_by_type(TableType::MaterializedView)
            .await?
            .into_iter()
            .map(|t| (t.id, (t.name, t.schema_id, t.definition)))
            .collect::<BTreeMap<_, _>>();
        let indexes = self
            .metadata_manager
            .catalog_controller
            .list_tables_by_type(TableType::Index)
            .await?
            .into_iter()
            .map(|t| (t.id, (t.name, t.schema_id, t.definition)))
            .collect::<BTreeMap<_, _>>();
        let sinks = self
            .metadata_manager
            .catalog_controller
            .list_sinks()
            .await?
            .into_iter()
            .map(|s| (s.id, (s.name, s.schema_id, s.definition)))
            .collect::<BTreeMap<_, _>>();
        let catalogs = [
            ("SOURCE", sources),
            ("TABLE", tables),
            ("MATERIALIZED VIEW", mvs),
            ("INDEX", indexes),
            ("SINK", sinks),
        ];
        // Object id -> name, filled while printing catalogs and used below
        // to resolve each actor's job name.
        let mut obj_id_to_name = HashMap::new();
        for (title, items) in catalogs {
            use comfy_table::{Row, Table};
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("id".into());
                row.add_cell("name".into());
                row.add_cell("schema_id".into());
                row.add_cell("definition".into());
                row
            });
            for (id, (name, schema_id, definition)) in items {
                obj_id_to_name.insert(id, name.clone());
                let mut row = Row::new();
                // Redact option values; if the SQL cannot be parsed, hide
                // the whole definition.
                let may_redact =
                    redact_all_sql_options(&definition).unwrap_or_else(|| "[REDACTED]".into());
                row.add_cell(id.into());
                row.add_cell(name.into());
                row.add_cell(schema_id.into());
                row.add_cell(may_redact.into());
                table.add_row(row);
            }
            let _ = writeln!(s);
            let _ = writeln!(s, "{title}");
            let _ = writeln!(s, "{table}");
        }

        // Actor id -> (fragment, job, schema, type, resolved job name);
        // `BTreeMap` again for deterministic ordering.
        let actors = self
            .metadata_manager
            .catalog_controller
            .list_actor_info()
            .await?
            .into_iter()
            .map(|(actor_id, fragment_id, job_id, schema_id, obj_type)| {
                (
                    actor_id,
                    (
                        fragment_id,
                        job_id,
                        schema_id,
                        obj_type,
                        obj_id_to_name
                            .get(&(job_id as _))
                            .cloned()
                            .unwrap_or_default(),
                    ),
                )
            })
            .collect::<BTreeMap<_, _>>();

        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("fragment_id".into());
            row.add_cell("job_id".into());
            row.add_cell("schema_id".into());
            row.add_cell("type".into());
            row.add_cell("name".into());
            row
        });
        for (actor_id, (fragment_id, job_id, schema_id, ddl_type, name)) in actors {
            let mut row = Row::new();
            row.add_cell(actor_id.into());
            row.add_cell(fragment_id.into());
            row.add_cell(job_id.into());
            row.add_cell(schema_id.into());
            row.add_cell(ddl_type.as_str().into());
            row.add_cell(name.into());
            table.add_row(row);
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "ACTOR");
        let _ = writeln!(s, "{table}");
        Ok(())
    }
764}
765
766#[cfg_attr(coverage, coverage(off))]
767fn try_add_cell<T: Into<comfy_table::Cell>>(row: &mut comfy_table::Row, t: Option<T>) {
768 match t {
769 Some(t) => {
770 row.add_cell(t.into());
771 }
772 None => {
773 row.add_cell("".into());
774 }
775 }
776}
777
/// Joins non-empty PromQL label selectors with commas.
#[cfg_attr(coverage, coverage(off))]
fn merge_prometheus_selector<'a>(selectors: impl IntoIterator<Item = &'a str>) -> String {
    let parts: Vec<&str> = selectors.into_iter().filter(|s| !s.is_empty()).collect();
    parts.join(",")
}
782
/// Parses `sql` and replaces the values of all WITH options and
/// FORMAT/ENCODE row options with the literal `'[REDACTED]'`.
///
/// Returns `None` when `sql` does not parse, so the caller can substitute a
/// fully redacted placeholder. Handles CREATE TABLE / SOURCE / SINK; other
/// statements are re-emitted unchanged.
fn redact_all_sql_options(sql: &str) -> Option<String> {
    let Ok(mut statements) = Parser::parse_sql(sql) else {
        return None;
    };
    let mut redacted = String::new();
    for statement in &mut statements {
        // For each statement, gather mutable references to the option
        // lists that may carry sensitive values:
        // (WITH options, format-encode row options).
        let options = match statement {
            Statement::CreateTable {
                with_options,
                format_encode,
                ..
            } => {
                // Only the V2 format-encode clause exposes row options.
                let format_encode = match format_encode {
                    Some(CompatibleFormatEncode::V2(cs)) => Some(&mut cs.row_options),
                    _ => None,
                };
                (Some(with_options), format_encode)
            }
            Statement::CreateSource { stmt } => {
                let format_encode = match &mut stmt.format_encode {
                    CompatibleFormatEncode::V2(cs) => Some(&mut cs.row_options),
                    _ => None,
                };
                (Some(&mut stmt.with_properties.0), format_encode)
            }
            Statement::CreateSink { stmt } => {
                let format_encode = match &mut stmt.sink_schema {
                    Some(cs) => Some(&mut cs.row_options),
                    _ => None,
                };
                (Some(&mut stmt.with_properties.0), format_encode)
            }
            _ => (None, None),
        };
        // Overwrite every option value in place.
        if let Some(options) = options.0 {
            for option in options {
                option.value = Value::SingleQuotedString("[REDACTED]".into()).into();
            }
        }
        if let Some(options) = options.1 {
            for option in options {
                option.value = Value::SingleQuotedString("[REDACTED]".into()).into();
            }
        }
        // Re-emit the mutated AST; writing to a `String` never fails.
        writeln!(&mut redacted, "{statement}").unwrap();
    }
    Some(redacted)
}