risingwave_ctl/cmd_impl/meta/
cluster_info.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap};
16
17use comfy_table::{Attribute, Cell, Row, Table};
18use itertools::Itertools;
19use risingwave_common::catalog::FragmentTypeFlag;
20use risingwave_common::util::addr::HostAddr;
21use risingwave_connector::source::{SplitImpl, SplitMetaData};
22use risingwave_pb::id::{ActorId, FragmentId};
23use risingwave_pb::meta::GetClusterInfoResponse;
24use risingwave_pb::meta::table_fragments::State;
25use risingwave_pb::source::ConnectorSplits;
26
27use crate::CtlContext;
28
29pub async fn get_cluster_info(context: &CtlContext) -> anyhow::Result<GetClusterInfoResponse> {
30    let meta_client = context.meta_client().await?;
31    let response = meta_client.get_cluster_info().await?;
32    Ok(response)
33}
34
35pub async fn source_split_info(context: &CtlContext, ignore_id: bool) -> anyhow::Result<()> {
36    let GetClusterInfoResponse {
37        worker_nodes: _,
38        source_infos: _,
39        table_fragments,
40        mut actor_splits,
41        revision: _,
42    } = get_cluster_info(context).await?;
43
44    let mut actor_splits_map: BTreeMap<ActorId, (usize, String)> = BTreeMap::new();
45
46    // build actor_splits_map
47    for table_fragment in &table_fragments {
48        for fragment in table_fragment.fragments.values() {
49            let fragment_type_mask = fragment.fragment_type_mask;
50            if fragment_type_mask & FragmentTypeFlag::Source as u32 == 0
51                && fragment_type_mask & FragmentTypeFlag::SourceScan as u32 == 0
52            {
53                // no source or source backfill
54                continue;
55            }
56            if fragment_type_mask & FragmentTypeFlag::Dml as u32 != 0 {
57                // skip dummy source for dml fragment
58                continue;
59            }
60
61            for actor in &fragment.actors {
62                if let Some(ConnectorSplits { splits }) = actor_splits.remove(&actor.actor_id) {
63                    let num_splits = splits.len();
64                    let splits = splits
65                        .iter()
66                        .map(|split| SplitImpl::try_from(split).unwrap())
67                        .map(|split| split.id())
68                        .collect_vec()
69                        .join(",");
70                    actor_splits_map.insert(actor.actor_id, (num_splits, splits));
71                }
72            }
73        }
74    }
75
76    // print in the second iteration. Otherwise, we don't have upstream splits info
77    for table_fragment in &table_fragments {
78        if ignore_id {
79            println!("Table");
80        } else {
81            println!("Table #{}", table_fragment.table_id);
82        }
83        for fragment in table_fragment
84            .fragments
85            .values()
86            .sorted_by_key(|f| f.fragment_id)
87        {
88            let fragment_type_mask = fragment.fragment_type_mask;
89            if fragment_type_mask & FragmentTypeFlag::Source as u32 == 0
90                && fragment_type_mask & FragmentTypeFlag::SourceScan as u32 == 0
91            {
92                // no source or source backfill
93                continue;
94            }
95            if fragment_type_mask & FragmentTypeFlag::Dml as u32 != 0 {
96                // skip dummy source for dml fragment
97                continue;
98            }
99
100            println!(
101                "\tFragment{} ({})",
102                if ignore_id {
103                    "".to_owned()
104                } else {
105                    format!(" #{}", fragment.fragment_id)
106                },
107                if fragment_type_mask == FragmentTypeFlag::Source as u32 {
108                    "Source"
109                } else {
110                    "SourceScan"
111                }
112            );
113            for actor in fragment.actors.iter().sorted_by_key(|a| a.actor_id) {
114                if let Some((split_count, splits)) = actor_splits_map.get(&actor.actor_id) {
115                    println!(
116                        "\t\tActor{} ({} splits): [{}]",
117                        if ignore_id {
118                            "".to_owned()
119                        } else {
120                            format!(" #{:<3}", actor.actor_id,)
121                        },
122                        split_count,
123                        splits,
124                    );
125                } else {
126                    println!(
127                        "\t\tError: Actor #{:<3} (not found in actor_splits)",
128                        actor.actor_id,
129                    )
130                }
131            }
132        }
133    }
134
135    Ok(())
136}
137
138pub async fn cluster_info(context: &CtlContext) -> anyhow::Result<()> {
139    let GetClusterInfoResponse {
140        worker_nodes,
141        table_fragments,
142        actor_splits: _,
143        source_infos: _,
144        revision,
145    } = get_cluster_info(context).await?;
146
147    // Fragment ID -> [Worker ID -> [Actor ID]]
148    let mut fragments = BTreeMap::new();
149    // Fragment ID -> Table Fragments' State
150    let mut fragment_states = HashMap::new();
151
152    for table_fragment in &table_fragments {
153        for (&id, fragment) in &table_fragment.fragments {
154            for actor in &fragment.actors {
155                let worker_id = table_fragment
156                    .actor_status
157                    .get(&actor.actor_id)
158                    .unwrap()
159                    .worker_id();
160
161                fragments
162                    .entry(id)
163                    .or_insert_with(BTreeMap::new)
164                    .entry(worker_id)
165                    .or_insert(BTreeSet::new())
166                    .insert(actor.actor_id);
167            }
168            fragment_states.insert(id, table_fragment.state());
169        }
170    }
171
172    let mut table = Table::new();
173
174    let cross_out_if_creating = |cell: Cell, fid: FragmentId| -> Cell {
175        match fragment_states[&fid] {
176            State::Unspecified => unreachable!(),
177            State::Creating => cell.add_attribute(Attribute::CrossedOut),
178            State::Created | State::Initial => cell,
179        }
180    };
181
182    // Compute Node, Frag 1, Frag 2, ..., Frag N
183    table.set_header({
184        let mut row = Row::new();
185        row.add_cell("Compute Node".into());
186        for &fid in fragments.keys() {
187            let cell = Cell::new(format!("Frag {fid}"));
188            let cell = cross_out_if_creating(cell, fid);
189            row.add_cell(cell);
190        }
191        row
192    });
193
194    let mut last_worker_id = None;
195    for worker in worker_nodes {
196        // Compute Node,  Actor 1, Actor 11, -, ..., Actor N
197        let mut row = Row::new();
198        row.add_cell(if last_worker_id == Some(worker.id) {
199            "".into()
200        } else {
201            last_worker_id = Some(worker.id);
202            let cordoned = if worker.get_property().map_or(true, |p| p.is_unschedulable) {
203                " (cordoned)"
204            } else {
205                ""
206            };
207            Cell::new(format!(
208                "{}@{}{}",
209                worker.id,
210                HostAddr::from(worker.get_host().unwrap()),
211                cordoned,
212            ))
213            .add_attribute(Attribute::Bold)
214        });
215        for (&fragment_id, worker_actors) in &fragments {
216            let cell = if let Some(actors) = worker_actors.get(&worker.id) {
217                actors
218                    .iter()
219                    .map(|actor| format!("{}", actor))
220                    .join(",")
221                    .into()
222            } else {
223                "-".into()
224            };
225            let cell = cross_out_if_creating(cell, fragment_id);
226            row.add_cell(cell);
227        }
228        table.add_row(row);
229    }
230
231    println!("{table}");
232    println!("Revision: {}", revision);
233
234    Ok(())
235}