1use std::cmp::{Ordering, Reverse};
16use std::collections::{BTreeMap, BinaryHeap, HashMap};
17use std::fmt::Write;
18use std::sync::Arc;
19
20use itertools::Itertools;
21use prometheus_http_query::response::Data::Vector;
22use risingwave_common::types::Timestamptz;
23use risingwave_common::util::StackTraceResponseExt;
24use risingwave_common::util::epoch::Epoch;
25use risingwave_hummock_sdk::HummockSstableId;
26use risingwave_hummock_sdk::level::Level;
27use risingwave_pb::catalog::table::PbTableType;
28use risingwave_pb::common::WorkerType;
29use risingwave_pb::meta::EventLog;
30use risingwave_pb::meta::event_log::Event;
31use risingwave_pb::monitor_service::stack_trace_request::ActorTracesFormat;
32use risingwave_sqlparser::ast::RedactSqlOptionKeywordsRef;
33use risingwave_sqlparser::parser::Parser;
34use serde_json::json;
35use thiserror_ext::AsReport;
36
37use crate::MetaResult;
38use crate::hummock::HummockManagerRef;
39use crate::manager::MetadataManager;
40use crate::manager::event_log::EventLogManagerRef;
41use crate::rpc::await_tree::dump_cluster_await_tree;
42
43pub type DiagnoseCommandRef = Arc<DiagnoseCommand>;
44
45pub struct DiagnoseCommand {
46 metadata_manager: MetadataManager,
47 await_tree_reg: await_tree::Registry,
48 hummock_manger: HummockManagerRef,
49 event_log_manager: EventLogManagerRef,
50 prometheus_client: Option<prometheus_http_query::Client>,
51 prometheus_selector: String,
52 redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
53}
54
55impl DiagnoseCommand {
56 pub fn new(
57 metadata_manager: MetadataManager,
58 await_tree_reg: await_tree::Registry,
59 hummock_manger: HummockManagerRef,
60 event_log_manager: EventLogManagerRef,
61 prometheus_client: Option<prometheus_http_query::Client>,
62 prometheus_selector: String,
63 redact_sql_option_keywords: RedactSqlOptionKeywordsRef,
64 ) -> Self {
65 Self {
66 metadata_manager,
67 await_tree_reg,
68 hummock_manger,
69 event_log_manager,
70 prometheus_client,
71 prometheus_selector,
72 redact_sql_option_keywords,
73 }
74 }
75
76 pub async fn report(&self, actor_traces_format: ActorTracesFormat) -> String {
77 let mut report = String::new();
78 let _ = writeln!(
79 report,
80 "report created at: {}\nversion: {}",
81 chrono::DateTime::<chrono::offset::Utc>::from(std::time::SystemTime::now()),
82 risingwave_common::current_cluster_version(),
83 );
84 let _ = writeln!(report);
85 self.write_catalog(&mut report).await;
86 let _ = writeln!(report);
87 self.write_worker_nodes(&mut report).await;
88 let _ = writeln!(report);
89 self.write_streaming_prometheus(&mut report).await;
90 let _ = writeln!(report);
91 self.write_storage(&mut report).await;
92 let _ = writeln!(report);
93 self.write_await_tree(&mut report, actor_traces_format)
94 .await;
95 let _ = writeln!(report);
96 self.write_event_logs(&mut report);
97 report
98 }
99
100 async fn write_catalog(&self, s: &mut String) {
101 self.write_catalog_inner(s).await;
102 let _ = self.write_table_definition(s).await.inspect_err(|e| {
103 tracing::warn!(
104 error = e.to_report_string(),
105 "failed to display table definition"
106 )
107 });
108 }
109
110 async fn write_catalog_inner(&self, s: &mut String) {
111 let stats = self.metadata_manager.catalog_controller.stats().await;
112
113 let stat = match stats {
114 Ok(stat) => stat,
115 Err(err) => {
116 tracing::warn!(error=?err.as_report(), "failed to get catalog stats");
117 return;
118 }
119 };
120 let _ = writeln!(s, "number of fragment: {}", stat.streaming_job_num);
121 let _ = writeln!(s, "number of actor: {}", stat.actor_num);
122 let _ = writeln!(s, "number of source: {}", stat.source_num);
123 let _ = writeln!(s, "number of table: {}", stat.table_num);
124 let _ = writeln!(s, "number of materialized view: {}", stat.mview_num);
125 let _ = writeln!(s, "number of sink: {}", stat.sink_num);
126 let _ = writeln!(s, "number of index: {}", stat.index_num);
127 let _ = writeln!(s, "number of function: {}", stat.function_num);
128 }
129
130 async fn write_worker_nodes(&self, s: &mut String) {
131 let Ok(worker_actor_count) = self.metadata_manager.worker_actor_count() else {
132 tracing::warn!("failed to get worker actor count");
133 return;
134 };
135
136 use comfy_table::{Row, Table};
137 let Ok(worker_nodes) = self.metadata_manager.list_worker_node(None, None).await else {
138 tracing::warn!("failed to get worker nodes");
139 return;
140 };
141 let mut table = Table::new();
142 table.set_header({
143 let mut row = Row::new();
144 row.add_cell("id".into());
145 row.add_cell("host".into());
146 row.add_cell("type".into());
147 row.add_cell("state".into());
148 row.add_cell("parallelism".into());
149 row.add_cell("is_streaming".into());
150 row.add_cell("is_serving".into());
151 row.add_cell("rw_version".into());
152 row.add_cell("total_memory_bytes".into());
153 row.add_cell("total_cpu_cores".into());
154 row.add_cell("started_at".into());
155 row.add_cell("actor_count".into());
156 row
157 });
158 for worker_node in worker_nodes {
159 let mut row = Row::new();
160 row.add_cell(worker_node.id.into());
161 try_add_cell(
162 &mut row,
163 worker_node
164 .host
165 .as_ref()
166 .map(|h| format!("{}:{}", h.host, h.port)),
167 );
168 try_add_cell(
169 &mut row,
170 worker_node.get_type().ok().map(|t| t.as_str_name()),
171 );
172 try_add_cell(
173 &mut row,
174 worker_node.get_state().ok().map(|s| s.as_str_name()),
175 );
176 try_add_cell(&mut row, worker_node.parallelism());
177 try_add_cell(
178 &mut row,
179 worker_node.property.as_ref().map(|p| p.is_streaming),
180 );
181 try_add_cell(
182 &mut row,
183 worker_node.property.as_ref().map(|p| p.is_serving),
184 );
185 try_add_cell(
186 &mut row,
187 worker_node.resource.as_ref().map(|r| r.rw_version.clone()),
188 );
189 try_add_cell(
190 &mut row,
191 worker_node.resource.as_ref().map(|r| r.total_memory_bytes),
192 );
193 try_add_cell(
194 &mut row,
195 worker_node.resource.as_ref().map(|r| r.total_cpu_cores),
196 );
197 try_add_cell(
198 &mut row,
199 worker_node
200 .started_at
201 .and_then(|ts| Timestamptz::from_secs(ts as _).map(|t| t.to_string())),
202 );
203 let actor_count = {
204 if let Ok(t) = worker_node.get_type()
205 && t != WorkerType::ComputeNode
206 {
207 None
208 } else {
209 match worker_actor_count.get(&(worker_node.id as _)) {
210 None => Some(0),
211 Some(c) => Some(*c),
212 }
213 }
214 };
215 try_add_cell(&mut row, actor_count);
216 table.add_row(row);
217 }
218 let _ = writeln!(s, "{table}");
219 }
220
221 fn write_event_logs(&self, s: &mut String) {
222 let event_logs = self
223 .event_log_manager
224 .list_event_logs()
225 .into_iter()
226 .sorted_by(|a, b| {
227 a.timestamp
228 .unwrap_or(0)
229 .cmp(&b.timestamp.unwrap_or(0))
230 .reverse()
231 })
232 .collect_vec();
233
234 let _ = writeln!(s, "latest barrier completions");
235 Self::write_event_logs_impl(
236 s,
237 event_logs.iter(),
238 |e| {
239 let Event::BarrierComplete(info) = e else {
240 return None;
241 };
242 Some(json!(info).to_string())
243 },
244 Some(10),
245 );
246
247 let _ = writeln!(s);
248 let _ = writeln!(s, "latest barrier collection failures");
249 Self::write_event_logs_impl(
250 s,
251 event_logs.iter(),
252 |e| {
253 let Event::CollectBarrierFail(info) = e else {
254 return None;
255 };
256 Some(json!(info).to_string())
257 },
258 Some(3),
259 );
260
261 let _ = writeln!(s);
262 let _ = writeln!(s, "latest barrier injection failures");
263 Self::write_event_logs_impl(
264 s,
265 event_logs.iter(),
266 |e| {
267 let Event::InjectBarrierFail(info) = e else {
268 return None;
269 };
270 Some(json!(info).to_string())
271 },
272 Some(3),
273 );
274
275 let _ = writeln!(s);
276 let _ = writeln!(s, "latest worker node panics");
277 Self::write_event_logs_impl(
278 s,
279 event_logs.iter(),
280 |e| {
281 let Event::WorkerNodePanic(info) = e else {
282 return None;
283 };
284 Some(json!(info).to_string())
285 },
286 Some(10),
287 );
288
289 let _ = writeln!(s);
290 let _ = writeln!(s, "latest create stream job failures");
291 Self::write_event_logs_impl(
292 s,
293 event_logs.iter(),
294 |e| {
295 let Event::CreateStreamJobFail(info) = e else {
296 return None;
297 };
298 Some(json!(info).to_string())
299 },
300 Some(3),
301 );
302
303 let _ = writeln!(s);
304 let _ = writeln!(s, "latest dirty stream job clear-ups");
305 Self::write_event_logs_impl(
306 s,
307 event_logs.iter(),
308 |e| {
309 let Event::DirtyStreamJobClear(info) = e else {
310 return None;
311 };
312 Some(json!(info).to_string())
313 },
314 Some(3),
315 );
316 }
317
318 fn write_event_logs_impl<'a, F>(
319 s: &mut String,
320 event_logs: impl Iterator<Item = &'a EventLog>,
321 get_event_info: F,
322 limit: Option<usize>,
323 ) where
324 F: Fn(&Event) -> Option<String>,
325 {
326 use comfy_table::{Row, Table};
327 let mut table = Table::new();
328 table.set_header({
329 let mut row = Row::new();
330 row.add_cell("created_at".into());
331 row.add_cell("info".into());
332 row
333 });
334 let mut row_count = 0;
335 for event_log in event_logs {
336 let Some(ref inner) = event_log.event else {
337 continue;
338 };
339 if let Some(limit) = limit
340 && row_count >= limit
341 {
342 break;
343 }
344 let mut row = Row::new();
345 let ts = event_log
346 .timestamp
347 .and_then(|ts| Timestamptz::from_millis(ts as _).map(|ts| ts.to_string()));
348 try_add_cell(&mut row, ts);
349 if let Some(info) = get_event_info(inner) {
350 row.add_cell(info.into());
351 row_count += 1;
352 } else {
353 continue;
354 }
355 table.add_row(row);
356 }
357 let _ = writeln!(s, "{table}");
358 }
359
360 async fn write_storage(&self, s: &mut String) {
361 let mut sst_num = 0;
362 let mut sst_total_file_size = 0;
363 let back_pressured_compaction_groups = self
364 .hummock_manger
365 .write_limits()
366 .await
367 .into_iter()
368 .filter_map(|(k, v)| {
369 if v.table_ids.is_empty() {
370 None
371 } else {
372 Some(k)
373 }
374 })
375 .join(",");
376 if !back_pressured_compaction_groups.is_empty() {
377 let _ = writeln!(
378 s,
379 "back pressured compaction groups: {back_pressured_compaction_groups}"
380 );
381 }
382
383 #[derive(PartialEq, Eq)]
384 struct SstableSort {
385 compaction_group_id: u64,
386 sst_id: HummockSstableId,
387 delete_ratio: u64,
388 }
389 impl PartialOrd for SstableSort {
390 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
391 Some(self.cmp(other))
392 }
393 }
394 impl Ord for SstableSort {
395 fn cmp(&self, other: &Self) -> Ordering {
396 self.delete_ratio.cmp(&other.delete_ratio)
397 }
398 }
399 fn top_k_sstables(
400 top_k: usize,
401 heap: &mut BinaryHeap<Reverse<SstableSort>>,
402 e: SstableSort,
403 ) {
404 if heap.len() < top_k {
405 heap.push(Reverse(e));
406 } else if let Some(mut p) = heap.peek_mut()
407 && e.delete_ratio > p.0.delete_ratio
408 {
409 *p = Reverse(e);
410 }
411 }
412
413 let top_k = 10;
414 let mut top_tombstone_delete_sst = BinaryHeap::with_capacity(top_k);
415 let compaction_group_num = self
416 .hummock_manger
417 .on_current_version(|version| {
418 for compaction_group in version.levels.values() {
419 let mut visit_level = |level: &Level| {
420 sst_num += level.table_infos.len();
421 sst_total_file_size +=
422 level.table_infos.iter().map(|t| t.sst_size).sum::<u64>();
423 for sst in &level.table_infos {
424 if sst.total_key_count == 0 {
425 continue;
426 }
427 let tombstone_delete_ratio =
428 sst.stale_key_count * 10000 / sst.total_key_count;
429 let e = SstableSort {
430 compaction_group_id: compaction_group.group_id,
431 sst_id: sst.sst_id,
432 delete_ratio: tombstone_delete_ratio,
433 };
434 top_k_sstables(top_k, &mut top_tombstone_delete_sst, e);
435 }
436 };
437 let l0 = &compaction_group.l0;
438 for level in &l0.sub_levels {
440 visit_level(level);
441 }
442 for level in &compaction_group.levels {
443 visit_level(level);
444 }
445 }
446 version.levels.len()
447 })
448 .await;
449
450 let _ = writeln!(s, "number of SSTables: {sst_num}");
451 let _ = writeln!(s, "total size of SSTables (byte): {sst_total_file_size}");
452 let _ = writeln!(s, "number of compaction groups: {compaction_group_num}");
453 use comfy_table::{Row, Table};
454 fn format_table(heap: BinaryHeap<Reverse<SstableSort>>) -> Table {
455 let mut table = Table::new();
456 table.set_header({
457 let mut row = Row::new();
458 row.add_cell("compaction group id".into());
459 row.add_cell("sstable id".into());
460 row.add_cell("delete ratio".into());
461 row
462 });
463 for sst in &heap.into_sorted_vec() {
464 let mut row = Row::new();
465 row.add_cell(sst.0.compaction_group_id.into());
466 row.add_cell(sst.0.sst_id.into());
467 row.add_cell(format!("{:.2}%", sst.0.delete_ratio as f64 / 100.0).into());
468 table.add_row(row);
469 }
470 table
471 }
472 let _ = writeln!(s);
473 let _ = writeln!(s, "top tombstone delete ratio");
474 let _ = writeln!(s, "{}", format_table(top_tombstone_delete_sst));
475 let _ = writeln!(s);
476
477 let _ = writeln!(s);
478 self.write_storage_prometheus(s).await;
479 }
480
481 async fn write_streaming_prometheus(&self, s: &mut String) {
482 let _ = writeln!(s, "top sources by throughput (rows/s)");
483 let query = format!(
484 "topk(3, sum(rate(stream_source_output_rows_counts{{{}}}[10m]))by (source_name))",
485 self.prometheus_selector
486 );
487 self.write_instant_vector_impl(s, &query, vec!["source_name"])
488 .await;
489
490 let _ = writeln!(s);
491 let _ = writeln!(s, "top materialized views by throughput (rows/s)");
492 let query = format!(
493 "topk(3, sum(rate(stream_mview_input_row_count{{{}}}[10m]))by (table_id))",
494 self.prometheus_selector
495 );
496 self.write_instant_vector_impl(s, &query, vec!["table_id"])
497 .await;
498
499 let _ = writeln!(s);
500 let _ = writeln!(s, "top join executor by matched rows");
501 let query = format!(
502 "topk(3, histogram_quantile(0.9, sum(rate(stream_join_matched_join_keys_bucket{{{}}}[10m])) by (le, fragment_id, table_id)))",
503 self.prometheus_selector
504 );
505 self.write_instant_vector_impl(s, &query, vec!["table_id", "fragment_id"])
506 .await;
507 }
508
509 async fn write_storage_prometheus(&self, s: &mut String) {
510 let _ = writeln!(s, "top Hummock Get by duration (second)");
511 let query = format!(
512 "topk(3, histogram_quantile(0.9, sum(rate(state_store_get_duration_bucket{{{}}}[10m])) by (le, table_id)))",
513 self.prometheus_selector
514 );
515 self.write_instant_vector_impl(s, &query, vec!["table_id"])
516 .await;
517
518 let _ = writeln!(s);
519 let _ = writeln!(s, "top Hummock Iter Init by duration (second)");
520 let query = format!(
521 "topk(3, histogram_quantile(0.9, sum(rate(state_store_iter_init_duration_bucket{{{}}}[10m])) by (le, table_id)))",
522 self.prometheus_selector
523 );
524 self.write_instant_vector_impl(s, &query, vec!["table_id"])
525 .await;
526
527 let _ = writeln!(s);
528 let _ = writeln!(s, "top table commit flush by size (byte)");
529 let query = format!(
530 "topk(3, sum(rate(storage_commit_write_throughput{{{}}}[10m])) by (table_id))",
531 self.prometheus_selector
532 );
533 self.write_instant_vector_impl(s, &query, vec!["table_id"])
534 .await;
535
536 let _ = writeln!(s);
537 let _ = writeln!(s, "object store read throughput (bytes/s)");
538 let query = format!(
539 "sum(rate(object_store_read_bytes{{{}}}[10m])) by (job, instance)",
540 merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
541 );
542 self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
543 .await;
544
545 let _ = writeln!(s);
546 let _ = writeln!(s, "object store write throughput (bytes/s)");
547 let query = format!(
548 "sum(rate(object_store_write_bytes{{{}}}[10m])) by (job, instance)",
549 merge_prometheus_selector([&self.prometheus_selector, "job=~\"compute|compactor\""])
550 );
551 self.write_instant_vector_impl(s, &query, vec!["job", "instance"])
552 .await;
553
554 let _ = writeln!(s);
555 let _ = writeln!(s, "object store operation rate");
556 let query = format!(
557 "sum(rate(object_store_operation_latency_count{{{}}}[10m])) by (le, type, job, instance)",
558 merge_prometheus_selector([
559 &self.prometheus_selector,
560 "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read_read_bytes|streaming_read\""
561 ])
562 );
563 self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
564 .await;
565
566 let _ = writeln!(s);
567 let _ = writeln!(s, "object store operation duration (second)");
568 let query = format!(
569 "histogram_quantile(0.9, sum(rate(object_store_operation_latency_bucket{{{}}}[10m])) by (le, type, job, instance))",
570 merge_prometheus_selector([
571 &self.prometheus_selector,
572 "job=~\"compute|compactor\", type!~\"streaming_upload_write_bytes|streaming_read\""
573 ])
574 );
575 self.write_instant_vector_impl(s, &query, vec!["type", "job", "instance"])
576 .await;
577 }
578
579 async fn write_instant_vector_impl(&self, s: &mut String, query: &str, labels: Vec<&str>) {
580 let Some(ref client) = self.prometheus_client else {
581 return;
582 };
583 if let Ok(Vector(instant_vec)) = client
584 .query(query)
585 .get()
586 .await
587 .map(|result| result.into_inner().0)
588 {
589 for i in instant_vec {
590 let l = labels
591 .iter()
592 .map(|label| {
593 format!(
594 "{}={}",
595 *label,
596 i.metric()
597 .get(*label)
598 .map(|s| s.as_str())
599 .unwrap_or_default()
600 )
601 })
602 .join(",");
603 let _ = writeln!(s, "{}: {:.3}", l, i.sample().value());
604 }
605 }
606 }
607
608 async fn write_await_tree(&self, s: &mut String, actor_traces_format: ActorTracesFormat) {
609 let all = dump_cluster_await_tree(
610 &self.metadata_manager,
611 &self.await_tree_reg,
612 actor_traces_format,
613 )
614 .await;
615
616 if let Ok(all) = all {
617 write!(s, "{}", all.output()).unwrap();
618 } else {
619 tracing::warn!("failed to dump await tree");
620 }
621 }
622
623 async fn write_table_definition(&self, s: &mut String) -> MetaResult<()> {
624 let sources = self
625 .metadata_manager
626 .catalog_controller
627 .list_sources()
628 .await?
629 .into_iter()
630 .map(|s| {
631 (
632 s.id,
633 (s.name, s.schema_id, s.definition, s.created_at_epoch),
634 )
635 })
636 .collect::<BTreeMap<_, _>>();
637 let mut user_tables = BTreeMap::new();
638 let mut mvs = BTreeMap::new();
639 let mut indexes = BTreeMap::new();
640 let mut internal_tables = BTreeMap::new();
641 {
642 let grouped = self
643 .metadata_manager
644 .catalog_controller
645 .list_all_state_tables()
646 .await?
647 .into_iter()
648 .chunk_by(|t| t.table_type());
649 for (table_type, tables) in &grouped {
650 let tables = tables.into_iter().map(|t| {
651 (
652 t.id,
653 (t.name, t.schema_id, t.definition, t.created_at_epoch),
654 )
655 });
656 match table_type {
657 PbTableType::Table => user_tables.extend(tables),
658 PbTableType::MaterializedView => mvs.extend(tables),
659 PbTableType::Index | PbTableType::VectorIndex => indexes.extend(tables),
660 PbTableType::Internal => internal_tables.extend(tables),
661 PbTableType::Unspecified => {
662 tracing::error!("unspecified table type: {:?}", tables.collect_vec());
663 }
664 }
665 }
666 }
667 let sinks = self
668 .metadata_manager
669 .catalog_controller
670 .list_sinks()
671 .await?
672 .into_iter()
673 .map(|s| {
674 (
675 s.id,
676 (s.name, s.schema_id, s.definition, s.created_at_epoch),
677 )
678 })
679 .collect::<BTreeMap<_, _>>();
680 let catalogs = [
681 ("SOURCE", sources),
682 ("TABLE", user_tables),
683 ("MATERIALIZED VIEW", mvs),
684 ("INDEX", indexes),
685 ("SINK", sinks),
686 ("INTERNAL TABLE", internal_tables),
687 ];
688 let mut obj_id_to_name = HashMap::new();
689 for (title, items) in catalogs {
690 use comfy_table::{Row, Table};
691 let mut table = Table::new();
692 table.set_header({
693 let mut row = Row::new();
694 row.add_cell("id".into());
695 row.add_cell("name".into());
696 row.add_cell("schema_id".into());
697 row.add_cell("created_at".into());
698 row.add_cell("definition".into());
699 row
700 });
701 for (id, (name, schema_id, definition, created_at_epoch)) in items {
702 obj_id_to_name.insert(id, name.clone());
703 let mut row = Row::new();
704 let may_redact = redact_sql(&definition, self.redact_sql_option_keywords.clone())
705 .unwrap_or_else(|| "[REDACTED]".into());
706 let created_at = if let Some(created_at_epoch) = created_at_epoch {
707 format!("{}", Epoch::from(created_at_epoch).as_timestamptz())
708 } else {
709 "".into()
710 };
711 row.add_cell(id.into());
712 row.add_cell(name.into());
713 row.add_cell(schema_id.into());
714 row.add_cell(created_at.into());
715 row.add_cell(may_redact.into());
716 table.add_row(row);
717 }
718 let _ = writeln!(s);
719 let _ = writeln!(s, "{title}");
720 let _ = writeln!(s, "{table}");
721 }
722
723 let actors = self
724 .metadata_manager
725 .catalog_controller
726 .list_actor_info()
727 .await?
728 .into_iter()
729 .map(|(actor_id, fragment_id, job_id, schema_id, obj_type)| {
730 (
731 actor_id,
732 (
733 fragment_id,
734 job_id,
735 schema_id,
736 obj_type,
737 obj_id_to_name
738 .get(&(job_id as _))
739 .cloned()
740 .unwrap_or_default(),
741 ),
742 )
743 })
744 .collect::<BTreeMap<_, _>>();
745
746 use comfy_table::{Row, Table};
747 let mut table = Table::new();
748 table.set_header({
749 let mut row = Row::new();
750 row.add_cell("id".into());
751 row.add_cell("fragment_id".into());
752 row.add_cell("job_id".into());
753 row.add_cell("schema_id".into());
754 row.add_cell("type".into());
755 row.add_cell("name".into());
756 row
757 });
758 for (actor_id, (fragment_id, job_id, schema_id, ddl_type, name)) in actors {
759 let mut row = Row::new();
760 row.add_cell(actor_id.into());
761 row.add_cell(fragment_id.into());
762 row.add_cell(job_id.into());
763 row.add_cell(schema_id.into());
764 row.add_cell(ddl_type.as_str().into());
765 row.add_cell(name.into());
766 table.add_row(row);
767 }
768 let _ = writeln!(s);
769 let _ = writeln!(s, "ACTOR");
770 let _ = writeln!(s, "{table}");
771 Ok(())
772 }
773}
774
775fn try_add_cell<T: Into<comfy_table::Cell>>(row: &mut comfy_table::Row, t: Option<T>) {
776 match t {
777 Some(t) => {
778 row.add_cell(t.into());
779 }
780 None => {
781 row.add_cell("".into());
782 }
783 }
784}
785
786fn merge_prometheus_selector<'a>(selectors: impl IntoIterator<Item = &'a str>) -> String {
787 selectors.into_iter().filter(|s| !s.is_empty()).join(",")
788}
789
790fn redact_sql(sql: &str, keywords: RedactSqlOptionKeywordsRef) -> Option<String> {
791 match Parser::parse_sql(sql) {
792 Ok(sqls) => Some(
793 sqls.into_iter()
794 .map(|sql| sql.to_redacted_string(keywords.clone()))
795 .join(";"),
796 ),
797 Err(_) => None,
798 }
799}