use std::cmp::{Ordering, Reverse};
use std::collections::{BTreeMap, BinaryHeap, HashMap};
use std::fmt::Write;
use std::sync::Arc;

use itertools::Itertools;
use prometheus_http_query::response::Data::Vector;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::StackTraceResponseExt;
use risingwave_hummock_sdk::HummockSstableId;
use risingwave_hummock_sdk::level::Level;
use risingwave_meta_model::table::TableType;
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::EventLog;
use risingwave_pb::meta::event_log::Event;
use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
use risingwave_sqlparser::ast::RedactSqlOptionKeywordsRef;
use risingwave_sqlparser::parser::Parser;
use serde_json::json;
use thiserror_ext::AsReport;

use crate::MetaResult;
use crate::hummock::HummockManagerRef;
use crate::manager::MetadataManager;
use crate::manager::event_log::EventLogManagerRef;
use crate::rpc::await_tree::dump_cluster_await_tree;

pub type DiagnoseCommandRef = Arc<DiagnoseCommand>;

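/// Builds a plain-text diagnosis report of the cluster from catalog metadata, Prometheus
/// metrics, Hummock storage state, await-tree dumps, and recent event logs.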
pub struct DiagnoseCommand {
    metadata_manager: MetadataManager,
    await_tree_reg: await_tree::Registry,
    hummock_manager: HummockManagerRef,
    event_log_manager: EventLogManagerRef,
    prometheus_client: Option<prometheus_http_query::Client>,
    prometheus_selector: String,
    redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
}

impl DiagnoseCommand {
    pub fn new(
        metadata_manager: MetadataManager,
        await_tree_reg: await_tree::Registry,
        hummock_manager: HummockManagerRef,
        event_log_manager: EventLogManagerRef,
        prometheus_client: Option<prometheus_http_query::Client>,
        prometheus_selector: String,
        redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
    ) -> Self {
        Self {
            metadata_manager,
            await_tree_reg,
            hummock_manager,
            event_log_manager,
            prometheus_client,
            prometheus_selector,
            redact_sql_option_keywords,
        }
    }

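    /// Assembles the full diagnosis report: catalog summary, worker nodes, streaming and
    /// storage metrics, await-tree dumps, and recent event logs, in that order.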
    pub async fn report(&self, actor_traces_format: ActorTracesFormat) -> String {
        let mut report = String::new();
        let _ = writeln!(
            report,
            "report created at: {}\nversion: {}",
            chrono::DateTime::<chrono::offset::Utc>::from(std::time::SystemTime::now()),
            risingwave_common::current_cluster_version(),
        );
        let _ = writeln!(report);
        self.write_catalog(&mut report).await;
        let _ = writeln!(report);
        self.write_worker_nodes(&mut report).await;
        let _ = writeln!(report);
        self.write_streaming_prometheus(&mut report).await;
        let _ = writeln!(report);
        self.write_storage(&mut report).await;
        let _ = writeln!(report);
        self.write_await_tree(&mut report, actor_traces_format)
            .await;
        let _ = writeln!(report);
        self.write_event_logs(&mut report);
        report
    }

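    /// Writes catalog statistics followed by redacted object definitions; definition errors
    /// are logged but do not abort the report.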
    async fn write_catalog(&self, s: &mut String) {
        self.write_catalog_inner(s).await;
        let _ = self.write_table_definition(s).await.inspect_err(|e| {
            tracing::warn!(
                error = e.to_report_string(),
                "failed to display table definition"
            )
        });
    }

    async fn write_catalog_inner(&self, s: &mut String) {
        let guard = self
            .metadata_manager
            .catalog_controller
            .get_inner_read_guard()
            .await;
        let stat = match guard.stats().await {
            Ok(stat) => stat,
            Err(err) => {
                tracing::warn!(error=?err.as_report(), "failed to get catalog stats");
                return;
            }
        };
        let _ = writeln!(s, "number of streaming jobs: {}", stat.streaming_job_num);
        let _ = writeln!(s, "number of actors: {}", stat.actor_num);
        let _ = writeln!(s, "number of sources: {}", stat.source_num);
        let _ = writeln!(s, "number of tables: {}", stat.table_num);
        let _ = writeln!(s, "number of materialized views: {}", stat.mview_num);
        let _ = writeln!(s, "number of sinks: {}", stat.sink_num);
        let _ = writeln!(s, "number of indexes: {}", stat.index_num);
        let _ = writeln!(s, "number of functions: {}", stat.function_num);
    }

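    /// Renders one table row per worker node with its host, type, state, resources, and, for
    /// compute nodes, the number of actors it hosts.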
    async fn write_worker_nodes(&self, s: &mut String) {
        let Ok(worker_actor_count) = self.metadata_manager.worker_actor_count().await else {
            tracing::warn!("failed to get worker actor count");
            return;
        };

        use comfy_table::{Row, Table};
        let Ok(worker_nodes) = self.metadata_manager.list_worker_node(None, None).await else {
            tracing::warn!("failed to get worker nodes");
            return;
        };
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("host".into());
            row.add_cell("type".into());
            row.add_cell("state".into());
            row.add_cell("parallelism".into());
            row.add_cell("is_streaming".into());
            row.add_cell("is_serving".into());
            row.add_cell("rw_version".into());
            row.add_cell("total_memory_bytes".into());
            row.add_cell("total_cpu_cores".into());
            row.add_cell("started_at".into());
            row.add_cell("actor_count".into());
            row
        });
        for worker_node in worker_nodes {
            let mut row = Row::new();
            row.add_cell(worker_node.id.into());
            try_add_cell(
                &mut row,
                worker_node
                    .host
                    .as_ref()
                    .map(|h| format!("{}:{}", h.host, h.port)),
            );
            try_add_cell(
                &mut row,
                worker_node.get_type().ok().map(|t| t.as_str_name()),
            );
            try_add_cell(
                &mut row,
                worker_node.get_state().ok().map(|s| s.as_str_name()),
            );
            try_add_cell(&mut row, worker_node.parallelism());
            try_add_cell(
                &mut row,
                worker_node.property.as_ref().map(|p| p.is_streaming),
            );
            try_add_cell(
                &mut row,
                worker_node.property.as_ref().map(|p| p.is_serving),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.rw_version.clone()),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.total_memory_bytes),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.total_cpu_cores),
            );
            try_add_cell(
                &mut row,
                worker_node
                    .started_at
                    .and_then(|ts| Timestamptz::from_secs(ts as _).map(|t| t.to_string())),
            );
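            // Only compute nodes host actors; other worker types get an empty cell.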
            let actor_count = {
                if let Ok(t) = worker_node.get_type()
                    && t != WorkerType::ComputeNode
                {
                    None
                } else {
                    match worker_actor_count.get(&(worker_node.id as _)) {
                        None => Some(0),
                        Some(c) => Some(*c),
                    }
                }
            };
            try_add_cell(&mut row, actor_count);
            table.add_row(row);
        }
        let _ = writeln!(s, "{table}");
    }

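    /// Dumps the most recent event logs, newest first, grouped by event kind (barrier
    /// completions and failures, worker panics, stream job failures and clean-ups).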
    fn write_event_logs(&self, s: &mut String) {
        let event_logs = self
            .event_log_manager
            .list_event_logs()
            .into_iter()
            .sorted_by(|a, b| {
                a.timestamp
                    .unwrap_or(0)
                    .cmp(&b.timestamp.unwrap_or(0))
                    .reverse()
            })
            .collect_vec();

        let _ = writeln!(s, "latest barrier completions");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::BarrierComplete(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(10),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest barrier collection failures");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::CollectBarrierFail(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest barrier injection failures");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::InjectBarrierFail(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest worker node panics");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::WorkerNodePanic(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(10),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest create stream job failures");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::CreateStreamJobFail(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest dirty stream job clear-ups");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::DirtyStreamJobClear(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );
    }

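    /// Writes one table of events, keeping only those for which `get_event_info` returns a
    /// JSON string, up to `limit` rows.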
    fn write_event_logs_impl<'a, F>(
        s: &mut String,
        event_logs: impl Iterator<Item = &'a EventLog>,
        get_event_info: F,
        limit: Option<usize>,
    ) where
        F: Fn(&Event) -> Option<String>,
    {
        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("created_at".into());
            row.add_cell("info".into());
            row
        });
        let mut row_count = 0;
        for event_log in event_logs {
            let Some(ref inner) = event_log.event else {
                continue;
            };
            if let Some(limit) = limit
                && row_count >= limit
            {
                break;
            }
            let mut row = Row::new();
            let ts = event_log
                .timestamp
                .and_then(|ts| Timestamptz::from_millis(ts as _).map(|ts| ts.to_string()));
            try_add_cell(&mut row, ts);
            if let Some(info) = get_event_info(inner) {
                row.add_cell(info.into());
                row_count += 1;
            } else {
                continue;
            }
            table.add_row(row);
        }
        let _ = writeln!(s, "{table}");
    }

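    /// Summarizes Hummock state: back-pressured compaction groups, SSTable counts and sizes,
    /// the SSTables with the highest tombstone ratios, and storage-related Prometheus metrics.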
    async fn write_storage(&self, s: &mut String) {
        let mut sst_num = 0;
        let mut sst_total_file_size = 0;
        let back_pressured_compaction_groups = self
            .hummock_manager
            .write_limits()
            .await
            .into_iter()
            .filter_map(|(k, v)| {
                if v.table_ids.is_empty() {
                    None
                } else {
                    Some(k)
                }
            })
            .join(",");
        if !back_pressured_compaction_groups.is_empty() {
            let _ = writeln!(
                s,
                "back pressured compaction groups: {back_pressured_compaction_groups}"
            );
        }

        #[derive(PartialEq, Eq)]
        struct SstableSort {
            compaction_group_id: u64,
            sst_id: HummockSstableId,
            delete_ratio: u64,
        }
        impl PartialOrd for SstableSort {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }
        impl Ord for SstableSort {
            fn cmp(&self, other: &Self) -> Ordering {
                self.delete_ratio.cmp(&other.delete_ratio)
            }
        }
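        // Keeps the `top_k` entries with the largest delete ratios: a min-heap (via `Reverse`)
        // whose smallest element is evicted when a larger ratio arrives.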
        fn top_k_sstables(
            top_k: usize,
            heap: &mut BinaryHeap<Reverse<SstableSort>>,
            e: SstableSort,
        ) {
            if heap.len() < top_k {
                heap.push(Reverse(e));
            } else if let Some(mut p) = heap.peek_mut()
                && e.delete_ratio > p.0.delete_ratio
            {
                *p = Reverse(e);
            }
        }

        let top_k = 10;
        let mut top_tombstone_delete_sst = BinaryHeap::with_capacity(top_k);
        let compaction_group_num = self
            .hummock_manager
            .on_current_version(|version| {
                for compaction_group in version.levels.values() {
                    let mut visit_level = |level: &Level| {
                        sst_num += level.table_infos.len();
                        sst_total_file_size +=
                            level.table_infos.iter().map(|t| t.sst_size).sum::<u64>();
                        for sst in &level.table_infos {
                            if sst.total_key_count == 0 {
                                continue;
                            }
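                            // Ratio in basis points (stale keys per 10_000 total keys); it is
                            // rendered as a percentage when the table is formatted below.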
                            let tombstone_delete_ratio =
                                sst.stale_key_count * 10000 / sst.total_key_count;
                            let e = SstableSort {
                                compaction_group_id: compaction_group.group_id,
                                sst_id: sst.sst_id,
                                delete_ratio: tombstone_delete_ratio,
                            };
                            top_k_sstables(top_k, &mut top_tombstone_delete_sst, e);
                        }
                    };
                    let l0 = &compaction_group.l0;
                    for level in &l0.sub_levels {
                        visit_level(level);
                    }
                    for level in &compaction_group.levels {
                        visit_level(level);
                    }
                }
                version.levels.len()
            })
            .await;

        let _ = writeln!(s, "number of SSTables: {sst_num}");
        let _ = writeln!(s, "total size of SSTables (byte): {sst_total_file_size}");
        let _ = writeln!(s, "number of compaction groups: {compaction_group_num}");
        use comfy_table::{Row, Table};
        fn format_table(heap: BinaryHeap<Reverse<SstableSort>>) -> Table {
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("compaction group id".into());
                row.add_cell("sstable id".into());
                row.add_cell("delete ratio".into());
                row
            });
            for sst in &heap.into_sorted_vec() {
                let mut row = Row::new();
                row.add_cell(sst.0.compaction_group_id.into());
                row.add_cell(sst.0.sst_id.into());
                row.add_cell(format!("{:.2}%", sst.0.delete_ratio as f64 / 100.0).into());
                table.add_row(row);
            }
            table
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "top tombstone delete ratio");
        let _ = writeln!(s, "{}", format_table(top_tombstone_delete_sst));
        let _ = writeln!(s);

        let _ = writeln!(s);
        self.write_storage_prometheus(s).await;
    }

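    /// Queries Prometheus for the busiest sources, materialized views, and join executors over
    /// the last 10 minutes.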
    async fn write_streaming_prometheus(&self, s: &mut String) {
        let _ = writeln!(s, "top sources by throughput (rows/s)");
        let query = format!(
            "topk(3, sum(rate(stream_source_output_rows_counts{{{}}}[10m])) by (source_name))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["source_name"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top materialized views by throughput (rows/s)");
        let query = format!(
            "topk(3, sum(rate(stream_mview_input_row_count{{{}}}[10m])) by (table_id))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top join executors by matched rows");
        let query = format!(
            "topk(3, histogram_quantile(0.9, sum(rate(stream_join_matched_join_keys_bucket{{{}}}[10m])) by (le, fragment_id, table_id)))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id", "fragment_id"])
            .await;
    }

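    /// Queries Prometheus for Hummock read/write latencies, commit throughput, and object
    /// store traffic over the last 10 minutes.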
    async fn write_storage_prometheus(&self, s: &mut String) {
        let _ = writeln!(s, "top Hummock Get by duration (second)");
        let query = format!(
            "topk(3, histogram_quantile(0.9, sum(rate(state_store_get_duration_bucket{{{}}}[10m])) by (le, table_id)))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top Hummock Iter Init by duration (second)");
        let query = format!(
            "topk(3, histogram_quantile(0.9, sum(rate(state_store_iter_init_duration_bucket{{{}}}[10m])) by (le, table_id)))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top tables by commit flush throughput (bytes/s)");
        let query = format!(
            "topk(3, sum(rate(storage_commit_write_throughput{{{}}}[10m])) by (table_id))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store read throughput (bytes/s)");
        let query = format!(
            "sum(rate(object_store_read_bytes{{{}}}[10m])) by (job, instance)",
            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
        );
        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store write throughput (bytes/s)");
        let query = format!(
            "sum(rate(object_store_write_bytes{{{}}}[10m])) by (job, instance)",
            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
        );
        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store operation rate");
        let query = format!(
            "sum(rate(object_store_operation_latency_count{{{}}}[10m])) by (le, type, job, instance)",
            merge_prometheus_selector([
                &self.prometheus_selector,
                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read_read_bytes|streaming_read\""
            ])
        );
        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store operation duration (second)");
        let query = format!(
            "histogram_quantile(0.9, sum(rate(object_store_operation_latency_bucket{{{}}}[10m])) by (le, type, job, instance))",
            merge_prometheus_selector([
                &self.prometheus_selector,
                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read\""
            ])
        );
        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
            .await;
    }

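    /// Runs an instant PromQL query and writes one `label=value,...: value` line per sample.
    /// Silently skips output if no Prometheus client is configured or the query fails.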
    async fn write_instant_vector_impl(&self, s: &mut String, query: &str, labels: Vec<&str>) {
        let Some(ref client) = self.prometheus_client else {
            return;
        };
        if let Ok(Vector(instant_vec)) = client
            .query(query)
            .get()
            .await
            .map(|result| result.into_inner().0)
        {
            for i in instant_vec {
                let l = labels
                    .iter()
                    .map(|label| {
                        format!(
                            "{}={}",
                            *label,
                            i.metric()
                                .get(*label)
                                .map(|s| s.as_str())
                                .unwrap_or_default()
                        )
                    })
                    .join(",");
                let _ = writeln!(s, "{}: {:.3}", l, i.sample().value());
            }
        }
    }

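    /// Collects await-tree dumps from all nodes in the cluster and appends them to the report.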
    async fn write_await_tree(&self, s: &mut String, actor_traces_format: ActorTracesFormat) {
        let all = dump_cluster_await_tree(
            &self.metadata_manager,
            &self.await_tree_reg,
            actor_traces_format,
        )
        .await;

        if let Ok(all) = all {
            write!(s, "{}", all.output()).unwrap();
        } else {
            tracing::warn!("failed to dump await tree");
        }
    }

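    /// Writes a table of redacted SQL definitions for each catalog object type, then an actor
    /// table mapping actors to fragments, jobs, and object names.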
    async fn write_table_definition(&self, s: &mut String) -> MetaResult<()> {
        let sources = self
            .metadata_manager
            .catalog_controller
            .list_sources()
            .await?
            .into_iter()
            .map(|s| (s.id, (s.name, s.schema_id, s.definition)))
            .collect::<BTreeMap<_, _>>();
        let tables = self
            .metadata_manager
            .catalog_controller
            .list_tables_by_type(TableType::Table)
            .await?
            .into_iter()
            .map(|t| (t.id, (t.name, t.schema_id, t.definition)))
            .collect::<BTreeMap<_, _>>();
        let mvs = self
            .metadata_manager
            .catalog_controller
            .list_tables_by_type(TableType::MaterializedView)
            .await?
            .into_iter()
            .map(|t| (t.id, (t.name, t.schema_id, t.definition)))
            .collect::<BTreeMap<_, _>>();
        let indexes = self
            .metadata_manager
            .catalog_controller
            .list_tables_by_type(TableType::Index)
            .await?
            .into_iter()
            .map(|t| (t.id, (t.name, t.schema_id, t.definition)))
            .collect::<BTreeMap<_, _>>();
        let sinks = self
            .metadata_manager
            .catalog_controller
            .list_sinks()
            .await?
            .into_iter()
            .map(|s| (s.id, (s.name, s.schema_id, s.definition)))
            .collect::<BTreeMap<_, _>>();
        let catalogs = [
            ("SOURCE", sources),
            ("TABLE", tables),
            ("MATERIALIZED VIEW", mvs),
            ("INDEX", indexes),
            ("SINK", sinks),
        ];
        let mut obj_id_to_name = HashMap::new();
        for (title, items) in catalogs {
            use comfy_table::{Row, Table};
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("id".into());
                row.add_cell("name".into());
                row.add_cell("schema_id".into());
                row.add_cell("definition".into());
                row
            });
            for (id, (name, schema_id, definition)) in items {
                obj_id_to_name.insert(id, name.clone());
                let mut row = Row::new();
                let may_redact = redact_sql(&definition, self.redact_sql_option_keywords.clone())
                    .unwrap_or_else(|| "[REDACTED]".into());
                row.add_cell(id.into());
                row.add_cell(name.into());
                row.add_cell(schema_id.into());
                row.add_cell(may_redact.into());
                table.add_row(row);
            }
            let _ = writeln!(s);
            let _ = writeln!(s, "{title}");
            let _ = writeln!(s, "{table}");
        }

        let actors = self
            .metadata_manager
            .catalog_controller
            .list_actor_info()
            .await?
            .into_iter()
            .map(|(actor_id, fragment_id, job_id, schema_id, obj_type)| {
                (
                    actor_id,
                    (
                        fragment_id,
                        job_id,
                        schema_id,
                        obj_type,
                        obj_id_to_name
                            .get(&(job_id as _))
                            .cloned()
                            .unwrap_or_default(),
                    ),
                )
            })
            .collect::<BTreeMap<_, _>>();

        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("fragment_id".into());
            row.add_cell("job_id".into());
            row.add_cell("schema_id".into());
            row.add_cell("type".into());
            row.add_cell("name".into());
            row
        });
        for (actor_id, (fragment_id, job_id, schema_id, ddl_type, name)) in actors {
            let mut row = Row::new();
            row.add_cell(actor_id.into());
            row.add_cell(fragment_id.into());
            row.add_cell(job_id.into());
            row.add_cell(schema_id.into());
            row.add_cell(ddl_type.as_str().into());
            row.add_cell(name.into());
            table.add_row(row);
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "ACTOR");
        let _ = writeln!(s, "{table}");
        Ok(())
    }
}

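/// Adds the value as a cell if present, otherwise an empty cell, so that rows stay aligned.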
fn try_add_cell<T: Into<comfy_table::Cell>>(row: &mut comfy_table::Row, t: Option<T>) {
    match t {
        Some(t) => {
            row.add_cell(t.into());
        }
        None => {
            row.add_cell("".into());
        }
    }
}

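/// Joins non-empty Prometheus label selectors with commas so they can be spliced into a single
/// `{...}` matcher.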
fn merge_prometheus_selector<'a>(selectors: impl IntoIterator<Item = &'a str>) -> String {
    selectors.into_iter().filter(|s| !s.is_empty()).join(",")
}

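/// Redacts sensitive option values from a SQL definition; returns `None` if the SQL cannot be
/// parsed, in which case the caller falls back to `[REDACTED]`.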
fn redact_sql(sql: &str, keywords: RedactSqlOptionKeywordsRef) -> Option<String> {
    match Parser::parse_sql(sql) {
        Ok(sqls) => Some(
            sqls.into_iter()
                .map(|sql| sql.to_redacted_string(keywords.clone()))
                .join(";"),
        ),
        Err(_) => None,
    }
}