risingwave_ctl/cmd_impl/meta/
serving.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use comfy_table::{Row, Table};
18use itertools::Itertools;
19use risingwave_common::hash::VirtualNode;
20use risingwave_pb::common::WorkerType;
21
22use crate::CtlContext;
23
24pub async fn list_serving_fragment_mappings(context: &CtlContext) -> anyhow::Result<()> {
25    let meta_client = context.meta_client().await?;
26    let mappings = meta_client.list_serving_vnode_mappings().await?;
27    let workers: HashMap<_, _> = meta_client
28        .list_worker_nodes(Some(WorkerType::ComputeNode))
29        .await?
30        .into_iter()
31        .map(|worker| (worker.id, worker))
32        .collect();
33
34    let mut table = Table::new();
35    table.set_header({
36        let mut row = Row::new();
37        row.add_cell("Table Id".into());
38        row.add_cell("Fragment Id".into());
39        row.add_cell("Virtual Node".into());
40        row.add_cell("Worker".into());
41        row
42    });
43
44    let rows = mappings
45        .iter()
46        .flat_map(|(fragment_id, (table_id, mapping))| {
47            let mut worker_nodes: HashMap<u32, Vec<VirtualNode>> = HashMap::new();
48            for (vnode, worker_slot_id) in mapping.iter_with_vnode() {
49                worker_nodes
50                    .entry(worker_slot_id.worker_id())
51                    .or_default()
52                    .push(vnode);
53            }
54            worker_nodes.into_iter().map(|(worker_id, vnodes)| {
55                (*table_id, *fragment_id, vnodes, workers.get(&worker_id))
56            })
57        })
58        .collect_vec();
59
60    for (table_id, fragment_id, vnodes, worker) in
61        rows.into_iter().sorted_by_key(|(t, f, ..)| (*t, *f))
62    {
63        let mut row = Row::new();
64        row.add_cell(table_id.into());
65        row.add_cell(fragment_id.into());
66        row.add_cell(
67            format!(
68                "{} in total: {}",
69                vnodes.len(),
70                vnodes
71                    .into_iter()
72                    .sorted()
73                    .map(|v| v.to_index().to_string())
74                    .join(",")
75            )
76            .into(),
77        );
78        if let Some(w) = worker
79            && let Some(addr) = w.host.as_ref()
80        {
81            row.add_cell(format!("id: {}; {}:{}", w.id, addr.host, addr.port).into());
82        } else {
83            row.add_cell("".into());
84        }
85        table.add_row(row);
86    }
87    println!("{table}");
88    Ok(())
89}