use std::cmp::{Ordering, Reverse};
use std::collections::{BTreeMap, BinaryHeap, HashMap};
use std::fmt::Write;
use std::sync::Arc;

use itertools::Itertools;
use prometheus_http_query::response::Data::Vector;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::StackTraceResponseExt;
use risingwave_hummock_sdk::level::Level;
use risingwave_meta_model::table::TableType;
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::EventLog;
use risingwave_pb::meta::event_log::Event;
use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
use risingwave_pb::monitor_service::{StackTraceRequest, StackTraceResponse};
use risingwave_rpc_client::ComputeClientPool;
use risingwave_sqlparser::ast::{CompatibleFormatEncode, Statement, Value};
use risingwave_sqlparser::parser::Parser;
use serde_json::json;
use thiserror_ext::AsReport;

use crate::MetaResult;
use crate::hummock::HummockManagerRef;
use crate::manager::MetadataManager;
use crate::manager::event_log::EventLogManagerRef;

pub type DiagnoseCommandRef = Arc<DiagnoseCommand>;

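/// Ad-hoc cluster diagnostics for the meta node: gathers catalog, worker,
/// streaming, storage, await-tree, and event-log information and renders it
/// as a plain-text report.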
pub struct DiagnoseCommand {
    metadata_manager: MetadataManager,
    hummock_manager: HummockManagerRef,
    event_log_manager: EventLogManagerRef,
    prometheus_client: Option<prometheus_http_query::Client>,
    prometheus_selector: String,
}

impl DiagnoseCommand {
    pub fn new(
        metadata_manager: MetadataManager,
        hummock_manager: HummockManagerRef,
        event_log_manager: EventLogManagerRef,
        prometheus_client: Option<prometheus_http_query::Client>,
        prometheus_selector: String,
    ) -> Self {
        Self {
            metadata_manager,
            hummock_manager,
            event_log_manager,
            prometheus_client,
            prometheus_selector,
        }
    }

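    /// Builds the complete diagnose report, writing each section (catalog, worker
    /// nodes, streaming metrics, storage, await tree, event logs) separated by
    /// blank lines.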
    #[cfg_attr(coverage, coverage(off))]
    pub async fn report(&self, actor_traces_format: ActorTracesFormat) -> String {
        let mut report = String::new();
        let _ = writeln!(
            report,
            "report created at: {}\nversion: {}",
            chrono::DateTime::<chrono::offset::Utc>::from(std::time::SystemTime::now()),
            risingwave_common::current_cluster_version(),
        );
        let _ = writeln!(report);
        self.write_catalog(&mut report).await;
        let _ = writeln!(report);
        self.write_worker_nodes(&mut report).await;
        let _ = writeln!(report);
        self.write_streaming_prometheus(&mut report).await;
        let _ = writeln!(report);
        self.write_storage(&mut report).await;
        let _ = writeln!(report);
        self.write_await_tree(&mut report, actor_traces_format)
            .await;
        let _ = writeln!(report);
        self.write_event_logs(&mut report);
        report
    }

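    /// Writes catalog statistics, then the definitions of all catalog objects;
    /// a failure to render definitions is logged and does not abort the report.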
    #[cfg_attr(coverage, coverage(off))]
    async fn write_catalog(&self, s: &mut String) {
        self.write_catalog_inner(s).await;
        let _ = self.write_table_definition(s).await.inspect_err(|e| {
            tracing::warn!(
                error = e.to_report_string(),
                "failed to display table definition"
            )
        });
    }

    #[cfg_attr(coverage, coverage(off))]
    async fn write_catalog_inner(&self, s: &mut String) {
        let guard = self
            .metadata_manager
            .catalog_controller
            .get_inner_read_guard()
            .await;
        let stat = match guard.stats().await {
            Ok(stat) => stat,
            Err(err) => {
                tracing::warn!(error=?err.as_report(), "failed to get catalog stats");
                return;
            }
        };
        let _ = writeln!(s, "number of streaming job: {}", stat.streaming_job_num);
        let _ = writeln!(s, "number of actor: {}", stat.actor_num);
        let _ = writeln!(s, "number of source: {}", stat.source_num);
        let _ = writeln!(s, "number of table: {}", stat.table_num);
        let _ = writeln!(s, "number of materialized view: {}", stat.mview_num);
        let _ = writeln!(s, "number of sink: {}", stat.sink_num);
        let _ = writeln!(s, "number of index: {}", stat.index_num);
        let _ = writeln!(s, "number of function: {}", stat.function_num);
    }

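    /// Renders a table of all worker nodes with their host, type, state, resources,
    /// and, for compute nodes only, the number of actors scheduled on them.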
    #[cfg_attr(coverage, coverage(off))]
    async fn write_worker_nodes(&self, s: &mut String) {
        let Ok(worker_actor_count) = self.metadata_manager.worker_actor_count().await else {
            tracing::warn!("failed to get worker actor count");
            return;
        };

        use comfy_table::{Row, Table};
        let Ok(worker_nodes) = self.metadata_manager.list_worker_node(None, None).await else {
            tracing::warn!("failed to get worker nodes");
            return;
        };
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("host".into());
            row.add_cell("type".into());
            row.add_cell("state".into());
            row.add_cell("parallelism".into());
            row.add_cell("is_streaming".into());
            row.add_cell("is_serving".into());
            row.add_cell("rw_version".into());
            row.add_cell("total_memory_bytes".into());
            row.add_cell("total_cpu_cores".into());
            row.add_cell("started_at".into());
            row.add_cell("actor_count".into());
            row
        });
        for worker_node in worker_nodes {
            let mut row = Row::new();
            row.add_cell(worker_node.id.into());
            try_add_cell(
                &mut row,
                worker_node
                    .host
                    .as_ref()
                    .map(|h| format!("{}:{}", h.host, h.port)),
            );
            try_add_cell(
                &mut row,
                worker_node.get_type().ok().map(|t| t.as_str_name()),
            );
            try_add_cell(
                &mut row,
                worker_node.get_state().ok().map(|s| s.as_str_name()),
            );
            try_add_cell(&mut row, worker_node.parallelism());
            try_add_cell(
                &mut row,
                worker_node.property.as_ref().map(|p| p.is_streaming),
            );
            try_add_cell(
                &mut row,
                worker_node.property.as_ref().map(|p| p.is_serving),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.rw_version.clone()),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.total_memory_bytes),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.total_cpu_cores),
            );
            try_add_cell(
                &mut row,
                worker_node
                    .started_at
                    .and_then(|ts| Timestamptz::from_secs(ts as _).map(|t| t.to_string())),
            );
            let actor_count = {
                if let Ok(t) = worker_node.get_type()
                    && t != WorkerType::ComputeNode
                {
                    None
                } else {
                    match worker_actor_count.get(&(worker_node.id as _)) {
                        None => Some(0),
                        Some(c) => Some(*c),
                    }
                }
            };
            try_add_cell(&mut row, actor_count);
            table.add_row(row);
        }
        let _ = writeln!(s, "{table}");
    }

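    /// Writes the most recent event logs grouped by kind (barrier completions and
    /// failures, worker panics, stream job failures, dirty stream job clear-ups),
    /// newest first.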
    #[cfg_attr(coverage, coverage(off))]
    fn write_event_logs(&self, s: &mut String) {
        let event_logs = self
            .event_log_manager
            .list_event_logs()
            .into_iter()
            .sorted_by(|a, b| {
                a.timestamp
                    .unwrap_or(0)
                    .cmp(&b.timestamp.unwrap_or(0))
                    .reverse()
            })
            .collect_vec();

        let _ = writeln!(s, "latest barrier completions");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::BarrierComplete(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(10),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest barrier collection failures");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::CollectBarrierFail(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest barrier injection failures");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::InjectBarrierFail(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest worker node panics");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::WorkerNodePanic(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(10),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest create stream job failures");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::CreateStreamJobFail(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest dirty stream job clear-ups");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::DirtyStreamJobClear(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );
    }

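    /// Renders the event logs selected by `get_event_info` as a two-column table
    /// (`created_at`, `info`), stopping after `limit` matching rows if a limit is set.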
    #[cfg_attr(coverage, coverage(off))]
    fn write_event_logs_impl<'a, F>(
        s: &mut String,
        event_logs: impl Iterator<Item = &'a EventLog>,
        get_event_info: F,
        limit: Option<usize>,
    ) where
        F: Fn(&Event) -> Option<String>,
    {
        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("created_at".into());
            row.add_cell("info".into());
            row
        });
        let mut row_count = 0;
        for event_log in event_logs {
            let Some(ref inner) = event_log.event else {
                continue;
            };
            if let Some(limit) = limit
                && row_count >= limit
            {
                break;
            }
            let mut row = Row::new();
            let ts = event_log
                .timestamp
                .and_then(|ts| Timestamptz::from_millis(ts as _).map(|ts| ts.to_string()));
            try_add_cell(&mut row, ts);
            if let Some(info) = get_event_info(inner) {
                row.add_cell(info.into());
                row_count += 1;
            } else {
                continue;
            }
            table.add_row(row);
        }
        let _ = writeln!(s, "{table}");
    }

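    /// Writes storage diagnostics: back-pressured compaction groups, SSTable count
    /// and total size, the SSTables with the highest tombstone delete ratios, and
    /// storage-related Prometheus metrics.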
    #[cfg_attr(coverage, coverage(off))]
    async fn write_storage(&self, s: &mut String) {
        let mut sst_num = 0;
        let mut sst_total_file_size = 0;
        let back_pressured_compaction_groups = self
            .hummock_manager
            .write_limits()
            .await
            .into_iter()
            .filter_map(|(k, v)| {
                if v.table_ids.is_empty() {
                    None
                } else {
                    Some(k)
                }
            })
            .join(",");
        if !back_pressured_compaction_groups.is_empty() {
            let _ = writeln!(
                s,
                "back pressured compaction groups: {back_pressured_compaction_groups}"
            );
        }

        #[derive(PartialEq, Eq)]
        struct SstableSort {
            compaction_group_id: u64,
            sst_id: u64,
            delete_ratio: u64,
        }
        impl PartialOrd for SstableSort {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }
        impl Ord for SstableSort {
            fn cmp(&self, other: &Self) -> Ordering {
                self.delete_ratio.cmp(&other.delete_ratio)
            }
        }
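        // Keeps the `top_k` entries with the largest `delete_ratio`: wrapping in
        // `Reverse` turns the max-heap into a min-heap, so the root is the smallest
        // retained entry and is evicted whenever a larger candidate arrives.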
        fn top_k_sstables(
            top_k: usize,
            heap: &mut BinaryHeap<Reverse<SstableSort>>,
            e: SstableSort,
        ) {
            if heap.len() < top_k {
                heap.push(Reverse(e));
            } else if let Some(mut p) = heap.peek_mut() {
                if e.delete_ratio > p.0.delete_ratio {
                    *p = Reverse(e);
                }
            }
        }

        let top_k = 10;
        let mut top_tombstone_delete_sst = BinaryHeap::with_capacity(top_k);
        let compaction_group_num = self
            .hummock_manager
            .on_current_version(|version| {
                for compaction_group in version.levels.values() {
                    let mut visit_level = |level: &Level| {
                        sst_num += level.table_infos.len();
                        sst_total_file_size +=
                            level.table_infos.iter().map(|t| t.sst_size).sum::<u64>();
                        for sst in &level.table_infos {
                            if sst.total_key_count == 0 {
                                continue;
                            }
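                            // Ratio in basis points (1/100 of a percent); rendered
                            // as a percentage with two decimals in `format_table`.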
                            let tombstone_delete_ratio =
                                sst.stale_key_count * 10000 / sst.total_key_count;
                            let e = SstableSort {
                                compaction_group_id: compaction_group.group_id,
                                sst_id: sst.sst_id,
                                delete_ratio: tombstone_delete_ratio,
                            };
                            top_k_sstables(top_k, &mut top_tombstone_delete_sst, e);
                        }
                    };
                    let l0 = &compaction_group.l0;
                    for level in &l0.sub_levels {
                        visit_level(level);
                    }
                    for level in &compaction_group.levels {
                        visit_level(level);
                    }
                }
                version.levels.len()
            })
            .await;

        let _ = writeln!(s, "number of SSTables: {sst_num}");
        let _ = writeln!(s, "total size of SSTables (byte): {sst_total_file_size}");
        let _ = writeln!(s, "number of compaction groups: {compaction_group_num}");
        use comfy_table::{Row, Table};
        fn format_table(heap: BinaryHeap<Reverse<SstableSort>>) -> Table {
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("compaction group id".into());
                row.add_cell("sstable id".into());
                row.add_cell("delete ratio".into());
                row
            });
            for sst in &heap.into_sorted_vec() {
                let mut row = Row::new();
                row.add_cell(sst.0.compaction_group_id.into());
                row.add_cell(sst.0.sst_id.into());
                row.add_cell(format!("{:.2}%", sst.0.delete_ratio as f64 / 100.0).into());
                table.add_row(row);
            }
            table
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "top tombstone delete ratio");
        let _ = writeln!(s, "{}", format_table(top_tombstone_delete_sst));
        let _ = writeln!(s);

        let _ = writeln!(s);
        self.write_storage_prometheus(s).await;
    }

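    /// Queries Prometheus for streaming hot spots: top sources and materialized
    /// views by throughput, and top join executors by matched rows.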
    #[cfg_attr(coverage, coverage(off))]
    async fn write_streaming_prometheus(&self, s: &mut String) {
        let _ = writeln!(s, "top sources by throughput (rows/s)");
        let query = format!(
            "topk(3, sum(rate(stream_source_output_rows_counts{{{}}}[10m]))by (source_name))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["source_name"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top materialized views by throughput (rows/s)");
        let query = format!(
            "topk(3, sum(rate(stream_mview_input_row_count{{{}}}[10m]))by (table_id))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top join executors by matched rows");
        let query = format!(
            "topk(3, histogram_quantile(0.9, sum(rate(stream_join_matched_join_keys_bucket{{{}}}[10m])) by (le, fragment_id, table_id)))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id", "fragment_id"])
            .await;
    }

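    /// Queries Prometheus for storage hot spots: Hummock get/iter latencies, commit
    /// flush size, and object store throughput, operation rate, and latency.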
    #[cfg_attr(coverage, coverage(off))]
    async fn write_storage_prometheus(&self, s: &mut String) {
        let _ = writeln!(s, "top Hummock Get by duration (second)");
        let query = format!(
            "topk(3, histogram_quantile(0.9, sum(rate(state_store_get_duration_bucket{{{}}}[10m])) by (le, table_id)))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top Hummock Iter Init by duration (second)");
        let query = format!(
            "topk(3, histogram_quantile(0.9, sum(rate(state_store_iter_init_duration_bucket{{{}}}[10m])) by (le, table_id)))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top table commit flush by size (byte)");
        let query = format!(
            "topk(3, sum(rate(storage_commit_write_throughput{{{}}}[10m])) by (table_id))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store read throughput (bytes/s)");
        let query = format!(
            "sum(rate(object_store_read_bytes{{{}}}[10m])) by (job, instance)",
            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
        );
        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store write throughput (bytes/s)");
        let query = format!(
            "sum(rate(object_store_write_bytes{{{}}}[10m])) by (job, instance)",
            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
        );
        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store operation rate");
        let query = format!(
            "sum(rate(object_store_operation_latency_count{{{}}}[10m])) by (le, type, job, instance)",
            merge_prometheus_selector([
                &self.prometheus_selector,
                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read_read_bytes|streaming_read\""
            ])
        );
        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store operation duration (second)");
        let query = format!(
            "histogram_quantile(0.9, sum(rate(object_store_operation_latency_bucket{{{}}}[10m])) by (le, type, job, instance))",
            merge_prometheus_selector([
                &self.prometheus_selector,
                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read\""
            ])
        );
        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
            .await;
    }

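    /// Runs an instant PromQL query and writes one `label=value,...: sample` line
    /// per returned series; does nothing if no Prometheus client is configured or
    /// the query fails.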
    #[cfg_attr(coverage, coverage(off))]
    async fn write_instant_vector_impl(&self, s: &mut String, query: &str, labels: Vec<&str>) {
        let Some(ref client) = self.prometheus_client else {
            return;
        };
        if let Ok(Vector(instant_vec)) = client
            .query(query)
            .get()
            .await
            .map(|result| result.into_inner().0)
        {
            for i in instant_vec {
                let l = labels
                    .iter()
                    .map(|label| {
                        format!(
                            "{}={}",
                            *label,
                            i.metric()
                                .get(*label)
                                .map(|s| s.as_str())
                                .unwrap_or_default()
                        )
                    })
                    .join(",");
                let _ = writeln!(s, "{}: {:.3}", l, i.sample().value());
            }
        }
    }

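    /// Collects await-tree stack traces from every compute node over ad-hoc RPC
    /// clients and merges them into the report; unreachable nodes are skipped.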
    #[cfg_attr(coverage, coverage(off))]
    async fn write_await_tree(&self, s: &mut String, actor_traces_format: ActorTracesFormat) {
        let Ok(worker_nodes) = self
            .metadata_manager
            .list_worker_node(Some(WorkerType::ComputeNode), None)
            .await
        else {
            tracing::warn!("failed to get worker nodes");
            return;
        };

        let mut all = StackTraceResponse::default();

        let compute_clients = ComputeClientPool::adhoc();
        for worker_node in &worker_nodes {
            if let Ok(client) = compute_clients.get(worker_node).await
                && let Ok(result) = client
                    .stack_trace(StackTraceRequest {
                        actor_traces_format: actor_traces_format as i32,
                    })
                    .await
            {
                all.merge_other(result);
            }
        }

        write!(s, "{}", all.output()).unwrap();
    }

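    /// Renders one table per catalog object kind (source, table, materialized view,
    /// index, sink) with redacted definitions, followed by an actor table mapping
    /// each actor to its fragment, job, schema, type, and job name.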
    async fn write_table_definition(&self, s: &mut String) -> MetaResult<()> {
        let sources = self
            .metadata_manager
            .catalog_controller
            .list_sources()
            .await?
            .into_iter()
            .map(|s| (s.id, (s.name, s.schema_id, s.definition)))
            .collect::<BTreeMap<_, _>>();
        let tables = self
            .metadata_manager
            .catalog_controller
            .list_tables_by_type(TableType::Table)
            .await?
            .into_iter()
            .map(|t| (t.id, (t.name, t.schema_id, t.definition)))
            .collect::<BTreeMap<_, _>>();
        let mvs = self
            .metadata_manager
            .catalog_controller
            .list_tables_by_type(TableType::MaterializedView)
            .await?
            .into_iter()
            .map(|t| (t.id, (t.name, t.schema_id, t.definition)))
            .collect::<BTreeMap<_, _>>();
        let indexes = self
            .metadata_manager
            .catalog_controller
            .list_tables_by_type(TableType::Index)
            .await?
            .into_iter()
            .map(|t| (t.id, (t.name, t.schema_id, t.definition)))
            .collect::<BTreeMap<_, _>>();
        let sinks = self
            .metadata_manager
            .catalog_controller
            .list_sinks()
            .await?
            .into_iter()
            .map(|s| (s.id, (s.name, s.schema_id, s.definition)))
            .collect::<BTreeMap<_, _>>();
        let catalogs = [
            ("SOURCE", sources),
            ("TABLE", tables),
            ("MATERIALIZED VIEW", mvs),
            ("INDEX", indexes),
            ("SINK", sinks),
        ];
        let mut obj_id_to_name = HashMap::new();
        for (title, items) in catalogs {
            use comfy_table::{Row, Table};
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("id".into());
                row.add_cell("name".into());
                row.add_cell("schema_id".into());
                row.add_cell("definition".into());
                row
            });
            for (id, (name, schema_id, definition)) in items {
                obj_id_to_name.insert(id, name.clone());
                let mut row = Row::new();
                let may_redact =
                    redact_all_sql_options(&definition).unwrap_or_else(|| "[REDACTED]".into());
                row.add_cell(id.into());
                row.add_cell(name.into());
                row.add_cell(schema_id.into());
                row.add_cell(may_redact.into());
                table.add_row(row);
            }
            let _ = writeln!(s);
            let _ = writeln!(s, "{title}");
            let _ = writeln!(s, "{table}");
        }

        let actors = self
            .metadata_manager
            .catalog_controller
            .list_actor_info()
            .await?
            .into_iter()
            .map(|(actor_id, fragment_id, job_id, schema_id, obj_type)| {
                (
                    actor_id,
                    (
                        fragment_id,
                        job_id,
                        schema_id,
                        obj_type,
                        obj_id_to_name
                            .get(&(job_id as _))
                            .cloned()
                            .unwrap_or_default(),
                    ),
                )
            })
            .collect::<BTreeMap<_, _>>();

        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("fragment_id".into());
            row.add_cell("job_id".into());
            row.add_cell("schema_id".into());
            row.add_cell("type".into());
            row.add_cell("name".into());
            row
        });
        for (actor_id, (fragment_id, job_id, schema_id, ddl_type, name)) in actors {
            let mut row = Row::new();
            row.add_cell(actor_id.into());
            row.add_cell(fragment_id.into());
            row.add_cell(job_id.into());
            row.add_cell(schema_id.into());
            row.add_cell(ddl_type.as_str().into());
            row.add_cell(name.into());
            table.add_row(row);
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "ACTOR");
        let _ = writeln!(s, "{table}");
        Ok(())
    }
}

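/// Adds `t` to the row if present, or an empty cell otherwise, keeping the column
/// count consistent across rows.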
#[cfg_attr(coverage, coverage(off))]
fn try_add_cell<T: Into<comfy_table::Cell>>(row: &mut comfy_table::Row, t: Option<T>) {
    match t {
        Some(t) => {
            row.add_cell(t.into());
        }
        None => {
            row.add_cell("".into());
        }
    }
}

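/// Joins non-empty label selectors with commas for splicing into a PromQL series
/// selector, e.g. `["cluster=\"c1\"", "", "job=~\"compute\""]` yields
/// `cluster="c1",job=~"compute"`.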
#[cfg_attr(coverage, coverage(off))]
fn merge_prometheus_selector<'a>(selectors: impl IntoIterator<Item = &'a str>) -> String {
    selectors.into_iter().filter(|s| !s.is_empty()).join(",")
}

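/// Replaces every WITH-clause and FORMAT/ENCODE option value in `sql` with
/// `'[REDACTED]'`, e.g. `WITH (connector = 'kafka')` becomes
/// `WITH (connector = '[REDACTED]')`. Returns `None` if parsing fails, in which
/// case the caller redacts the whole definition.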
fn redact_all_sql_options(sql: &str) -> Option<String> {
    let Ok(mut statements) = Parser::parse_sql(sql) else {
        return None;
    };
    let mut redacted = String::new();
    for statement in &mut statements {
        let options = match statement {
            Statement::CreateTable {
                with_options,
                format_encode,
                ..
            } => {
                let format_encode = match format_encode {
                    Some(CompatibleFormatEncode::V2(cs)) => Some(&mut cs.row_options),
                    _ => None,
                };
                (Some(with_options), format_encode)
            }
            Statement::CreateSource { stmt } => {
                let format_encode = match &mut stmt.format_encode {
                    CompatibleFormatEncode::V2(cs) => Some(&mut cs.row_options),
                    _ => None,
                };
                (Some(&mut stmt.with_properties.0), format_encode)
            }
            Statement::CreateSink { stmt } => {
                let format_encode = match &mut stmt.sink_schema {
                    Some(cs) => Some(&mut cs.row_options),
                    _ => None,
                };
                (Some(&mut stmt.with_properties.0), format_encode)
            }
            _ => (None, None),
        };
        if let Some(options) = options.0 {
            for option in options {
                option.value = Value::SingleQuotedString("[REDACTED]".into()).into();
            }
        }
        if let Some(options) = options.1 {
            for option in options {
                option.value = Value::SingleQuotedString("[REDACTED]".into()).into();
            }
        }
        writeln!(&mut redacted, "{statement}").unwrap();
    }
    Some(redacted)
}