1use std::cmp::{Ordering, Reverse};
16use std::collections::{BTreeMap, BinaryHeap, HashMap};
17use std::fmt::Write;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use prometheus_http_query::response::Data::Vector;
22use risingwave_common::types::Timestamptz;
23use risingwave_common::util::StackTraceResponseExt;
24use risingwave_hummock_sdk::HummockSstableId;
25use risingwave_hummock_sdk::level::Level;
26use risingwave_meta_model::table::TableType;
27use risingwave_pb::common::WorkerType;
28use risingwave_pb::meta::EventLog;
29use risingwave_pb::meta::event_log::Event;
30use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
31use risingwave_sqlparser::ast::RedactSqlOptionKeywordsRef;
32use risingwave_sqlparser::parser::Parser;
33use serde_json::json;
34use thiserror_ext::AsReport;
35
36use crate::MetaResult;
37use crate::hummock::HummockManagerRef;
38use crate::manager::MetadataManager;
39use crate::manager::event_log::EventLogManagerRef;
40use crate::rpc::await_tree::dump_cluster_await_tree;
41
42pub type DiagnoseCommandRef = Arc<DiagnoseCommand>;
43
44pub struct DiagnoseCommand {
45 metadata_manager: MetadataManager,
46 await_tree_reg: await_tree::Registry,
47 hummock_manger: HummockManagerRef,
48 event_log_manager: EventLogManagerRef,
49 prometheus_client: Option<prometheus_http_query::Client>,
50 prometheus_selector: String,
51 redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
52}
53
54impl DiagnoseCommand {
55 pub fn new(
56 metadata_manager: MetadataManager,
57 await_tree_reg: await_tree::Registry,
58 hummock_manger: HummockManagerRef,
59 event_log_manager: EventLogManagerRef,
60 prometheus_client: Option<prometheus_http_query::Client>,
61 prometheus_selector: String,
62 redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
63 ) -> Self {
64 Self {
65 metadata_manager,
66 await_tree_reg,
67 hummock_manger,
68 event_log_manager,
69 prometheus_client,
70 prometheus_selector,
71 redact_sql_option_keywords,
72 }
73 }
74
75 #[cfg_attr(coverage, coverage(off))]
76 pub async fn report(&self, actor_traces_format: ActorTracesFormat) -> String {
77 let mut report = String::new();
78 let _ = writeln!(
79 report,
80 "report created at: {}\nversion: {}",
81 chrono::DateTime::<chrono::offset::Utc>::from(std::time::SystemTime::now()),
82 risingwave_common::current_cluster_version(),
83 );
84 let _ = writeln!(report);
85 self.write_catalog(&mut report).await;
86 let _ = writeln!(report);
87 self.write_worker_nodes(&mut report).await;
88 let _ = writeln!(report);
89 self.write_streaming_prometheus(&mut report).await;
90 let _ = writeln!(report);
91 self.write_storage(&mut report).await;
92 let _ = writeln!(report);
93 self.write_await_tree(&mut report, actor_traces_format)
94 .await;
95 let _ = writeln!(report);
96 self.write_event_logs(&mut report);
97 report
98 }
99
100 #[cfg_attr(coverage, coverage(off))]
101 async fn write_catalog(&self, s: &mut String) {
102 self.write_catalog_inner(s).await;
103 let _ = self.write_table_definition(s).await.inspect_err(|e| {
104 tracing::warn!(
105 error = e.to_report_string(),
106 "failed to display table definition"
107 )
108 });
109 }
110
111 #[cfg_attr(coverage, coverage(off))]
112 async fn write_catalog_inner(&self, s: &mut String) {
113 let guard = self
114 .metadata_manager
115 .catalog_controller
116 .get_inner_read_guard()
117 .await;
118 let stat = match guard.stats().await {
119 Ok(stat) => stat,
120 Err(err) => {
121 tracing::warn!(error=?err.as_report(), "failed to get catalog stats");
122 return;
123 }
124 };
125 let _ = writeln!(s, "number of fragment: {}", stat.streaming_job_num);
126 let _ = writeln!(s, "number of actor: {}", stat.actor_num);
127 let _ = writeln!(s, "number of source: {}", stat.source_num);
128 let _ = writeln!(s, "number of table: {}", stat.table_num);
129 let _ = writeln!(s, "number of materialized view: {}", stat.mview_num);
130 let _ = writeln!(s, "number of sink: {}", stat.sink_num);
131 let _ = writeln!(s, "number of index: {}", stat.index_num);
132 let _ = writeln!(s, "number of function: {}", stat.function_num);
133 }
134
135 #[cfg_attr(coverage, coverage(off))]
136 async fn write_worker_nodes(&self, s: &mut String) {
137 let Ok(worker_actor_count) = self.metadata_manager.worker_actor_count().await else {
138 tracing::warn!("failed to get worker actor count");
139 return;
140 };
141
142 use comfy_table::{Row, Table};
143 let Ok(worker_nodes) = self.metadata_manager.list_worker_node(None, None).await else {
144 tracing::warn!("failed to get worker nodes");
145 return;
146 };
147 let mut table = Table::new();
148 table.set_header({
149 let mut row = Row::new();
150 row.add_cell("id".into());
151 row.add_cell("host".into());
152 row.add_cell("type".into());
153 row.add_cell("state".into());
154 row.add_cell("parallelism".into());
155 row.add_cell("is_streaming".into());
156 row.add_cell("is_serving".into());
157 row.add_cell("rw_version".into());
158 row.add_cell("total_memory_bytes".into());
159 row.add_cell("total_cpu_cores".into());
160 row.add_cell("started_at".into());
161 row.add_cell("actor_count".into());
162 row
163 });
164 for worker_node in worker_nodes {
165 let mut row = Row::new();
166 row.add_cell(worker_node.id.into());
167 try_add_cell(
168 &mut row,
169 worker_node
170 .host
171 .as_ref()
172 .map(|h| format!("{}:{}", h.host, h.port)),
173 );
174 try_add_cell(
175 &mut row,
176 worker_node.get_type().ok().map(|t| t.as_str_name()),
177 );
178 try_add_cell(
179 &mut row,
180 worker_node.get_state().ok().map(|s| s.as_str_name()),
181 );
182 try_add_cell(&mut row, worker_node.parallelism());
183 try_add_cell(
184 &mut row,
185 worker_node.property.as_ref().map(|p| p.is_streaming),
186 );
187 try_add_cell(
188 &mut row,
189 worker_node.property.as_ref().map(|p| p.is_serving),
190 );
191 try_add_cell(
192 &mut row,
193 worker_node.resource.as_ref().map(|r| r.rw_version.clone()),
194 );
195 try_add_cell(
196 &mut row,
197 worker_node.resource.as_ref().map(|r| r.total_memory_bytes),
198 );
199 try_add_cell(
200 &mut row,
201 worker_node.resource.as_ref().map(|r| r.total_cpu_cores),
202 );
203 try_add_cell(
204 &mut row,
205 worker_node
206 .started_at
207 .and_then(|ts| Timestamptz::from_secs(ts as _).map(|t| t.to_string())),
208 );
209 let actor_count = {
210 if let Ok(t) = worker_node.get_type()
211 && t != WorkerType::ComputeNode
212 {
213 None
214 } else {
215 match worker_actor_count.get(&(worker_node.id as _)) {
216 None => Some(0),
217 Some(c) => Some(*c),
218 }
219 }
220 };
221 try_add_cell(&mut row, actor_count);
222 table.add_row(row);
223 }
224 let _ = writeln!(s, "{table}");
225 }
226
227 #[cfg_attr(coverage, coverage(off))]
228 fn write_event_logs(&self, s: &mut String) {
229 let event_logs = self
230 .event_log_manager
231 .list_event_logs()
232 .into_iter()
233 .sorted_by(|a, b| {
234 a.timestamp
235 .unwrap_or(0)
236 .cmp(&b.timestamp.unwrap_or(0))
237 .reverse()
238 })
239 .collect_vec();
240
241 let _ = writeln!(s, "latest barrier completions");
242 Self::write_event_logs_impl(
243 s,
244 event_logs.iter(),
245 |e| {
246 let Event::BarrierComplete(info) = e else {
247 return None;
248 };
249 Some(json!(info).to_string())
250 },
251 Some(10),
252 );
253
254 let _ = writeln!(s);
255 let _ = writeln!(s, "latest barrier collection failures");
256 Self::write_event_logs_impl(
257 s,
258 event_logs.iter(),
259 |e| {
260 let Event::CollectBarrierFail(info) = e else {
261 return None;
262 };
263 Some(json!(info).to_string())
264 },
265 Some(3),
266 );
267
268 let _ = writeln!(s);
269 let _ = writeln!(s, "latest barrier injection failures");
270 Self::write_event_logs_impl(
271 s,
272 event_logs.iter(),
273 |e| {
274 let Event::InjectBarrierFail(info) = e else {
275 return None;
276 };
277 Some(json!(info).to_string())
278 },
279 Some(3),
280 );
281
282 let _ = writeln!(s);
283 let _ = writeln!(s, "latest worker node panics");
284 Self::write_event_logs_impl(
285 s,
286 event_logs.iter(),
287 |e| {
288 let Event::WorkerNodePanic(info) = e else {
289 return None;
290 };
291 Some(json!(info).to_string())
292 },
293 Some(10),
294 );
295
296 let _ = writeln!(s);
297 let _ = writeln!(s, "latest create stream job failures");
298 Self::write_event_logs_impl(
299 s,
300 event_logs.iter(),
301 |e| {
302 let Event::CreateStreamJobFail(info) = e else {
303 return None;
304 };
305 Some(json!(info).to_string())
306 },
307 Some(3),
308 );
309
310 let _ = writeln!(s);
311 let _ = writeln!(s, "latest dirty stream job clear-ups");
312 Self::write_event_logs_impl(
313 s,
314 event_logs.iter(),
315 |e| {
316 let Event::DirtyStreamJobClear(info) = e else {
317 return None;
318 };
319 Some(json!(info).to_string())
320 },
321 Some(3),
322 );
323 }
324
325 #[cfg_attr(coverage, coverage(off))]
326 fn write_event_logs_impl<'a, F>(
327 s: &mut String,
328 event_logs: impl Iterator<Item = &'a EventLog>,
329 get_event_info: F,
330 limit: Option<usize>,
331 ) where
332 F: Fn(&Event) -> Option<String>,
333 {
334 use comfy_table::{Row, Table};
335 let mut table = Table::new();
336 table.set_header({
337 let mut row = Row::new();
338 row.add_cell("created_at".into());
339 row.add_cell("info".into());
340 row
341 });
342 let mut row_count = 0;
343 for event_log in event_logs {
344 let Some(ref inner) = event_log.event else {
345 continue;
346 };
347 if let Some(limit) = limit
348 && row_count >= limit
349 {
350 break;
351 }
352 let mut row = Row::new();
353 let ts = event_log
354 .timestamp
355 .and_then(|ts| Timestamptz::from_millis(ts as _).map(|ts| ts.to_string()));
356 try_add_cell(&mut row, ts);
357 if let Some(info) = get_event_info(inner) {
358 row.add_cell(info.into());
359 row_count += 1;
360 } else {
361 continue;
362 }
363 table.add_row(row);
364 }
365 let _ = writeln!(s, "{table}");
366 }
367
368 #[cfg_attr(coverage, coverage(off))]
369 async fn write_storage(&self, s: &mut String) {
370 let mut sst_num = 0;
371 let mut sst_total_file_size = 0;
372 let back_pressured_compaction_groups = self
373 .hummock_manger
374 .write_limits()
375 .await
376 .into_iter()
377 .filter_map(|(k, v)| {
378 if v.table_ids.is_empty() {
379 None
380 } else {
381 Some(k)
382 }
383 })
384 .join(",");
385 if !back_pressured_compaction_groups.is_empty() {
386 let _ = writeln!(
387 s,
388 "back pressured compaction groups: {back_pressured_compaction_groups}"
389 );
390 }
391
392 #[derive(PartialEq, Eq)]
393 struct SstableSort {
394 compaction_group_id: u64,
395 sst_id: HummockSstableId,
396 delete_ratio: u64,
397 }
398 impl PartialOrd for SstableSort {
399 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
400 Some(self.cmp(other))
401 }
402 }
403 impl Ord for SstableSort {
404 fn cmp(&self, other: &Self) -> Ordering {
405 self.delete_ratio.cmp(&other.delete_ratio)
406 }
407 }
408 fn top_k_sstables(
409 top_k: usize,
410 heap: &mut BinaryHeap<Reverse<SstableSort>>,
411 e: SstableSort,
412 ) {
413 if heap.len() < top_k {
414 heap.push(Reverse(e));
415 } else if let Some(mut p) = heap.peek_mut() {
416 if e.delete_ratio > p.0.delete_ratio {
417 *p = Reverse(e);
418 }
419 }
420 }
421
422 let top_k = 10;
423 let mut top_tombstone_delete_sst = BinaryHeap::with_capacity(top_k);
424 let compaction_group_num = self
425 .hummock_manger
426 .on_current_version(|version| {
427 for compaction_group in version.levels.values() {
428 let mut visit_level = |level: &Level| {
429 sst_num += level.table_infos.len();
430 sst_total_file_size +=
431 level.table_infos.iter().map(|t| t.sst_size).sum::<u64>();
432 for sst in &level.table_infos {
433 if sst.total_key_count == 0 {
434 continue;
435 }
436 let tombstone_delete_ratio =
437 sst.stale_key_count * 10000 / sst.total_key_count;
438 let e = SstableSort {
439 compaction_group_id: compaction_group.group_id,
440 sst_id: sst.sst_id,
441 delete_ratio: tombstone_delete_ratio,
442 };
443 top_k_sstables(top_k, &mut top_tombstone_delete_sst, e);
444 }
445 };
446 let l0 = &compaction_group.l0;
447 for level in &l0.sub_levels {
449 visit_level(level);
450 }
451 for level in &compaction_group.levels {
452 visit_level(level);
453 }
454 }
455 version.levels.len()
456 })
457 .await;
458
459 let _ = writeln!(s, "number of SSTables: {sst_num}");
460 let _ = writeln!(s, "total size of SSTables (byte): {sst_total_file_size}");
461 let _ = writeln!(s, "number of compaction groups: {compaction_group_num}");
462 use comfy_table::{Row, Table};
463 fn format_table(heap: BinaryHeap<Reverse<SstableSort>>) -> Table {
464 let mut table = Table::new();
465 table.set_header({
466 let mut row = Row::new();
467 row.add_cell("compaction group id".into());
468 row.add_cell("sstable id".into());
469 row.add_cell("delete ratio".into());
470 row
471 });
472 for sst in &heap.into_sorted_vec() {
473 let mut row = Row::new();
474 row.add_cell(sst.0.compaction_group_id.into());
475 row.add_cell(sst.0.sst_id.into());
476 row.add_cell(format!("{:.2}%", sst.0.delete_ratio as f64 / 100.0).into());
477 table.add_row(row);
478 }
479 table
480 }
481 let _ = writeln!(s);
482 let _ = writeln!(s, "top tombstone delete ratio");
483 let _ = writeln!(s, "{}", format_table(top_tombstone_delete_sst));
484 let _ = writeln!(s);
485
486 let _ = writeln!(s);
487 self.write_storage_prometheus(s).await;
488 }
489
490 #[cfg_attr(coverage, coverage(off))]
491 async fn write_streaming_prometheus(&self, s: &mut String) {
492 let _ = writeln!(s, "top sources by throughput (rows/s)");
493 let query = format!(
494 "topk(3, sum(rate(stream_source_output_rows_counts{{{}}}[10m]))by (source_name))",
495 self.prometheus_selector
496 );
497 self.write_instant_vector_impl(s, &query, vec!["source_name"])
498 .await;
499
500 let _ = writeln!(s);
501 let _ = writeln!(s, "top materialized views by throughput (rows/s)");
502 let query = format!(
503 "topk(3, sum(rate(stream_mview_input_row_count{{{}}}[10m]))by (table_id))",
504 self.prometheus_selector
505 );
506 self.write_instant_vector_impl(s, &query, vec!["table_id"])
507 .await;
508
509 let _ = writeln!(s);
510 let _ = writeln!(s, "top join executor by matched rows");
511 let query = format!(
512 "topk(3, histogram_quantile(0.9, sum(rate(stream_join_matched_join_keys_bucket{{{}}}[10m])) by (le, fragment_id, table_id)))",
513 self.prometheus_selector
514 );
515 self.write_instant_vector_impl(s, &query, vec!["table_id", "fragment_id"])
516 .await;
517 }
518
519 #[cfg_attr(coverage, coverage(off))]
520 async fn write_storage_prometheus(&self, s: &mut String) {
521 let _ = writeln!(s, "top Hummock Get by duration (second)");
522 let query = format!(
523 "topk(3, histogram_quantile(0.9, sum(rate(state_store_get_duration_bucket{{{}}}[10m])) by (le, table_id)))",
524 self.prometheus_selector
525 );
526 self.write_instant_vector_impl(s, &query, vec!["table_id"])
527 .await;
528
529 let _ = writeln!(s);
530 let _ = writeln!(s, "top Hummock Iter Init by duration (second)");
531 let query = format!(
532 "topk(3, histogram_quantile(0.9, sum(rate(state_store_iter_init_duration_bucket{{{}}}[10m])) by (le, table_id)))",
533 self.prometheus_selector
534 );
535 self.write_instant_vector_impl(s, &query, vec!["table_id"])
536 .await;
537
538 let _ = writeln!(s);
539 let _ = writeln!(s, "top table commit flush by size (byte)");
540 let query = format!(
541 "topk(3, sum(rate(storage_commit_write_throughput{{{}}}[10m])) by (table_id))",
542 self.prometheus_selector
543 );
544 self.write_instant_vector_impl(s, &query, vec!["table_id"])
545 .await;
546
547 let _ = writeln!(s);
548 let _ = writeln!(s, "object store read throughput (bytes/s)");
549 let query = format!(
550 "sum(rate(object_store_read_bytes{{{}}}[10m])) by (job, instance)",
551 merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
552 );
553 self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
554 .await;
555
556 let _ = writeln!(s);
557 let _ = writeln!(s, "object store write throughput (bytes/s)");
558 let query = format!(
559 "sum(rate(object_store_write_bytes{{{}}}[10m])) by (job, instance)",
560 merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
561 );
562 self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
563 .await;
564
565 let _ = writeln!(s);
566 let _ = writeln!(s, "object store operation rate");
567 let query = format!(
568 "sum(rate(object_store_operation_latency_count{{{}}}[10m])) by (le, type, job, instance)",
569 merge_prometheus_selector([
570 &self.prometheus_selector,
571 "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read_read_bytes|streaming_read\""
572 ])
573 );
574 self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
575 .await;
576
577 let _ = writeln!(s);
578 let _ = writeln!(s, "object store operation duration (second)");
579 let query = format!(
580 "histogram_quantile(0.9, sum(rate(object_store_operation_latency_bucket{{{}}}[10m])) by (le, type, job, instance))",
581 merge_prometheus_selector([
582 &self.prometheus_selector,
583 "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read\""
584 ])
585 );
586 self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
587 .await;
588 }
589
590 #[cfg_attr(coverage, coverage(off))]
591 async fn write_instant_vector_impl(&self, s: &mut String, query: &str, labels: Vec<&str>) {
592 let Some(ref client) = self.prometheus_client else {
593 return;
594 };
595 if let Ok(Vector(instant_vec)) = client
596 .query(query)
597 .get()
598 .await
599 .map(|result| result.into_inner().0)
600 {
601 for i in instant_vec {
602 let l = labels
603 .iter()
604 .map(|label| {
605 format!(
606 "{}={}",
607 *label,
608 i.metric()
609 .get(*label)
610 .map(|s| s.as_str())
611 .unwrap_or_default()
612 )
613 })
614 .join(",");
615 let _ = writeln!(s, "{}: {:.3}", l, i.sample().value());
616 }
617 }
618 }
619
620 #[cfg_attr(coverage, coverage(off))]
621 async fn write_await_tree(&self, s: &mut String, actor_traces_format: ActorTracesFormat) {
622 let all = dump_cluster_await_tree(
623 &self.metadata_manager,
624 &self.await_tree_reg,
625 actor_traces_format,
626 )
627 .await;
628
629 if let Ok(all) = all {
630 write!(s, "{}", all.output()).unwrap();
631 } else {
632 tracing::warn!("failed to dump await tree");
633 }
634 }
635
636 async fn write_table_definition(&self, s: &mut String) -> MetaResult<()> {
637 let sources = self
638 .metadata_manager
639 .catalog_controller
640 .list_sources()
641 .await?
642 .into_iter()
643 .map(|s| (s.id, (s.name, s.schema_id, s.definition)))
644 .collect::<BTreeMap<_, _>>();
645 let tables = self
646 .metadata_manager
647 .catalog_controller
648 .list_tables_by_type(TableType::Table)
649 .await?
650 .into_iter()
651 .map(|t| (t.id, (t.name, t.schema_id, t.definition)))
652 .collect::<BTreeMap<_, _>>();
653 let mvs = self
654 .metadata_manager
655 .catalog_controller
656 .list_tables_by_type(TableType::MaterializedView)
657 .await?
658 .into_iter()
659 .map(|t| (t.id, (t.name, t.schema_id, t.definition)))
660 .collect::<BTreeMap<_, _>>();
661 let indexes = self
662 .metadata_manager
663 .catalog_controller
664 .list_tables_by_type(TableType::Index)
665 .await?
666 .into_iter()
667 .map(|t| (t.id, (t.name, t.schema_id, t.definition)))
668 .collect::<BTreeMap<_, _>>();
669 let sinks = self
670 .metadata_manager
671 .catalog_controller
672 .list_sinks()
673 .await?
674 .into_iter()
675 .map(|s| (s.id, (s.name, s.schema_id, s.definition)))
676 .collect::<BTreeMap<_, _>>();
677 let catalogs = [
678 ("SOURCE", sources),
679 ("TABLE", tables),
680 ("MATERIALIZED VIEW", mvs),
681 ("INDEX", indexes),
682 ("SINK", sinks),
683 ];
684 let mut obj_id_to_name = HashMap::new();
685 for (title, items) in catalogs {
686 use comfy_table::{Row, Table};
687 let mut table = Table::new();
688 table.set_header({
689 let mut row = Row::new();
690 row.add_cell("id".into());
691 row.add_cell("name".into());
692 row.add_cell("schema_id".into());
693 row.add_cell("definition".into());
694 row
695 });
696 for (id, (name, schema_id, definition)) in items {
697 obj_id_to_name.insert(id, name.clone());
698 let mut row = Row::new();
699 let may_redact = redact_sql(&definition, self.redact_sql_option_keywords.clone())
700 .unwrap_or_else(|| "[REDACTED]".into());
701 row.add_cell(id.into());
702 row.add_cell(name.into());
703 row.add_cell(schema_id.into());
704 row.add_cell(may_redact.into());
705 table.add_row(row);
706 }
707 let _ = writeln!(s);
708 let _ = writeln!(s, "{title}");
709 let _ = writeln!(s, "{table}");
710 }
711
712 let actors = self
713 .metadata_manager
714 .catalog_controller
715 .list_actor_info()
716 .await?
717 .into_iter()
718 .map(|(actor_id, fragment_id, job_id, schema_id, obj_type)| {
719 (
720 actor_id,
721 (
722 fragment_id,
723 job_id,
724 schema_id,
725 obj_type,
726 obj_id_to_name
727 .get(&(job_id as _))
728 .cloned()
729 .unwrap_or_default(),
730 ),
731 )
732 })
733 .collect::<BTreeMap<_, _>>();
734
735 use comfy_table::{Row, Table};
736 let mut table = Table::new();
737 table.set_header({
738 let mut row = Row::new();
739 row.add_cell("id".into());
740 row.add_cell("fragment_id".into());
741 row.add_cell("job_id".into());
742 row.add_cell("schema_id".into());
743 row.add_cell("type".into());
744 row.add_cell("name".into());
745 row
746 });
747 for (actor_id, (fragment_id, job_id, schema_id, ddl_type, name)) in actors {
748 let mut row = Row::new();
749 row.add_cell(actor_id.into());
750 row.add_cell(fragment_id.into());
751 row.add_cell(job_id.into());
752 row.add_cell(schema_id.into());
753 row.add_cell(ddl_type.as_str().into());
754 row.add_cell(name.into());
755 table.add_row(row);
756 }
757 let _ = writeln!(s);
758 let _ = writeln!(s, "ACTOR");
759 let _ = writeln!(s, "{table}");
760 Ok(())
761 }
762}
763
764#[cfg_attr(coverage, coverage(off))]
765fn try_add_cell<T: Into<comfy_table::Cell>>(row: &mut comfy_table::Row, t: Option<T>) {
766 match t {
767 Some(t) => {
768 row.add_cell(t.into());
769 }
770 None => {
771 row.add_cell("".into());
772 }
773 }
774}
775
776#[cfg_attr(coverage, coverage(off))]
777fn merge_prometheus_selector<'a>(selectors: impl IntoIterator<Item = &'a str>) -> String {
778 selectors.into_iter().filter(|s| !s.is_empty()).join(",")
779}
780
781fn redact_sql(sql: &str, keywords: RedactSqlOptionKeywordsRef) -> Option<String> {
782 match Parser::parse_sql(sql) {
783 Ok(sqls) => Some(
784 sqls.into_iter()
785 .map(|sql| sql.to_redacted_string(keywords.clone()))
786 .join(";"),
787 ),
788 Err(_) => None,
789 }
790}