risingwave_ctl/cmd_impl/meta/
serving.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use comfy_table::{Row, Table};
18use itertools::Itertools;
19use risingwave_common::hash::VirtualNode;
20use risingwave_common::id::WorkerId;
21use risingwave_pb::common::WorkerType;
22
23use crate::CtlContext;
24
25pub async fn list_serving_fragment_mappings(context: &CtlContext) -> anyhow::Result<()> {
26    let meta_client = context.meta_client().await?;
27    let mappings = meta_client.list_serving_vnode_mappings().await?;
28    let workers: HashMap<WorkerId, _> = meta_client
29        .list_worker_nodes(Some(WorkerType::ComputeNode))
30        .await?
31        .into_iter()
32        .map(|worker| (worker.id, worker))
33        .collect();
34
35    let mut table = Table::new();
36    table.set_header({
37        let mut row = Row::new();
38        row.add_cell("Table Id".into());
39        row.add_cell("Fragment Id".into());
40        row.add_cell("Virtual Node".into());
41        row.add_cell("Worker".into());
42        row
43    });
44
45    let rows = mappings
46        .iter()
47        .flat_map(|(fragment_id, (table_id, mapping))| {
48            let mut worker_nodes: HashMap<WorkerId, Vec<VirtualNode>> = HashMap::new();
49            for (vnode, worker_slot_id) in mapping.iter_with_vnode() {
50                worker_nodes
51                    .entry(worker_slot_id.worker_id())
52                    .or_default()
53                    .push(vnode);
54            }
55            worker_nodes.into_iter().map(|(worker_id, vnodes)| {
56                (*table_id, *fragment_id, vnodes, workers.get(&worker_id))
57            })
58        })
59        .collect_vec();
60
61    for (table_id, fragment_id, vnodes, worker) in
62        rows.into_iter().sorted_by_key(|(t, f, ..)| (*t, *f))
63    {
64        let mut row = Row::new();
65        row.add_cell(table_id.into());
66        row.add_cell(fragment_id.into());
67        row.add_cell(
68            format!(
69                "{} in total: {}",
70                vnodes.len(),
71                vnodes
72                    .into_iter()
73                    .sorted()
74                    .map(|v| v.to_index().to_string())
75                    .join(",")
76            )
77            .into(),
78        );
79        if let Some(w) = worker
80            && let Some(addr) = w.host.as_ref()
81        {
82            row.add_cell(format!("id: {}; {}:{}", w.id, addr.host, addr.port).into());
83        } else {
84            row.add_cell("".into());
85        }
86        table.add_row(row);
87    }
88    println!("{table}");
89    Ok(())
90}