use std::cmp::{Ordering, Reverse};
use std::collections::{BTreeMap, BinaryHeap, HashMap};
use std::fmt::Write;
use std::sync::Arc;

use itertools::Itertools;
use prometheus_http_query::response::Data::Vector;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::StackTraceResponseExt;
use risingwave_hummock_sdk::HummockSstableId;
use risingwave_hummock_sdk::level::Level;
use risingwave_meta_model::table::TableType;
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::EventLog;
use risingwave_pb::meta::event_log::Event;
use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
use risingwave_rpc_client::ComputeClientPool;
use risingwave_sqlparser::ast::{CompatibleFormatEncode, Statement, Value};
use risingwave_sqlparser::parser::Parser;
use serde_json::json;
use thiserror_ext::AsReport;

use crate::MetaResult;
use crate::hummock::HummockManagerRef;
use crate::manager::MetadataManager;
use crate::manager::event_log::EventLogManagerRef;

pub type DiagnoseCommandRef = Arc<DiagnoseCommand>;
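
/// Produces a human-readable diagnosis report of the cluster: catalog
/// statistics, worker nodes, streaming and storage Prometheus metrics,
/// await-tree dumps, and recent event logs.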
pub struct DiagnoseCommand {
    metadata_manager: MetadataManager,
    hummock_manager: HummockManagerRef,
    event_log_manager: EventLogManagerRef,
    prometheus_client: Option<prometheus_http_query::Client>,
    prometheus_selector: String,
}

impl DiagnoseCommand {
    pub fn new(
        metadata_manager: MetadataManager,
        hummock_manager: HummockManagerRef,
        event_log_manager: EventLogManagerRef,
        prometheus_client: Option<prometheus_http_query::Client>,
        prometheus_selector: String,
    ) -> Self {
        Self {
            metadata_manager,
            hummock_manager,
            event_log_manager,
            prometheus_client,
            prometheus_selector,
        }
    }
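
    /// Renders the full report, one section at a time. A failure in one
    /// section only logs a warning and leaves that section empty instead of
    /// aborting the whole report.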
    #[cfg_attr(coverage, coverage(off))]
    pub async fn report(&self, actor_traces_format: ActorTracesFormat) -> String {
        let mut report = String::new();
        let _ = writeln!(
            report,
            "report created at: {}\nversion: {}",
            chrono::DateTime::<chrono::offset::Utc>::from(std::time::SystemTime::now()),
            risingwave_common::current_cluster_version(),
        );
        let _ = writeln!(report);
        self.write_catalog(&mut report).await;
        let _ = writeln!(report);
        self.write_worker_nodes(&mut report).await;
        let _ = writeln!(report);
        self.write_streaming_prometheus(&mut report).await;
        let _ = writeln!(report);
        self.write_storage(&mut report).await;
        let _ = writeln!(report);
        self.write_await_tree(&mut report, actor_traces_format)
            .await;
        let _ = writeln!(report);
        self.write_event_logs(&mut report);
        report
    }
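
    /// Writes catalog statistics followed by the (redacted) definitions of all
    /// catalog objects.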
    #[cfg_attr(coverage, coverage(off))]
    async fn write_catalog(&self, s: &mut String) {
        self.write_catalog_inner(s).await;
        let _ = self.write_table_definition(s).await.inspect_err(|e| {
            tracing::warn!(
                error = e.to_report_string(),
                "failed to display table definition"
            )
        });
    }
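
    /// Writes per-object counts (streaming jobs, actors, tables, ...) from the
    /// catalog controller's statistics.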
    #[cfg_attr(coverage, coverage(off))]
    async fn write_catalog_inner(&self, s: &mut String) {
        let guard = self
            .metadata_manager
            .catalog_controller
            .get_inner_read_guard()
            .await;
        let stat = match guard.stats().await {
            Ok(stat) => stat,
            Err(err) => {
                tracing::warn!(error=?err.as_report(), "failed to get catalog stats");
                return;
            }
        };
        let _ = writeln!(s, "number of streaming jobs: {}", stat.streaming_job_num);
        let _ = writeln!(s, "number of actors: {}", stat.actor_num);
        let _ = writeln!(s, "number of sources: {}", stat.source_num);
        let _ = writeln!(s, "number of tables: {}", stat.table_num);
        let _ = writeln!(s, "number of materialized views: {}", stat.mview_num);
        let _ = writeln!(s, "number of sinks: {}", stat.sink_num);
        let _ = writeln!(s, "number of indexes: {}", stat.index_num);
        let _ = writeln!(s, "number of functions: {}", stat.function_num);
    }
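
    /// Writes one table row per worker node: identity, role, resources, and,
    /// for compute nodes, the number of actors scheduled on it.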
    #[cfg_attr(coverage, coverage(off))]
    async fn write_worker_nodes(&self, s: &mut String) {
        let Ok(worker_actor_count) = self.metadata_manager.worker_actor_count().await else {
            tracing::warn!("failed to get worker actor count");
            return;
        };

        use comfy_table::{Row, Table};
        let Ok(worker_nodes) = self.metadata_manager.list_worker_node(None, None).await else {
            tracing::warn!("failed to get worker nodes");
            return;
        };
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("host".into());
            row.add_cell("type".into());
            row.add_cell("state".into());
            row.add_cell("parallelism".into());
            row.add_cell("is_streaming".into());
            row.add_cell("is_serving".into());
            row.add_cell("rw_version".into());
            row.add_cell("total_memory_bytes".into());
            row.add_cell("total_cpu_cores".into());
            row.add_cell("started_at".into());
            row.add_cell("actor_count".into());
            row
        });
        for worker_node in worker_nodes {
            let mut row = Row::new();
            row.add_cell(worker_node.id.into());
            try_add_cell(
                &mut row,
                worker_node
                    .host
                    .as_ref()
                    .map(|h| format!("{}:{}", h.host, h.port)),
            );
            try_add_cell(
                &mut row,
                worker_node.get_type().ok().map(|t| t.as_str_name()),
            );
            try_add_cell(
                &mut row,
                worker_node.get_state().ok().map(|s| s.as_str_name()),
            );
            try_add_cell(&mut row, worker_node.parallelism());
            try_add_cell(
                &mut row,
                worker_node.property.as_ref().map(|p| p.is_streaming),
            );
            try_add_cell(
                &mut row,
                worker_node.property.as_ref().map(|p| p.is_serving),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.rw_version.clone()),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.total_memory_bytes),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.total_cpu_cores),
            );
            try_add_cell(
                &mut row,
                worker_node
                    .started_at
                    .and_then(|ts| Timestamptz::from_secs(ts as _).map(|t| t.to_string())),
            );
            // Actor counts are only meaningful for compute nodes; other worker
            // types get an empty cell.
            let actor_count = {
                if let Ok(t) = worker_node.get_type()
                    && t != WorkerType::ComputeNode
                {
                    None
                } else {
                    // A compute node without an entry simply hosts no actors.
                    match worker_actor_count.get(&(worker_node.id as _)) {
                        None => Some(0),
                        Some(c) => Some(*c),
                    }
                }
            };
            try_add_cell(&mut row, actor_count);
            table.add_row(row);
        }
        let _ = writeln!(s, "{table}");
    }
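
    /// Writes recent event logs grouped by type (barrier completions and
    /// failures, panics, stream job failures), newest first, each group capped
    /// at a small limit.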
    #[cfg_attr(coverage, coverage(off))]
    fn write_event_logs(&self, s: &mut String) {
        let event_logs = self
            .event_log_manager
            .list_event_logs()
            .into_iter()
            .sorted_by(|a, b| {
                a.timestamp
                    .unwrap_or(0)
                    .cmp(&b.timestamp.unwrap_or(0))
                    .reverse()
            })
            .collect_vec();

        let _ = writeln!(s, "latest barrier completions");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::BarrierComplete(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(10),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest barrier collection failures");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::CollectBarrierFail(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest barrier injection failures");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::InjectBarrierFail(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest worker node panics");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::WorkerNodePanic(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(10),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest create stream job failures");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::CreateStreamJobFail(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest dirty stream job clear-ups");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::DirtyStreamJobClear(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );
    }
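
    /// Writes a two-column table (created_at, info) for the event logs that
    /// `get_event_info` maps to `Some`, stopping after `limit` rows if given.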
    #[cfg_attr(coverage, coverage(off))]
    fn write_event_logs_impl<'a, F>(
        s: &mut String,
        event_logs: impl Iterator<Item = &'a EventLog>,
        get_event_info: F,
        limit: Option<usize>,
    ) where
        F: Fn(&Event) -> Option<String>,
    {
        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("created_at".into());
            row.add_cell("info".into());
            row
        });
        let mut row_count = 0;
        for event_log in event_logs {
            let Some(ref inner) = event_log.event else {
                continue;
            };
            if let Some(limit) = limit
                && row_count >= limit
            {
                break;
            }
            let mut row = Row::new();
            let ts = event_log
                .timestamp
                .and_then(|ts| Timestamptz::from_millis(ts as _).map(|ts| ts.to_string()));
            try_add_cell(&mut row, ts);
            if let Some(info) = get_event_info(inner) {
                row.add_cell(info.into());
                row_count += 1;
            } else {
                continue;
            }
            table.add_row(row);
        }
        let _ = writeln!(s, "{table}");
    }
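
    /// Writes storage diagnostics: write-limited (back-pressured) compaction
    /// groups, SSTable counts and total size, the SSTables with the highest
    /// tombstone ratio, and storage-related Prometheus metrics.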
    #[cfg_attr(coverage, coverage(off))]
    async fn write_storage(&self, s: &mut String) {
        let mut sst_num = 0;
        let mut sst_total_file_size = 0;
        let back_pressured_compaction_groups = self
            .hummock_manager
            .write_limits()
            .await
            .into_iter()
            .filter_map(|(k, v)| {
                if v.table_ids.is_empty() {
                    None
                } else {
                    Some(k)
                }
            })
            .join(",");
        if !back_pressured_compaction_groups.is_empty() {
            let _ = writeln!(
                s,
                "back pressured compaction groups: {back_pressured_compaction_groups}"
            );
        }

        #[derive(PartialEq, Eq)]
        struct SstableSort {
            compaction_group_id: u64,
            sst_id: HummockSstableId,
            /// Tombstone ratio in basis points (1/100 of a percent).
            delete_ratio: u64,
        }
        impl PartialOrd for SstableSort {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }
        impl Ord for SstableSort {
            fn cmp(&self, other: &Self) -> Ordering {
                self.delete_ratio.cmp(&other.delete_ratio)
            }
        }
        /// Keeps the `top_k` entries with the largest delete ratio, using a
        /// min-heap (`Reverse`): the current minimum is replaced whenever a
        /// larger entry arrives.
        fn top_k_sstables(
            top_k: usize,
            heap: &mut BinaryHeap<Reverse<SstableSort>>,
            e: SstableSort,
        ) {
            if heap.len() < top_k {
                heap.push(Reverse(e));
            } else if let Some(mut p) = heap.peek_mut() {
                if e.delete_ratio > p.0.delete_ratio {
                    *p = Reverse(e);
                }
            }
        }

        let top_k = 10;
        let mut top_tombstone_delete_sst = BinaryHeap::with_capacity(top_k);
        let compaction_group_num = self
            .hummock_manager
            .on_current_version(|version| {
                for compaction_group in version.levels.values() {
                    let mut visit_level = |level: &Level| {
                        sst_num += level.table_infos.len();
                        sst_total_file_size +=
                            level.table_infos.iter().map(|t| t.sst_size).sum::<u64>();
                        for sst in &level.table_infos {
                            if sst.total_key_count == 0 {
                                continue;
                            }
                            let tombstone_delete_ratio =
                                sst.stale_key_count * 10000 / sst.total_key_count;
                            let e = SstableSort {
                                compaction_group_id: compaction_group.group_id,
                                sst_id: sst.sst_id,
                                delete_ratio: tombstone_delete_ratio,
                            };
                            top_k_sstables(top_k, &mut top_tombstone_delete_sst, e);
                        }
                    };
                    let l0 = &compaction_group.l0;
                    for level in &l0.sub_levels {
                        visit_level(level);
                    }
                    for level in &compaction_group.levels {
                        visit_level(level);
                    }
                }
                version.levels.len()
            })
            .await;

        let _ = writeln!(s, "number of SSTables: {sst_num}");
        let _ = writeln!(s, "total size of SSTables (byte): {sst_total_file_size}");
        let _ = writeln!(s, "number of compaction groups: {compaction_group_num}");
        use comfy_table::{Row, Table};
        fn format_table(heap: BinaryHeap<Reverse<SstableSort>>) -> Table {
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("compaction group id".into());
                row.add_cell("sstable id".into());
                row.add_cell("delete ratio".into());
                row
            });
            for sst in &heap.into_sorted_vec() {
                let mut row = Row::new();
                row.add_cell(sst.0.compaction_group_id.into());
                row.add_cell(sst.0.sst_id.into());
                // Convert basis points back to a percentage for display.
                row.add_cell(format!("{:.2}%", sst.0.delete_ratio as f64 / 100.0).into());
                table.add_row(row);
            }
            table
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "top tombstone delete ratio");
        let _ = writeln!(s, "{}", format_table(top_tombstone_delete_sst));
        let _ = writeln!(s);

        let _ = writeln!(s);
        self.write_storage_prometheus(s).await;
    }
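
    /// Writes streaming health metrics from Prometheus: the busiest sources,
    /// materialized views, and join executors over the last 10 minutes.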
    #[cfg_attr(coverage, coverage(off))]
    async fn write_streaming_prometheus(&self, s: &mut String) {
        let _ = writeln!(s, "top sources by throughput (rows/s)");
        let query = format!(
            "topk(3, sum(rate(stream_source_output_rows_counts{{{}}}[10m])) by (source_name))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["source_name"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top materialized views by throughput (rows/s)");
        let query = format!(
            "topk(3, sum(rate(stream_mview_input_row_count{{{}}}[10m])) by (table_id))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top join executors by matched rows");
        let query = format!(
            "topk(3, histogram_quantile(0.9, sum(rate(stream_join_matched_join_keys_bucket{{{}}}[10m])) by (le, fragment_id, table_id)))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id", "fragment_id"])
            .await;
    }
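
    /// Writes storage health metrics from Prometheus: Hummock read latencies,
    /// commit flush sizes, and object store throughput, operation rate, and
    /// operation latency over the last 10 minutes.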
    #[cfg_attr(coverage, coverage(off))]
    async fn write_storage_prometheus(&self, s: &mut String) {
        let _ = writeln!(s, "top Hummock Get by duration (second)");
        let query = format!(
            "topk(3, histogram_quantile(0.9, sum(rate(state_store_get_duration_bucket{{{}}}[10m])) by (le, table_id)))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top Hummock Iter Init by duration (second)");
        let query = format!(
            "topk(3, histogram_quantile(0.9, sum(rate(state_store_iter_init_duration_bucket{{{}}}[10m])) by (le, table_id)))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top table commit flush by size (byte)");
        let query = format!(
            "topk(3, sum(rate(storage_commit_write_throughput{{{}}}[10m])) by (table_id))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store read throughput (bytes/s)");
        let query = format!(
            "sum(rate(object_store_read_bytes{{{}}}[10m])) by (job, instance)",
            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
        );
        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store write throughput (bytes/s)");
        let query = format!(
            "sum(rate(object_store_write_bytes{{{}}}[10m])) by (job, instance)",
            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
        );
        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store operation rate");
        let query = format!(
            "sum(rate(object_store_operation_latency_count{{{}}}[10m])) by (le, type, job, instance)",
            merge_prometheus_selector([
                &self.prometheus_selector,
                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read_read_bytes|streaming_read\""
            ])
        );
        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store operation duration (second)");
        let query = format!(
            "histogram_quantile(0.9, sum(rate(object_store_operation_latency_bucket{{{}}}[10m])) by (le, type, job, instance))",
            merge_prometheus_selector([
                &self.prometheus_selector,
                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read\""
            ])
        );
        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
            .await;
    }
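
    /// Runs an instant PromQL query and writes one `label=value,...: value`
    /// line per sample. Silently returns if no Prometheus client is configured
    /// or the query fails.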
    #[cfg_attr(coverage, coverage(off))]
    async fn write_instant_vector_impl(&self, s: &mut String, query: &str, labels: Vec<&str>) {
        let Some(ref client) = self.prometheus_client else {
            return;
        };
        if let Ok(Vector(instant_vec)) = client
            .query(query)
            .get()
            .await
            .map(|result| result.into_inner().0)
        {
            for i in instant_vec {
                let l = labels
                    .iter()
                    .map(|label| {
                        format!(
                            "{}={}",
                            *label,
                            i.metric()
                                .get(*label)
                                .map(|s| s.as_str())
                                .unwrap_or_default()
                        )
                    })
                    .join(",");
                let _ = writeln!(s, "{}: {:.3}", l, i.sample().value());
            }
        }
    }
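
    /// Collects await-tree (async stack trace) dumps from every compute node
    /// and appends them to the report; nodes that fail to respond are skipped.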
    #[cfg_attr(coverage, coverage(off))]
    async fn write_await_tree(&self, s: &mut String, actor_traces_format: ActorTracesFormat) {
        let Ok(worker_nodes) = self
            .metadata_manager
            .list_worker_node(Some(WorkerType::ComputeNode), None)
            .await
        else {
            tracing::warn!("failed to get worker nodes");
            return;
        };

        let mut all = StackTraceResponse::default();

        let compute_clients = ComputeClientPool::adhoc();
        for worker_node in &worker_nodes {
            if let Ok(client) = compute_clients.get(worker_node).await
                && let Ok(result) = client
                    .stack_trace(StackTraceRequest {
                        actor_traces_format: actor_traces_format as i32,
                    })
                    .await
            {
                all.merge_other(result);
            }
        }

        write!(s, "{}", all.output()).unwrap();
    }
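
    /// Writes the (redacted) definitions of all sources, tables, materialized
    /// views, indexes, and sinks, then a table mapping every actor to its
    /// fragment, job, schema, and job name.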
    async fn write_table_definition(&self, s: &mut String) -> MetaResult<()> {
        let sources = self
            .metadata_manager
            .catalog_controller
            .list_sources()
            .await?
            .into_iter()
            .map(|s| (s.id, (s.name, s.schema_id, s.definition)))
            .collect::<BTreeMap<_, _>>();
        let tables = self
            .metadata_manager
            .catalog_controller
            .list_tables_by_type(TableType::Table)
            .await?
            .into_iter()
            .map(|t| (t.id, (t.name, t.schema_id, t.definition)))
            .collect::<BTreeMap<_, _>>();
        let mvs = self
            .metadata_manager
            .catalog_controller
            .list_tables_by_type(TableType::MaterializedView)
            .await?
            .into_iter()
            .map(|t| (t.id, (t.name, t.schema_id, t.definition)))
            .collect::<BTreeMap<_, _>>();
        let indexes = self
            .metadata_manager
            .catalog_controller
            .list_tables_by_type(TableType::Index)
            .await?
            .into_iter()
            .map(|t| (t.id, (t.name, t.schema_id, t.definition)))
            .collect::<BTreeMap<_, _>>();
        let sinks = self
            .metadata_manager
            .catalog_controller
            .list_sinks()
            .await?
            .into_iter()
            .map(|s| (s.id, (s.name, s.schema_id, s.definition)))
            .collect::<BTreeMap<_, _>>();
        let catalogs = [
            ("SOURCE", sources),
            ("TABLE", tables),
            ("MATERIALIZED VIEW", mvs),
            ("INDEX", indexes),
            ("SINK", sinks),
        ];
        let mut obj_id_to_name = HashMap::new();
        for (title, items) in catalogs {
            use comfy_table::{Row, Table};
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("id".into());
                row.add_cell("name".into());
                row.add_cell("schema_id".into());
                row.add_cell("definition".into());
                row
            });
            for (id, (name, schema_id, definition)) in items {
                obj_id_to_name.insert(id, name.clone());
                let mut row = Row::new();
                // Redact option values so that credentials never end up in the
                // report; fall back to a placeholder if the SQL fails to parse.
                let may_redact =
                    redact_all_sql_options(&definition).unwrap_or_else(|| "[REDACTED]".into());
                row.add_cell(id.into());
                row.add_cell(name.into());
                row.add_cell(schema_id.into());
                row.add_cell(may_redact.into());
                table.add_row(row);
            }
            let _ = writeln!(s);
            let _ = writeln!(s, "{title}");
            let _ = writeln!(s, "{table}");
        }

        let actors = self
            .metadata_manager
            .catalog_controller
            .list_actor_info()
            .await?
            .into_iter()
            .map(|(actor_id, fragment_id, job_id, schema_id, obj_type)| {
                (
                    actor_id,
                    (
                        fragment_id,
                        job_id,
                        schema_id,
                        obj_type,
                        obj_id_to_name
                            .get(&(job_id as _))
                            .cloned()
                            .unwrap_or_default(),
                    ),
                )
            })
            .collect::<BTreeMap<_, _>>();

        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("fragment_id".into());
            row.add_cell("job_id".into());
            row.add_cell("schema_id".into());
            row.add_cell("type".into());
            row.add_cell("name".into());
            row
        });
        for (actor_id, (fragment_id, job_id, schema_id, ddl_type, name)) in actors {
            let mut row = Row::new();
            row.add_cell(actor_id.into());
            row.add_cell(fragment_id.into());
            row.add_cell(job_id.into());
            row.add_cell(schema_id.into());
            row.add_cell(ddl_type.as_str().into());
            row.add_cell(name.into());
            table.add_row(row);
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "ACTOR");
        let _ = writeln!(s, "{table}");
        Ok(())
    }
}
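
/// Adds `t` to the row if present, otherwise an empty cell, so that optional
/// fields keep the table columns aligned.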
#[cfg_attr(coverage, coverage(off))]
fn try_add_cell<T: Into<comfy_table::Cell>>(row: &mut comfy_table::Row, t: Option<T>) {
    match t {
        Some(t) => {
            row.add_cell(t.into());
        }
        None => {
            row.add_cell("".into());
        }
    }
}
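
/// Joins the non-empty selectors with commas, yielding a label list that can
/// be spliced into a PromQL series selector. For example,
/// `merge_prometheus_selector(["cluster=\"c1\"", "", "job=~\"compute\""])`
/// yields `cluster="c1",job=~"compute"`.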
#[cfg_attr(coverage, coverage(off))]
fn merge_prometheus_selector<'a>(selectors: impl IntoIterator<Item = &'a str>) -> String {
    selectors.into_iter().filter(|s| !s.is_empty()).join(",")
}
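
/// Best-effort redaction of DDL before it is printed: every WITH option and
/// FORMAT ... ENCODE option value in `sql` is replaced with `[REDACTED]`.
/// Returns `None` if the SQL cannot be parsed, in which case the caller
/// should treat the whole definition as sensitive.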
fn redact_all_sql_options(sql: &str) -> Option<String> {
    let Ok(mut statements) = Parser::parse_sql(sql) else {
        return None;
    };
    let mut redacted = String::new();
    for statement in &mut statements {
        // Collect the WITH options and, when present, the FORMAT ... ENCODE
        // options of statements that may carry credentials.
        let options = match statement {
            Statement::CreateTable {
                with_options,
                format_encode,
                ..
            } => {
                let format_encode = match format_encode {
                    Some(CompatibleFormatEncode::V2(cs)) => Some(&mut cs.row_options),
                    _ => None,
                };
                (Some(with_options), format_encode)
            }
            Statement::CreateSource { stmt } => {
                let format_encode = match &mut stmt.format_encode {
                    CompatibleFormatEncode::V2(cs) => Some(&mut cs.row_options),
                    _ => None,
                };
                (Some(&mut stmt.with_properties.0), format_encode)
            }
            Statement::CreateSink { stmt } => {
                let format_encode = match &mut stmt.sink_schema {
                    Some(cs) => Some(&mut cs.row_options),
                    _ => None,
                };
                (Some(&mut stmt.with_properties.0), format_encode)
            }
            _ => (None, None),
        };
        if let Some(options) = options.0 {
            for option in options {
                option.value = Value::SingleQuotedString("[REDACTED]".into()).into();
            }
        }
        if let Some(options) = options.1 {
            for option in options {
                option.value = Value::SingleQuotedString("[REDACTED]".into()).into();
            }
        }
        writeln!(&mut redacted, "{statement}").unwrap();
    }
    Some(redacted)
}