use std::cmp::{Ordering, Reverse};
use std::collections::{BTreeMap, BinaryHeap, HashMap};
use std::fmt::Write;
use std::sync::Arc;

use itertools::Itertools;
use prometheus_http_query::response::Data::Vector;
use risingwave_common::types::Timestamptz;
use risingwave_common::util::StackTraceResponseExt;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::HummockSstableId;
use risingwave_hummock_sdk::level::Level;
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::EventLog;
use risingwave_pb::meta::event_log::Event;
use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
use risingwave_sqlparser::ast::RedactSqlOptionKeywordsRef;
use risingwave_sqlparser::parser::Parser;
use serde_json::json;
use thiserror_ext::AsReport;

use crate::MetaResult;
use crate::hummock::HummockManagerRef;
use crate::manager::MetadataManager;
use crate::manager::event_log::EventLogManagerRef;
use crate::rpc::await_tree::dump_cluster_await_tree;

pub type DiagnoseCommandRef = Arc<DiagnoseCommand>;

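/// Collects cluster metadata, Prometheus metrics, storage state, await-trees and event logs
/// into a single plain-text diagnosis report.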
pub struct DiagnoseCommand {
    metadata_manager: MetadataManager,
    await_tree_reg: await_tree::Registry,
    hummock_manager: HummockManagerRef,
    event_log_manager: EventLogManagerRef,
    prometheus_client: Option<prometheus_http_query::Client>,
    prometheus_selector: String,
    redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
}

impl DiagnoseCommand {
    pub fn new(
        metadata_manager: MetadataManager,
        await_tree_reg: await_tree::Registry,
        hummock_manager: HummockManagerRef,
        event_log_manager: EventLogManagerRef,
        prometheus_client: Option<prometheus_http_query::Client>,
        prometheus_selector: String,
        redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
    ) -> Self {
        Self {
            metadata_manager,
            await_tree_reg,
            hummock_manager,
            event_log_manager,
            prometheus_client,
            prometheus_selector,
            redact_sql_option_keywords,
        }
    }

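    /// Builds the full diagnosis report: a header with creation time and cluster version,
    /// followed by catalog, worker nodes, streaming metrics, storage, await-tree dump and
    /// event logs.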
    pub async fn report(&self, actor_traces_format: ActorTracesFormat) -> String {
        let mut report = String::new();
        let _ = writeln!(
            report,
            "report created at: {}\nversion: {}",
            chrono::DateTime::<chrono::offset::Utc>::from(std::time::SystemTime::now()),
            risingwave_common::current_cluster_version(),
        );
        let _ = writeln!(report);
        self.write_catalog(&mut report).await;
        let _ = writeln!(report);
        self.write_worker_nodes(&mut report).await;
        let _ = writeln!(report);
        self.write_streaming_prometheus(&mut report).await;
        let _ = writeln!(report);
        self.write_storage(&mut report).await;
        let _ = writeln!(report);
        self.write_await_tree(&mut report, actor_traces_format)
            .await;
        let _ = writeln!(report);
        self.write_event_logs(&mut report);
        report
    }

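    /// Writes catalog statistics followed by catalog object definitions; a failure while
    /// writing the definitions is logged and does not abort the report.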
    async fn write_catalog(&self, s: &mut String) {
        self.write_catalog_inner(s).await;
        let _ = self.write_table_definition(s).await.inspect_err(|e| {
            tracing::warn!(
                error = e.to_report_string(),
                "failed to display table definition"
            )
        });
    }

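    /// Writes per-kind object counts (streaming jobs, actors, sources, tables, materialized
    /// views, sinks, indexes, functions) from the catalog controller's stats.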
    async fn write_catalog_inner(&self, s: &mut String) {
        let guard = self
            .metadata_manager
            .catalog_controller
            .get_inner_read_guard()
            .await;
        let stat = match guard.stats().await {
            Ok(stat) => stat,
            Err(err) => {
                tracing::warn!(error=?err.as_report(), "failed to get catalog stats");
                return;
            }
        };
        let _ = writeln!(s, "number of streaming job: {}", stat.streaming_job_num);
        let _ = writeln!(s, "number of actor: {}", stat.actor_num);
        let _ = writeln!(s, "number of source: {}", stat.source_num);
        let _ = writeln!(s, "number of table: {}", stat.table_num);
        let _ = writeln!(s, "number of materialized view: {}", stat.mview_num);
        let _ = writeln!(s, "number of sink: {}", stat.sink_num);
        let _ = writeln!(s, "number of index: {}", stat.index_num);
        let _ = writeln!(s, "number of function: {}", stat.function_num);
    }

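    /// Renders one table row per worker node: host, type, state, parallelism, resources and,
    /// for compute nodes only, the actor count.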
    async fn write_worker_nodes(&self, s: &mut String) {
        let Ok(worker_actor_count) = self.metadata_manager.worker_actor_count().await else {
            tracing::warn!("failed to get worker actor count");
            return;
        };

        use comfy_table::{Row, Table};
        let Ok(worker_nodes) = self.metadata_manager.list_worker_node(None, None).await else {
            tracing::warn!("failed to get worker nodes");
            return;
        };
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("host".into());
            row.add_cell("type".into());
            row.add_cell("state".into());
            row.add_cell("parallelism".into());
            row.add_cell("is_streaming".into());
            row.add_cell("is_serving".into());
            row.add_cell("rw_version".into());
            row.add_cell("total_memory_bytes".into());
            row.add_cell("total_cpu_cores".into());
            row.add_cell("started_at".into());
            row.add_cell("actor_count".into());
            row
        });
        for worker_node in worker_nodes {
            let mut row = Row::new();
            row.add_cell(worker_node.id.into());
            try_add_cell(
                &mut row,
                worker_node
                    .host
                    .as_ref()
                    .map(|h| format!("{}:{}", h.host, h.port)),
            );
            try_add_cell(
                &mut row,
                worker_node.get_type().ok().map(|t| t.as_str_name()),
            );
            try_add_cell(
                &mut row,
                worker_node.get_state().ok().map(|s| s.as_str_name()),
            );
            try_add_cell(&mut row, worker_node.parallelism());
            try_add_cell(
                &mut row,
                worker_node.property.as_ref().map(|p| p.is_streaming),
            );
            try_add_cell(
                &mut row,
                worker_node.property.as_ref().map(|p| p.is_serving),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.rw_version.clone()),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.total_memory_bytes),
            );
            try_add_cell(
                &mut row,
                worker_node.resource.as_ref().map(|r| r.total_cpu_cores),
            );
            try_add_cell(
                &mut row,
                worker_node
                    .started_at
                    .and_then(|ts| Timestamptz::from_secs(ts as _).map(|t| t.to_string())),
            );
            let actor_count = {
                if let Ok(t) = worker_node.get_type()
                    && t != WorkerType::ComputeNode
                {
                    None
                } else {
                    match worker_actor_count.get(&(worker_node.id as _)) {
                        None => Some(0),
                        Some(c) => Some(*c),
                    }
                }
            };
            try_add_cell(&mut row, actor_count);
            table.add_row(row);
        }
        let _ = writeln!(s, "{table}");
    }

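    /// Writes the most recent event logs, newest first: barrier completions, barrier
    /// collection/injection failures, worker node panics, create stream job failures and
    /// dirty stream job clear-ups.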
    fn write_event_logs(&self, s: &mut String) {
        let event_logs = self
            .event_log_manager
            .list_event_logs()
            .into_iter()
            .sorted_by(|a, b| {
                a.timestamp
                    .unwrap_or(0)
                    .cmp(&b.timestamp.unwrap_or(0))
                    .reverse()
            })
            .collect_vec();

        let _ = writeln!(s, "latest barrier completions");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::BarrierComplete(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(10),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest barrier collection failures");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::CollectBarrierFail(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest barrier injection failures");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::InjectBarrierFail(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest worker node panics");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::WorkerNodePanic(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(10),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest create stream job failures");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::CreateStreamJobFail(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );

        let _ = writeln!(s);
        let _ = writeln!(s, "latest dirty stream job clear-ups");
        Self::write_event_logs_impl(
            s,
            event_logs.iter(),
            |e| {
                let Event::DirtyStreamJobClear(info) = e else {
                    return None;
                };
                Some(json!(info).to_string())
            },
            Some(3),
        );
    }

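    /// Renders up to `limit` events for which `get_event_info` returns `Some`, as a
    /// (created_at, info) table.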
    fn write_event_logs_impl<'a, F>(
        s: &mut String,
        event_logs: impl Iterator<Item = &'a EventLog>,
        get_event_info: F,
        limit: Option<usize>,
    ) where
        F: Fn(&Event) -> Option<String>,
    {
        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("created_at".into());
            row.add_cell("info".into());
            row
        });
        let mut row_count = 0;
        for event_log in event_logs {
            let Some(ref inner) = event_log.event else {
                continue;
            };
            if let Some(limit) = limit
                && row_count >= limit
            {
                break;
            }
            let mut row = Row::new();
            let ts = event_log
                .timestamp
                .and_then(|ts| Timestamptz::from_millis(ts as _).map(|ts| ts.to_string()));
            try_add_cell(&mut row, ts);
            if let Some(info) = get_event_info(inner) {
                row.add_cell(info.into());
                row_count += 1;
            } else {
                continue;
            }
            table.add_row(row);
        }
        let _ = writeln!(s, "{table}");
    }

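    /// Writes storage diagnostics: write-limited (back pressured) compaction groups, SSTable
    /// counts and total size, the SSTables with the highest tombstone ratio, and storage
    /// metrics from Prometheus.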
    async fn write_storage(&self, s: &mut String) {
        let mut sst_num = 0;
        let mut sst_total_file_size = 0;
        let back_pressured_compaction_groups = self
            .hummock_manager
            .write_limits()
            .await
            .into_iter()
            .filter_map(|(k, v)| {
                if v.table_ids.is_empty() {
                    None
                } else {
                    Some(k)
                }
            })
            .join(",");
        if !back_pressured_compaction_groups.is_empty() {
            let _ = writeln!(
                s,
                "back pressured compaction groups: {back_pressured_compaction_groups}"
            );
        }

        #[derive(PartialEq, Eq)]
        struct SstableSort {
            compaction_group_id: u64,
            sst_id: HummockSstableId,
            delete_ratio: u64,
        }
        impl PartialOrd for SstableSort {
            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
                Some(self.cmp(other))
            }
        }
        impl Ord for SstableSort {
            fn cmp(&self, other: &Self) -> Ordering {
                self.delete_ratio.cmp(&other.delete_ratio)
            }
        }
        fn top_k_sstables(
            top_k: usize,
            heap: &mut BinaryHeap<Reverse<SstableSort>>,
            e: SstableSort,
        ) {
            if heap.len() < top_k {
                heap.push(Reverse(e));
            } else if let Some(mut p) = heap.peek_mut()
                && e.delete_ratio > p.0.delete_ratio
            {
                *p = Reverse(e);
            }
        }

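        // Keep the 10 SSTables with the highest tombstone (stale key) ratio, scanning every
        // level of every compaction group in the current Hummock version.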
        let top_k = 10;
        let mut top_tombstone_delete_sst = BinaryHeap::with_capacity(top_k);
        let compaction_group_num = self
            .hummock_manager
            .on_current_version(|version| {
                for compaction_group in version.levels.values() {
                    let mut visit_level = |level: &Level| {
                        sst_num += level.table_infos.len();
                        sst_total_file_size +=
                            level.table_infos.iter().map(|t| t.sst_size).sum::<u64>();
                        for sst in &level.table_infos {
                            if sst.total_key_count == 0 {
                                continue;
                            }
                            let tombstone_delete_ratio =
                                sst.stale_key_count * 10000 / sst.total_key_count;
                            let e = SstableSort {
                                compaction_group_id: compaction_group.group_id,
                                sst_id: sst.sst_id,
                                delete_ratio: tombstone_delete_ratio,
                            };
                            top_k_sstables(top_k, &mut top_tombstone_delete_sst, e);
                        }
                    };
                    let l0 = &compaction_group.l0;
                    for level in &l0.sub_levels {
                        visit_level(level);
                    }
                    for level in &compaction_group.levels {
                        visit_level(level);
                    }
                }
                version.levels.len()
            })
            .await;

        let _ = writeln!(s, "number of SSTables: {sst_num}");
        let _ = writeln!(s, "total size of SSTables (byte): {sst_total_file_size}");
        let _ = writeln!(s, "number of compaction groups: {compaction_group_num}");
        use comfy_table::{Row, Table};
        fn format_table(heap: BinaryHeap<Reverse<SstableSort>>) -> Table {
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("compaction group id".into());
                row.add_cell("sstable id".into());
                row.add_cell("delete ratio".into());
                row
            });
            for sst in &heap.into_sorted_vec() {
                let mut row = Row::new();
                row.add_cell(sst.0.compaction_group_id.into());
                row.add_cell(sst.0.sst_id.into());
                row.add_cell(format!("{:.2}%", sst.0.delete_ratio as f64 / 100.0).into());
                table.add_row(row);
            }
            table
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "top tombstone delete ratio");
        let _ = writeln!(s, "{}", format_table(top_tombstone_delete_sst));
        let _ = writeln!(s);

        let _ = writeln!(s);
        self.write_storage_prometheus(s).await;
    }

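    /// Queries Prometheus for the busiest sources, materialized views and join executors over
    /// the last 10 minutes.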
    async fn write_streaming_prometheus(&self, s: &mut String) {
        let _ = writeln!(s, "top sources by throughput (rows/s)");
        let query = format!(
            "topk(3, sum(rate(stream_source_output_rows_counts{{{}}}[10m]))by (source_name))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["source_name"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top materialized views by throughput (rows/s)");
        let query = format!(
            "topk(3, sum(rate(stream_mview_input_row_count{{{}}}[10m]))by (table_id))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top join executor by matched rows");
        let query = format!(
            "topk(3, histogram_quantile(0.9, sum(rate(stream_join_matched_join_keys_bucket{{{}}}[10m])) by (le, fragment_id, table_id)))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id", "fragment_id"])
            .await;
    }

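    /// Queries Prometheus for Hummock read latencies, commit flush sizes and object store
    /// throughput/latency over the last 10 minutes.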
    async fn write_storage_prometheus(&self, s: &mut String) {
        let _ = writeln!(s, "top Hummock Get by duration (second)");
        let query = format!(
            "topk(3, histogram_quantile(0.9, sum(rate(state_store_get_duration_bucket{{{}}}[10m])) by (le, table_id)))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top Hummock Iter Init by duration (second)");
        let query = format!(
            "topk(3, histogram_quantile(0.9, sum(rate(state_store_iter_init_duration_bucket{{{}}}[10m])) by (le, table_id)))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "top table commit flush by size (byte)");
        let query = format!(
            "topk(3, sum(rate(storage_commit_write_throughput{{{}}}[10m])) by (table_id))",
            self.prometheus_selector
        );
        self.write_instant_vector_impl(s, &query, vec!["table_id"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store read throughput (bytes/s)");
        let query = format!(
            "sum(rate(object_store_read_bytes{{{}}}[10m])) by (job, instance)",
            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
        );
        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store write throughput (bytes/s)");
        let query = format!(
            "sum(rate(object_store_write_bytes{{{}}}[10m])) by (job, instance)",
            merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
        );
        self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store operation rate");
        let query = format!(
            "sum(rate(object_store_operation_latency_count{{{}}}[10m])) by (le, type, job, instance)",
            merge_prometheus_selector([
                &self.prometheus_selector,
                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read_read_bytes|streaming_read\""
            ])
        );
        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
            .await;

        let _ = writeln!(s);
        let _ = writeln!(s, "object store operation duration (second)");
        let query = format!(
            "histogram_quantile(0.9, sum(rate(object_store_operation_latency_bucket{{{}}}[10m])) by (le, type, job, instance))",
            merge_prometheus_selector([
                &self.prometheus_selector,
                "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read\""
            ])
        );
        self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
            .await;
    }

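    /// Runs an instant-vector Prometheus query and writes one `label=value,...: sample` line
    /// per result; does nothing if no Prometheus client is configured.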
    async fn write_instant_vector_impl(&self, s: &mut String, query: &str, labels: Vec<&str>) {
        let Some(ref client) = self.prometheus_client else {
            return;
        };
        if let Ok(Vector(instant_vec)) = client
            .query(query)
            .get()
            .await
            .map(|result| result.into_inner().0)
        {
            for i in instant_vec {
                let l = labels
                    .iter()
                    .map(|label| {
                        format!(
                            "{}={}",
                            *label,
                            i.metric()
                                .get(*label)
                                .map(|s| s.as_str())
                                .unwrap_or_default()
                        )
                    })
                    .join(",");
                let _ = writeln!(s, "{}: {:.3}", l, i.sample().value());
            }
        }
    }

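    /// Dumps await-trees from all nodes in the cluster; failures are logged instead of being
    /// written to the report.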
    async fn write_await_tree(&self, s: &mut String, actor_traces_format: ActorTracesFormat) {
        let all = dump_cluster_await_tree(
            &self.metadata_manager,
            &self.await_tree_reg,
            actor_traces_format,
        )
        .await;

        if let Ok(all) = all {
            write!(s, "{}", all.output()).unwrap();
        } else {
            tracing::warn!("failed to dump await tree");
        }
    }

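    /// Writes the (redacted) definitions of sources, tables, materialized views, indexes,
    /// sinks and internal tables, followed by an actor-to-job mapping table.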
    async fn write_table_definition(&self, s: &mut String) -> MetaResult<()> {
        let sources = self
            .metadata_manager
            .catalog_controller
            .list_sources()
            .await?
            .into_iter()
            .map(|s| {
                (
                    s.id,
                    (s.name, s.schema_id, s.definition, s.created_at_epoch),
                )
            })
            .collect::<BTreeMap<_, _>>();
        let mut user_tables = BTreeMap::new();
        let mut mvs = BTreeMap::new();
        let mut indexes = BTreeMap::new();
        let mut internal_tables = BTreeMap::new();
        {
            let grouped = self
                .metadata_manager
                .catalog_controller
                .list_all_state_tables()
                .await?
                .into_iter()
                .chunk_by(|t| t.table_type());
            for (table_type, tables) in &grouped {
                let tables = tables.into_iter().map(|t| {
                    (
                        t.id,
                        (t.name, t.schema_id, t.definition, t.created_at_epoch),
                    )
                });
                match table_type {
                    PbTableType::Table => user_tables.extend(tables),
                    PbTableType::MaterializedView => mvs.extend(tables),
                    PbTableType::Index | PbTableType::VectorIndex => indexes.extend(tables),
                    PbTableType::Internal => internal_tables.extend(tables),
                    PbTableType::Unspecified => {
                        tracing::error!("unspecified table type: {:?}", tables.collect_vec());
                    }
                }
            }
        }
        let sinks = self
            .metadata_manager
            .catalog_controller
            .list_sinks()
            .await?
            .into_iter()
            .map(|s| {
                (
                    s.id,
                    (s.name, s.schema_id, s.definition, s.created_at_epoch),
                )
            })
            .collect::<BTreeMap<_, _>>();
        let catalogs = [
            ("SOURCE", sources),
            ("TABLE", user_tables),
            ("MATERIALIZED VIEW", mvs),
            ("INDEX", indexes),
            ("SINK", sinks),
            ("INTERNAL TABLE", internal_tables),
        ];
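        // Maps catalog object id to name so actors can be labeled with their job name below.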
        let mut obj_id_to_name = HashMap::new();
        for (title, items) in catalogs {
            use comfy_table::{Row, Table};
            let mut table = Table::new();
            table.set_header({
                let mut row = Row::new();
                row.add_cell("id".into());
                row.add_cell("name".into());
                row.add_cell("schema_id".into());
                row.add_cell("created_at".into());
                row.add_cell("definition".into());
                row
            });
            for (id, (name, schema_id, definition, created_at_epoch)) in items {
                obj_id_to_name.insert(id, name.clone());
                let mut row = Row::new();
                let may_redact = redact_sql(&definition, self.redact_sql_option_keywords.clone())
                    .unwrap_or_else(|| "[REDACTED]".into());
                let created_at = if let Some(created_at_epoch) = created_at_epoch {
                    format!("{}", Epoch::from(created_at_epoch).as_timestamptz())
                } else {
                    "".into()
                };
                row.add_cell(id.into());
                row.add_cell(name.into());
                row.add_cell(schema_id.into());
                row.add_cell(created_at.into());
                row.add_cell(may_redact.into());
                table.add_row(row);
            }
            let _ = writeln!(s);
            let _ = writeln!(s, "{title}");
            let _ = writeln!(s, "{table}");
        }

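        // Collect every actor with its fragment, job, schema, object type and resolved job name.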
        let actors = self
            .metadata_manager
            .catalog_controller
            .list_actor_info()
            .await?
            .into_iter()
            .map(|(actor_id, fragment_id, job_id, schema_id, obj_type)| {
                (
                    actor_id,
                    (
                        fragment_id,
                        job_id,
                        schema_id,
                        obj_type,
                        obj_id_to_name
                            .get(&(job_id as _))
                            .cloned()
                            .unwrap_or_default(),
                    ),
                )
            })
            .collect::<BTreeMap<_, _>>();

        use comfy_table::{Row, Table};
        let mut table = Table::new();
        table.set_header({
            let mut row = Row::new();
            row.add_cell("id".into());
            row.add_cell("fragment_id".into());
            row.add_cell("job_id".into());
            row.add_cell("schema_id".into());
            row.add_cell("type".into());
            row.add_cell("name".into());
            row
        });
        for (actor_id, (fragment_id, job_id, schema_id, ddl_type, name)) in actors {
            let mut row = Row::new();
            row.add_cell(actor_id.into());
            row.add_cell(fragment_id.into());
            row.add_cell(job_id.into());
            row.add_cell(schema_id.into());
            row.add_cell(ddl_type.as_str().into());
            row.add_cell(name.into());
            table.add_row(row);
        }
        let _ = writeln!(s);
        let _ = writeln!(s, "ACTOR");
        let _ = writeln!(s, "{table}");
        Ok(())
    }
}

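/// Adds `t` as a cell to the row, or an empty cell if `t` is `None`.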
fn try_add_cell<T: Into<comfy_table::Cell>>(row: &mut comfy_table::Row, t: Option<T>) {
    match t {
        Some(t) => {
            row.add_cell(t.into());
        }
        None => {
            row.add_cell("".into());
        }
    }
}

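/// Joins non-empty Prometheus label selectors with commas.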
fn merge_prometheus_selector<'a>(selectors: impl IntoIterator<Item = &'a str>) -> String {
    selectors.into_iter().filter(|s| !s.is_empty()).join(",")
}

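/// Re-renders `sql` with sensitive option values redacted; returns `None` if parsing fails.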
fn redact_sql(sql: &str, keywords: RedactSqlOptionKeywordsRef) -> Option<String> {
    match Parser::parse_sql(sql) {
        Ok(sqls) => Some(
            sqls.into_iter()
                .map(|sql| sql.to_redacted_string(keywords.clone()))
                .join(";"),
        ),
        Err(_) => None,
    }
}