risingwave_ctl/cmd_impl/meta/
serving.rs1use std::collections::HashMap;
16
17use comfy_table::{Row, Table};
18use itertools::Itertools;
19use risingwave_common::hash::VirtualNode;
20use risingwave_common::id::WorkerId;
21use risingwave_pb::common::WorkerType;
22
23use crate::CtlContext;
24
25pub async fn list_serving_fragment_mappings(context: &CtlContext) -> anyhow::Result<()> {
26 let meta_client = context.meta_client().await?;
27 let mappings = meta_client.list_serving_vnode_mappings().await?;
28 let workers: HashMap<WorkerId, _> = meta_client
29 .list_worker_nodes(Some(WorkerType::ComputeNode))
30 .await?
31 .into_iter()
32 .map(|worker| (worker.id, worker))
33 .collect();
34
35 let mut table = Table::new();
36 table.set_header({
37 let mut row = Row::new();
38 row.add_cell("Table Id".into());
39 row.add_cell("Fragment Id".into());
40 row.add_cell("Virtual Node".into());
41 row.add_cell("Worker".into());
42 row
43 });
44
45 let rows = mappings
46 .iter()
47 .flat_map(|(fragment_id, (table_id, mapping))| {
48 let mut worker_nodes: HashMap<WorkerId, Vec<VirtualNode>> = HashMap::new();
49 for (vnode, worker_slot_id) in mapping.iter_with_vnode() {
50 worker_nodes
51 .entry(worker_slot_id.worker_id())
52 .or_default()
53 .push(vnode);
54 }
55 worker_nodes.into_iter().map(|(worker_id, vnodes)| {
56 (*table_id, *fragment_id, vnodes, workers.get(&worker_id))
57 })
58 })
59 .collect_vec();
60
61 for (table_id, fragment_id, vnodes, worker) in
62 rows.into_iter().sorted_by_key(|(t, f, ..)| (*t, *f))
63 {
64 let mut row = Row::new();
65 row.add_cell(table_id.into());
66 row.add_cell(fragment_id.into());
67 row.add_cell(
68 format!(
69 "{} in total: {}",
70 vnodes.len(),
71 vnodes
72 .into_iter()
73 .sorted()
74 .map(|v| v.to_index().to_string())
75 .join(",")
76 )
77 .into(),
78 );
79 if let Some(w) = worker
80 && let Some(addr) = w.host.as_ref()
81 {
82 row.add_cell(format!("id: {}; {}:{}", w.id, addr.host, addr.port).into());
83 } else {
84 row.add_cell("".into());
85 }
86 table.add_row(row);
87 }
88 println!("{table}");
89 Ok(())
90}