risingwave_ctl/cmd_impl/meta/
serving.rs1use std::collections::HashMap;
16
17use comfy_table::{Row, Table};
18use itertools::Itertools;
19use risingwave_common::hash::VirtualNode;
20use risingwave_pb::common::WorkerType;
21
22use crate::CtlContext;
23
24pub async fn list_serving_fragment_mappings(context: &CtlContext) -> anyhow::Result<()> {
25 let meta_client = context.meta_client().await?;
26 let mappings = meta_client.list_serving_vnode_mappings().await?;
27 let workers: HashMap<_, _> = meta_client
28 .list_worker_nodes(Some(WorkerType::ComputeNode))
29 .await?
30 .into_iter()
31 .map(|worker| (worker.id, worker))
32 .collect();
33
34 let mut table = Table::new();
35 table.set_header({
36 let mut row = Row::new();
37 row.add_cell("Table Id".into());
38 row.add_cell("Fragment Id".into());
39 row.add_cell("Virtual Node".into());
40 row.add_cell("Worker".into());
41 row
42 });
43
44 let rows = mappings
45 .iter()
46 .flat_map(|(fragment_id, (table_id, mapping))| {
47 let mut worker_nodes: HashMap<u32, Vec<VirtualNode>> = HashMap::new();
48 for (vnode, worker_slot_id) in mapping.iter_with_vnode() {
49 worker_nodes
50 .entry(worker_slot_id.worker_id())
51 .or_default()
52 .push(vnode);
53 }
54 worker_nodes.into_iter().map(|(worker_id, vnodes)| {
55 (*table_id, *fragment_id, vnodes, workers.get(&worker_id))
56 })
57 })
58 .collect_vec();
59
60 for (table_id, fragment_id, vnodes, worker) in
61 rows.into_iter().sorted_by_key(|(t, f, ..)| (*t, *f))
62 {
63 let mut row = Row::new();
64 row.add_cell(table_id.into());
65 row.add_cell(fragment_id.into());
66 row.add_cell(
67 format!(
68 "{} in total: {}",
69 vnodes.len(),
70 vnodes
71 .into_iter()
72 .sorted()
73 .map(|v| v.to_index().to_string())
74 .join(",")
75 )
76 .into(),
77 );
78 if let Some(w) = worker
79 && let Some(addr) = w.host.as_ref()
80 {
81 row.add_cell(format!("id: {}; {}:{}", w.id, addr.host, addr.port).into());
82 } else {
83 row.add_cell("".into());
84 }
85 table.add_row(row);
86 }
87 println!("{table}");
88 Ok(())
89}