risingwave_ctl/cmd_impl/meta/
cluster_info.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap};
16
17use comfy_table::{Attribute, Cell, Row, Table};
18use itertools::Itertools;
19use risingwave_common::catalog::FragmentTypeFlag;
20use risingwave_common::util::addr::HostAddr;
21use risingwave_connector::source::{SplitImpl, SplitMetaData};
22use risingwave_pb::meta::GetClusterInfoResponse;
23use risingwave_pb::meta::table_fragments::State;
24use risingwave_pb::source::ConnectorSplits;
25
26use crate::CtlContext;
27
28pub async fn get_cluster_info(context: &CtlContext) -> anyhow::Result<GetClusterInfoResponse> {
29    let meta_client = context.meta_client().await?;
30    let response = meta_client.get_cluster_info().await?;
31    Ok(response)
32}
33
34pub async fn source_split_info(context: &CtlContext, ignore_id: bool) -> anyhow::Result<()> {
35    let GetClusterInfoResponse {
36        worker_nodes: _,
37        source_infos: _,
38        table_fragments,
39        mut actor_splits,
40        revision: _,
41    } = get_cluster_info(context).await?;
42
43    let mut actor_splits_map: BTreeMap<u32, (usize, String)> = BTreeMap::new();
44
45    // build actor_splits_map
46    for table_fragment in &table_fragments {
47        for fragment in table_fragment.fragments.values() {
48            let fragment_type_mask = fragment.fragment_type_mask;
49            if fragment_type_mask & FragmentTypeFlag::Source as u32 == 0
50                && fragment_type_mask & FragmentTypeFlag::SourceScan as u32 == 0
51            {
52                // no source or source backfill
53                continue;
54            }
55            if fragment_type_mask & FragmentTypeFlag::Dml as u32 != 0 {
56                // skip dummy source for dml fragment
57                continue;
58            }
59
60            for actor in &fragment.actors {
61                if let Some(ConnectorSplits { splits }) = actor_splits.remove(&actor.actor_id) {
62                    let num_splits = splits.len();
63                    let splits = splits
64                        .iter()
65                        .map(|split| SplitImpl::try_from(split).unwrap())
66                        .map(|split| split.id())
67                        .collect_vec()
68                        .join(",");
69                    actor_splits_map.insert(actor.actor_id, (num_splits, splits));
70                }
71            }
72        }
73    }
74
75    // print in the second iteration. Otherwise, we don't have upstream splits info
76    for table_fragment in &table_fragments {
77        if ignore_id {
78            println!("Table");
79        } else {
80            println!("Table #{}", table_fragment.table_id);
81        }
82        for fragment in table_fragment
83            .fragments
84            .values()
85            .sorted_by_key(|f| f.fragment_id)
86        {
87            let fragment_type_mask = fragment.fragment_type_mask;
88            if fragment_type_mask & FragmentTypeFlag::Source as u32 == 0
89                && fragment_type_mask & FragmentTypeFlag::SourceScan as u32 == 0
90            {
91                // no source or source backfill
92                continue;
93            }
94            if fragment_type_mask & FragmentTypeFlag::Dml as u32 != 0 {
95                // skip dummy source for dml fragment
96                continue;
97            }
98
99            println!(
100                "\tFragment{} ({})",
101                if ignore_id {
102                    "".to_owned()
103                } else {
104                    format!(" #{}", fragment.fragment_id)
105                },
106                if fragment_type_mask == FragmentTypeFlag::Source as u32 {
107                    "Source"
108                } else {
109                    "SourceScan"
110                }
111            );
112            for actor in fragment.actors.iter().sorted_by_key(|a| a.actor_id) {
113                if let Some((split_count, splits)) = actor_splits_map.get(&actor.actor_id) {
114                    println!(
115                        "\t\tActor{} ({} splits): [{}]",
116                        if ignore_id {
117                            "".to_owned()
118                        } else {
119                            format!(" #{:<3}", actor.actor_id,)
120                        },
121                        split_count,
122                        splits,
123                    );
124                } else {
125                    println!(
126                        "\t\tError: Actor #{:<3} (not found in actor_splits)",
127                        actor.actor_id,
128                    )
129                }
130            }
131        }
132    }
133
134    Ok(())
135}
136
137pub async fn cluster_info(context: &CtlContext) -> anyhow::Result<()> {
138    let GetClusterInfoResponse {
139        worker_nodes,
140        table_fragments,
141        actor_splits: _,
142        source_infos: _,
143        revision,
144    } = get_cluster_info(context).await?;
145
146    // Fragment ID -> [Worker ID -> [Actor ID]]
147    let mut fragments = BTreeMap::new();
148    // Fragment ID -> Table Fragments' State
149    let mut fragment_states = HashMap::new();
150
151    for table_fragment in &table_fragments {
152        for (&id, fragment) in &table_fragment.fragments {
153            for actor in &fragment.actors {
154                let worker_id = table_fragment
155                    .actor_status
156                    .get(&actor.actor_id)
157                    .unwrap()
158                    .worker_id();
159
160                fragments
161                    .entry(id)
162                    .or_insert_with(BTreeMap::new)
163                    .entry(worker_id)
164                    .or_insert(BTreeSet::new())
165                    .insert(actor.actor_id);
166            }
167            fragment_states.insert(id, table_fragment.state());
168        }
169    }
170
171    let mut table = Table::new();
172
173    let cross_out_if_creating = |cell: Cell, fid: u32| -> Cell {
174        match fragment_states[&fid] {
175            State::Unspecified => unreachable!(),
176            State::Creating => cell.add_attribute(Attribute::CrossedOut),
177            State::Created | State::Initial => cell,
178        }
179    };
180
181    // Compute Node, Frag 1, Frag 2, ..., Frag N
182    table.set_header({
183        let mut row = Row::new();
184        row.add_cell("Compute Node".into());
185        for &fid in fragments.keys() {
186            let cell = Cell::new(format!("Frag {fid}"));
187            let cell = cross_out_if_creating(cell, fid);
188            row.add_cell(cell);
189        }
190        row
191    });
192
193    let mut last_worker_id = None;
194    for worker in worker_nodes {
195        // Compute Node,  Actor 1, Actor 11, -, ..., Actor N
196        let mut row = Row::new();
197        row.add_cell(if last_worker_id == Some(worker.id) {
198            "".into()
199        } else {
200            last_worker_id = Some(worker.id);
201            let cordoned = if worker.get_property().map_or(true, |p| p.is_unschedulable) {
202                " (cordoned)"
203            } else {
204                ""
205            };
206            Cell::new(format!(
207                "{}@{}{}",
208                worker.id,
209                HostAddr::from(worker.get_host().unwrap()),
210                cordoned,
211            ))
212            .add_attribute(Attribute::Bold)
213        });
214        for (&fragment_id, worker_actors) in &fragments {
215            let cell = if let Some(actors) = worker_actors.get(&worker.id) {
216                actors
217                    .iter()
218                    .map(|actor| format!("{}", actor))
219                    .join(",")
220                    .into()
221            } else {
222                "-".into()
223            };
224            let cell = cross_out_if_creating(cell, fragment_id);
225            row.add_cell(cell);
226        }
227        table.add_row(row);
228    }
229
230    println!("{table}");
231    println!("Revision: {}", revision);
232
233    Ok(())
234}