1use std::cmp::{Ordering, Reverse};
16use std::collections::{BTreeMap, BinaryHeap, HashMap};
17use std::fmt::Write;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use prometheus_http_query::response::Data::Vector;
22use risingwave_common::id::ObjectId;
23use risingwave_common::types::Timestamptz;
24use risingwave_common::util::StackTraceResponseExt;
25use risingwave_common::util::epoch::Epoch;
26use risingwave_hummock_sdk::HummockSstableId;
27use risingwave_hummock_sdk::level::Level;
28use risingwave_pb::catalog::table::PbTableType;
29use risingwave_pb::common::WorkerType;
30use risingwave_pb::meta::EventLog;
31use risingwave_pb::meta::event_log::Event;
32use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
33use risingwave_sqlparser::ast::RedactSqlOptionKeywordsRef;
34use risingwave_sqlparser::parser::Parser;
35use serde_json::json;
36use thiserror_ext::AsReport;
37
38use crate::MetaResult;
39use crate::hummock::HummockManagerRef;
40use crate::manager::MetadataManager;
41use crate::manager::event_log::EventLogManagerRef;
42use crate::rpc::await_tree::dump_cluster_await_tree;
43
44pub type DiagnoseCommandRef = Arc<DiagnoseCommand>;
45
46pub struct DiagnoseCommand {
47 metadata_manager: MetadataManager,
48 await_tree_reg: await_tree::Registry,
49 hummock_manger: HummockManagerRef,
50 event_log_manager: EventLogManagerRef,
51 prometheus_client: Option<prometheus_http_query::Client>,
52 prometheus_selector: String,
53 redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
54}
55
56impl DiagnoseCommand {
57 pub fn new(
58 metadata_manager: MetadataManager,
59 await_tree_reg: await_tree::Registry,
60 hummock_manger: HummockManagerRef,
61 event_log_manager: EventLogManagerRef,
62 prometheus_client: Option<prometheus_http_query::Client>,
63 prometheus_selector: String,
64 redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
65 ) -> Self {
66 Self {
67 metadata_manager,
68 await_tree_reg,
69 hummock_manger,
70 event_log_manager,
71 prometheus_client,
72 prometheus_selector,
73 redact_sql_option_keywords,
74 }
75 }
76
77 pub async fn report(&self, actor_traces_format: ActorTracesFormat) -> String {
78 let mut report = String::new();
79 let _ = writeln!(
80 report,
81 "report created at: {}\nversion: {}",
82 chrono::DateTime::<chrono::offset::Utc>::from(std::time::SystemTime::now()),
83 risingwave_common::current_cluster_version(),
84 );
85 let _ = writeln!(report);
86 self.write_catalog(&mut report).await;
87 let _ = writeln!(report);
88 self.write_worker_nodes(&mut report).await;
89 let _ = writeln!(report);
90 self.write_streaming_prometheus(&mut report).await;
91 let _ = writeln!(report);
92 self.write_storage(&mut report).await;
93 let _ = writeln!(report);
94 self.write_await_tree(&mut report, actor_traces_format)
95 .await;
96 let _ = writeln!(report);
97 self.write_event_logs(&mut report);
98 report
99 }
100
101 async fn write_catalog(&self, s: &mut String) {
102 self.write_catalog_inner(s).await;
103 let _ = self.write_table_definition(s).await.inspect_err(|e| {
104 tracing::warn!(
105 error = e.to_report_string(),
106 "failed to display table definition"
107 )
108 });
109 }
110
111 async fn write_catalog_inner(&self, s: &mut String) {
112 let stats = self.metadata_manager.catalog_controller.stats().await;
113
114 let stat = match stats {
115 Ok(stat) => stat,
116 Err(err) => {
117 tracing::warn!(error=?err.as_report(), "failed to get catalog stats");
118 return;
119 }
120 };
121 let _ = writeln!(s, "number of fragment: {}", stat.streaming_job_num);
122 let _ = writeln!(s, "number of actor: {}", stat.actor_num);
123 let _ = writeln!(s, "number of source: {}", stat.source_num);
124 let _ = writeln!(s, "number of table: {}", stat.table_num);
125 let _ = writeln!(s, "number of materialized view: {}", stat.mview_num);
126 let _ = writeln!(s, "number of sink: {}", stat.sink_num);
127 let _ = writeln!(s, "number of index: {}", stat.index_num);
128 let _ = writeln!(s, "number of function: {}", stat.function_num);
129 }
130
131 async fn write_worker_nodes(&self, s: &mut String) {
132 let Ok(worker_actor_count) = self.metadata_manager.worker_actor_count() else {
133 tracing::warn!("failed to get worker actor count");
134 return;
135 };
136
137 use comfy_table::{Row, Table};
138 let Ok(worker_nodes) = self.metadata_manager.list_worker_node(None, None).await else {
139 tracing::warn!("failed to get worker nodes");
140 return;
141 };
142 let mut table = Table::new();
143 table.set_header({
144 let mut row = Row::new();
145 row.add_cell("id".into());
146 row.add_cell("host".into());
147 row.add_cell("type".into());
148 row.add_cell("state".into());
149 row.add_cell("parallelism".into());
150 row.add_cell("is_streaming".into());
151 row.add_cell("is_serving".into());
152 row.add_cell("rw_version".into());
153 row.add_cell("total_memory_bytes".into());
154 row.add_cell("total_cpu_cores".into());
155 row.add_cell("started_at".into());
156 row.add_cell("actor_count".into());
157 row
158 });
159 for worker_node in worker_nodes {
160 let mut row = Row::new();
161 row.add_cell(worker_node.id.into());
162 try_add_cell(
163 &mut row,
164 worker_node
165 .host
166 .as_ref()
167 .map(|h| format!("{}:{}", h.host, h.port)),
168 );
169 try_add_cell(
170 &mut row,
171 worker_node.get_type().ok().map(|t| t.as_str_name()),
172 );
173 try_add_cell(
174 &mut row,
175 worker_node.get_state().ok().map(|s| s.as_str_name()),
176 );
177 try_add_cell(&mut row, worker_node.parallelism());
178 try_add_cell(
179 &mut row,
180 worker_node.property.as_ref().map(|p| p.is_streaming),
181 );
182 try_add_cell(
183 &mut row,
184 worker_node.property.as_ref().map(|p| p.is_serving),
185 );
186 try_add_cell(
187 &mut row,
188 worker_node.resource.as_ref().map(|r| r.rw_version.clone()),
189 );
190 try_add_cell(
191 &mut row,
192 worker_node.resource.as_ref().map(|r| r.total_memory_bytes),
193 );
194 try_add_cell(
195 &mut row,
196 worker_node.resource.as_ref().map(|r| r.total_cpu_cores),
197 );
198 try_add_cell(
199 &mut row,
200 worker_node
201 .started_at
202 .and_then(|ts| Timestamptz::from_secs(ts as _).map(|t| t.to_string())),
203 );
204 let actor_count = {
205 if let Ok(t) = worker_node.get_type()
206 && t != WorkerType::ComputeNode
207 {
208 None
209 } else {
210 match worker_actor_count.get(&worker_node.id) {
211 None => Some(0),
212 Some(c) => Some(*c),
213 }
214 }
215 };
216 try_add_cell(&mut row, actor_count);
217 table.add_row(row);
218 }
219 let _ = writeln!(s, "{table}");
220 }
221
222 fn write_event_logs(&self, s: &mut String) {
223 let event_logs = self
224 .event_log_manager
225 .list_event_logs()
226 .into_iter()
227 .sorted_by(|a, b| {
228 a.timestamp
229 .unwrap_or(0)
230 .cmp(&b.timestamp.unwrap_or(0))
231 .reverse()
232 })
233 .collect_vec();
234
235 let _ = writeln!(s, "latest barrier completions");
236 Self::write_event_logs_impl(
237 s,
238 event_logs.iter(),
239 |e| {
240 let Event::BarrierComplete(info) = e else {
241 return None;
242 };
243 Some(json!(info).to_string())
244 },
245 Some(10),
246 );
247
248 let _ = writeln!(s);
249 let _ = writeln!(s, "latest barrier collection failures");
250 Self::write_event_logs_impl(
251 s,
252 event_logs.iter(),
253 |e| {
254 let Event::CollectBarrierFail(info) = e else {
255 return None;
256 };
257 Some(json!(info).to_string())
258 },
259 Some(3),
260 );
261
262 let _ = writeln!(s);
263 let _ = writeln!(s, "latest barrier injection failures");
264 Self::write_event_logs_impl(
265 s,
266 event_logs.iter(),
267 |e| {
268 let Event::InjectBarrierFail(info) = e else {
269 return None;
270 };
271 Some(json!(info).to_string())
272 },
273 Some(3),
274 );
275
276 let _ = writeln!(s);
277 let _ = writeln!(s, "latest worker node panics");
278 Self::write_event_logs_impl(
279 s,
280 event_logs.iter(),
281 |e| {
282 let Event::WorkerNodePanic(info) = e else {
283 return None;
284 };
285 Some(json!(info).to_string())
286 },
287 Some(10),
288 );
289
290 let _ = writeln!(s);
291 let _ = writeln!(s, "latest create stream job failures");
292 Self::write_event_logs_impl(
293 s,
294 event_logs.iter(),
295 |e| {
296 let Event::CreateStreamJobFail(info) = e else {
297 return None;
298 };
299 Some(json!(info).to_string())
300 },
301 Some(3),
302 );
303
304 let _ = writeln!(s);
305 let _ = writeln!(s, "latest dirty stream job clear-ups");
306 Self::write_event_logs_impl(
307 s,
308 event_logs.iter(),
309 |e| {
310 let Event::DirtyStreamJobClear(info) = e else {
311 return None;
312 };
313 Some(json!(info).to_string())
314 },
315 Some(3),
316 );
317 }
318
319 fn write_event_logs_impl<'a, F>(
320 s: &mut String,
321 event_logs: impl Iterator<Item = &'a EventLog>,
322 get_event_info: F,
323 limit: Option<usize>,
324 ) where
325 F: Fn(&Event) -> Option<String>,
326 {
327 use comfy_table::{Row, Table};
328 let mut table = Table::new();
329 table.set_header({
330 let mut row = Row::new();
331 row.add_cell("created_at".into());
332 row.add_cell("info".into());
333 row
334 });
335 let mut row_count = 0;
336 for event_log in event_logs {
337 let Some(ref inner) = event_log.event else {
338 continue;
339 };
340 if let Some(limit) = limit
341 && row_count >= limit
342 {
343 break;
344 }
345 let mut row = Row::new();
346 let ts = event_log
347 .timestamp
348 .and_then(|ts| Timestamptz::from_millis(ts as _).map(|ts| ts.to_string()));
349 try_add_cell(&mut row, ts);
350 if let Some(info) = get_event_info(inner) {
351 row.add_cell(info.into());
352 row_count += 1;
353 } else {
354 continue;
355 }
356 table.add_row(row);
357 }
358 let _ = writeln!(s, "{table}");
359 }
360
361 async fn write_storage(&self, s: &mut String) {
362 let mut sst_num = 0;
363 let mut sst_total_file_size = 0;
364 let back_pressured_compaction_groups = self
365 .hummock_manger
366 .write_limits()
367 .await
368 .into_iter()
369 .filter_map(|(k, v)| {
370 if v.table_ids.is_empty() {
371 None
372 } else {
373 Some(k)
374 }
375 })
376 .join(",");
377 if !back_pressured_compaction_groups.is_empty() {
378 let _ = writeln!(
379 s,
380 "back pressured compaction groups: {back_pressured_compaction_groups}"
381 );
382 }
383
384 #[derive(PartialEq, Eq)]
385 struct SstableSort {
386 compaction_group_id: u64,
387 sst_id: HummockSstableId,
388 delete_ratio: u64,
389 }
390 impl PartialOrd for SstableSort {
391 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
392 Some(self.cmp(other))
393 }
394 }
395 impl Ord for SstableSort {
396 fn cmp(&self, other: &Self) -> Ordering {
397 self.delete_ratio.cmp(&other.delete_ratio)
398 }
399 }
400 fn top_k_sstables(
401 top_k: usize,
402 heap: &mut BinaryHeap<Reverse<SstableSort>>,
403 e: SstableSort,
404 ) {
405 if heap.len() < top_k {
406 heap.push(Reverse(e));
407 } else if let Some(mut p) = heap.peek_mut()
408 && e.delete_ratio > p.0.delete_ratio
409 {
410 *p = Reverse(e);
411 }
412 }
413
414 let top_k = 10;
415 let mut top_tombstone_delete_sst = BinaryHeap::with_capacity(top_k);
416 let compaction_group_num = self
417 .hummock_manger
418 .on_current_version(|version| {
419 for compaction_group in version.levels.values() {
420 let mut visit_level = |level: &Level| {
421 sst_num += level.table_infos.len();
422 sst_total_file_size +=
423 level.table_infos.iter().map(|t| t.sst_size).sum::<u64>();
424 for sst in &level.table_infos {
425 if sst.total_key_count == 0 {
426 continue;
427 }
428 let tombstone_delete_ratio =
429 sst.stale_key_count * 10000 / sst.total_key_count;
430 let e = SstableSort {
431 compaction_group_id: compaction_group.group_id,
432 sst_id: sst.sst_id,
433 delete_ratio: tombstone_delete_ratio,
434 };
435 top_k_sstables(top_k, &mut top_tombstone_delete_sst, e);
436 }
437 };
438 let l0 = &compaction_group.l0;
439 for level in &l0.sub_levels {
441 visit_level(level);
442 }
443 for level in &compaction_group.levels {
444 visit_level(level);
445 }
446 }
447 version.levels.len()
448 })
449 .await;
450
451 let _ = writeln!(s, "number of SSTables: {sst_num}");
452 let _ = writeln!(s, "total size of SSTables (byte): {sst_total_file_size}");
453 let _ = writeln!(s, "number of compaction groups: {compaction_group_num}");
454 use comfy_table::{Row, Table};
455 fn format_table(heap: BinaryHeap<Reverse<SstableSort>>) -> Table {
456 let mut table = Table::new();
457 table.set_header({
458 let mut row = Row::new();
459 row.add_cell("compaction group id".into());
460 row.add_cell("sstable id".into());
461 row.add_cell("delete ratio".into());
462 row
463 });
464 for sst in &heap.into_sorted_vec() {
465 let mut row = Row::new();
466 row.add_cell(sst.0.compaction_group_id.into());
467 row.add_cell(sst.0.sst_id.into());
468 row.add_cell(format!("{:.2}%", sst.0.delete_ratio as f64 / 100.0).into());
469 table.add_row(row);
470 }
471 table
472 }
473 let _ = writeln!(s);
474 let _ = writeln!(s, "top tombstone delete ratio");
475 let _ = writeln!(s, "{}", format_table(top_tombstone_delete_sst));
476 let _ = writeln!(s);
477
478 let _ = writeln!(s);
479 self.write_storage_prometheus(s).await;
480 }
481
482 async fn write_streaming_prometheus(&self, s: &mut String) {
483 let _ = writeln!(s, "top sources by throughput (rows/s)");
484 let query = format!(
485 "topk(3, sum(rate(stream_source_output_rows_counts{{{}}}[10m]))by (source_name))",
486 self.prometheus_selector
487 );
488 self.write_instant_vector_impl(s, &query, vec!["source_name"])
489 .await;
490
491 let _ = writeln!(s);
492 let _ = writeln!(s, "top materialized views by throughput (rows/s)");
493 let query = format!(
494 "topk(3, sum(rate(stream_mview_input_row_count{{{}}}[10m]))by (table_id))",
495 self.prometheus_selector
496 );
497 self.write_instant_vector_impl(s, &query, vec!["table_id"])
498 .await;
499
500 let _ = writeln!(s);
501 let _ = writeln!(s, "top join executor by matched rows");
502 let query = format!(
503 "topk(3, histogram_quantile(0.9, sum(rate(stream_join_matched_join_keys_bucket{{{}}}[10m])) by (le, fragment_id, table_id)))",
504 self.prometheus_selector
505 );
506 self.write_instant_vector_impl(s, &query, vec!["table_id", "fragment_id"])
507 .await;
508 }
509
510 async fn write_storage_prometheus(&self, s: &mut String) {
511 let _ = writeln!(s, "top Hummock Get by duration (second)");
512 let query = format!(
513 "topk(3, histogram_quantile(0.9, sum(rate(state_store_get_duration_bucket{{{}}}[10m])) by (le, table_id)))",
514 self.prometheus_selector
515 );
516 self.write_instant_vector_impl(s, &query, vec!["table_id"])
517 .await;
518
519 let _ = writeln!(s);
520 let _ = writeln!(s, "top Hummock Iter Init by duration (second)");
521 let query = format!(
522 "topk(3, histogram_quantile(0.9, sum(rate(state_store_iter_init_duration_bucket{{{}}}[10m])) by (le, table_id)))",
523 self.prometheus_selector
524 );
525 self.write_instant_vector_impl(s, &query, vec!["table_id"])
526 .await;
527
528 let _ = writeln!(s);
529 let _ = writeln!(s, "top table commit flush by size (byte)");
530 let query = format!(
531 "topk(3, sum(rate(storage_commit_write_throughput{{{}}}[10m])) by (table_id))",
532 self.prometheus_selector
533 );
534 self.write_instant_vector_impl(s, &query, vec!["table_id"])
535 .await;
536
537 let _ = writeln!(s);
538 let _ = writeln!(s, "object store read throughput (bytes/s)");
539 let query = format!(
540 "sum(rate(object_store_read_bytes{{{}}}[10m])) by (job, instance)",
541 merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
542 );
543 self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
544 .await;
545
546 let _ = writeln!(s);
547 let _ = writeln!(s, "object store write throughput (bytes/s)");
548 let query = format!(
549 "sum(rate(object_store_write_bytes{{{}}}[10m])) by (job, instance)",
550 merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
551 );
552 self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
553 .await;
554
555 let _ = writeln!(s);
556 let _ = writeln!(s, "object store operation rate");
557 let query = format!(
558 "sum(rate(object_store_operation_latency_count{{{}}}[10m])) by (le, type, job, instance)",
559 merge_prometheus_selector([
560 &self.prometheus_selector,
561 "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read_read_bytes|streaming_read\""
562 ])
563 );
564 self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
565 .await;
566
567 let _ = writeln!(s);
568 let _ = writeln!(s, "object store operation duration (second)");
569 let query = format!(
570 "histogram_quantile(0.9, sum(rate(object_store_operation_latency_bucket{{{}}}[10m])) by (le, type, job, instance))",
571 merge_prometheus_selector([
572 &self.prometheus_selector,
573 "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read\""
574 ])
575 );
576 self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
577 .await;
578 }
579
580 async fn write_instant_vector_impl(&self, s: &mut String, query: &str, labels: Vec<&str>) {
581 let Some(ref client) = self.prometheus_client else {
582 return;
583 };
584 if let Ok(Vector(instant_vec)) = client
585 .query(query)
586 .get()
587 .await
588 .map(|result| result.into_inner().0)
589 {
590 for i in instant_vec {
591 let l = labels
592 .iter()
593 .map(|label| {
594 format!(
595 "{}={}",
596 *label,
597 i.metric()
598 .get(*label)
599 .map(|s| s.as_str())
600 .unwrap_or_default()
601 )
602 })
603 .join(",");
604 let _ = writeln!(s, "{}: {:.3}", l, i.sample().value());
605 }
606 }
607 }
608
609 async fn write_await_tree(&self, s: &mut String, actor_traces_format: ActorTracesFormat) {
610 let all = dump_cluster_await_tree(
611 &self.metadata_manager,
612 &self.await_tree_reg,
613 actor_traces_format,
614 )
615 .await;
616
617 if let Ok(all) = all {
618 write!(s, "{}", all.output()).unwrap();
619 } else {
620 tracing::warn!("failed to dump await tree");
621 }
622 }
623
624 async fn write_table_definition(&self, s: &mut String) -> MetaResult<()> {
625 let sources = self
626 .metadata_manager
627 .catalog_controller
628 .list_sources()
629 .await?
630 .into_iter()
631 .map(|s| {
632 (
633 s.id.into(),
634 (s.name, s.schema_id, s.definition, s.created_at_epoch),
635 )
636 })
637 .collect::<BTreeMap<_, _>>();
638 let mut user_tables = BTreeMap::new();
639 let mut mvs = BTreeMap::new();
640 let mut indexes = BTreeMap::new();
641 let mut internal_tables = BTreeMap::new();
642 {
643 let grouped = self
644 .metadata_manager
645 .catalog_controller
646 .list_all_state_tables()
647 .await?
648 .into_iter()
649 .chunk_by(|t| t.table_type());
650 for (table_type, tables) in &grouped {
651 let tables = tables.into_iter().map(|t| {
652 (
653 t.id.into(),
654 (t.name, t.schema_id, t.definition, t.created_at_epoch),
655 )
656 });
657 match table_type {
658 PbTableType::Table => user_tables.extend(tables),
659 PbTableType::MaterializedView => mvs.extend(tables),
660 PbTableType::Index | PbTableType::VectorIndex => indexes.extend(tables),
661 PbTableType::Internal => internal_tables.extend(tables),
662 PbTableType::Unspecified => {
663 tracing::error!("unspecified table type: {:?}", tables.collect_vec());
664 }
665 }
666 }
667 }
668 let sinks = self
669 .metadata_manager
670 .catalog_controller
671 .list_sinks()
672 .await?
673 .into_iter()
674 .map(|s| {
675 (
676 s.id.into(),
677 (s.name, s.schema_id, s.definition, s.created_at_epoch),
678 )
679 })
680 .collect::<BTreeMap<_, _>>();
681 let catalogs = [
682 ("SOURCE", sources),
683 ("TABLE", user_tables),
684 ("MATERIALIZED VIEW", mvs),
685 ("INDEX", indexes),
686 ("SINK", sinks),
687 ("INTERNAL TABLE", internal_tables),
688 ];
689 let mut obj_id_to_name: HashMap<ObjectId, _> = HashMap::new();
690 for (title, items) in catalogs {
691 use comfy_table::{Row, Table};
692 let mut table = Table::new();
693 table.set_header({
694 let mut row = Row::new();
695 row.add_cell("id".into());
696 row.add_cell("name".into());
697 row.add_cell("schema_id".into());
698 row.add_cell("created_at".into());
699 row.add_cell("definition".into());
700 row
701 });
702 for (id, (name, schema_id, definition, created_at_epoch)) in items {
703 obj_id_to_name.insert(id, name.clone());
704 let mut row = Row::new();
705 let may_redact = redact_sql(&definition, self.redact_sql_option_keywords.clone())
706 .unwrap_or_else(|| "[REDACTED]".into());
707 let created_at = if let Some(created_at_epoch) = created_at_epoch {
708 format!("{}", Epoch::from(created_at_epoch).as_timestamptz())
709 } else {
710 "".into()
711 };
712 row.add_cell(id.into());
713 row.add_cell(name.into());
714 row.add_cell(schema_id.into());
715 row.add_cell(created_at.into());
716 row.add_cell(may_redact.into());
717 table.add_row(row);
718 }
719 let _ = writeln!(s);
720 let _ = writeln!(s, "{title}");
721 let _ = writeln!(s, "{table}");
722 }
723
724 let actors = self
725 .metadata_manager
726 .catalog_controller
727 .list_actor_info()
728 .await?
729 .into_iter()
730 .map(|(actor_id, fragment_id, job_id, schema_id, obj_type)| {
731 (
732 actor_id,
733 (
734 fragment_id,
735 job_id,
736 schema_id,
737 obj_type,
738 obj_id_to_name.get(&job_id).cloned().unwrap_or_default(),
739 ),
740 )
741 })
742 .collect::<BTreeMap<_, _>>();
743
744 use comfy_table::{Row, Table};
745 let mut table = Table::new();
746 table.set_header({
747 let mut row = Row::new();
748 row.add_cell("id".into());
749 row.add_cell("fragment_id".into());
750 row.add_cell("job_id".into());
751 row.add_cell("schema_id".into());
752 row.add_cell("type".into());
753 row.add_cell("name".into());
754 row
755 });
756 for (actor_id, (fragment_id, job_id, schema_id, ddl_type, name)) in actors {
757 let mut row = Row::new();
758 row.add_cell(actor_id.into());
759 row.add_cell(fragment_id.into());
760 row.add_cell(job_id.into());
761 row.add_cell(schema_id.into());
762 row.add_cell(ddl_type.as_str().into());
763 row.add_cell(name.into());
764 table.add_row(row);
765 }
766 let _ = writeln!(s);
767 let _ = writeln!(s, "ACTOR");
768 let _ = writeln!(s, "{table}");
769 Ok(())
770 }
771}
772
773fn try_add_cell<T: Into<comfy_table::Cell>>(row: &mut comfy_table::Row, t: Option<T>) {
774 match t {
775 Some(t) => {
776 row.add_cell(t.into());
777 }
778 None => {
779 row.add_cell("".into());
780 }
781 }
782}
783
/// Joins PromQL label-matcher fragments with `,`, for use inside `{...}`.
///
/// Each fragment is trimmed, and fragments that are empty or whitespace-only
/// are dropped. Without the trim, a blank selector would survive the filter
/// and produce a leading or doubled comma, which is not valid PromQL.
fn merge_prometheus_selector<'a>(selectors: impl IntoIterator<Item = &'a str>) -> String {
    selectors
        .into_iter()
        .map(str::trim)
        .filter(|selector| !selector.is_empty())
        .collect::<Vec<_>>()
        .join(",")
}
787
788fn redact_sql(sql: &str, keywords: RedactSqlOptionKeywordsRef) -> Option<String> {
789 match Parser::parse_sql(sql) {
790 Ok(sqls) => Some(
791 sqls.into_iter()
792 .map(|sql| sql.to_redacted_string(keywords.clone()))
793 .join(";"),
794 ),
795 Err(_) => None,
796 }
797}