risingwave_ctl/cmd_impl/meta/
cluster_info.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet, HashMap};
16
17use comfy_table::{Attribute, Cell, Row, Table};
18use itertools::Itertools;
19use risingwave_common::util::addr::HostAddr;
20use risingwave_connector::source::{SplitImpl, SplitMetaData};
21use risingwave_pb::meta::GetClusterInfoResponse;
22use risingwave_pb::meta::table_fragments::State;
23use risingwave_pb::source::ConnectorSplits;
24use risingwave_pb::stream_plan::FragmentTypeFlag;
25
26use crate::CtlContext;
27
28pub async fn get_cluster_info(context: &CtlContext) -> anyhow::Result<GetClusterInfoResponse> {
29    let meta_client = context.meta_client().await?;
30    let response = meta_client.get_cluster_info().await?;
31    Ok(response)
32}
33
34pub async fn source_split_info(context: &CtlContext, ignore_id: bool) -> anyhow::Result<()> {
35    let GetClusterInfoResponse {
36        worker_nodes: _,
37        source_infos: _,
38        table_fragments,
39        mut actor_splits,
40        revision: _,
41    } = get_cluster_info(context).await?;
42
43    let mut actor_splits_map: BTreeMap<u32, (usize, String)> = BTreeMap::new();
44
45    // build actor_splits_map
46    for table_fragment in &table_fragments {
47        if table_fragment.actor_splits.is_empty() {
48            continue;
49        }
50
51        for fragment in table_fragment.fragments.values() {
52            let fragment_type_mask = fragment.fragment_type_mask;
53            if fragment_type_mask & FragmentTypeFlag::Source as u32 == 0
54                && fragment_type_mask & FragmentTypeFlag::SourceScan as u32 == 0
55            {
56                // no source or source backfill
57                continue;
58            }
59            if fragment_type_mask & FragmentTypeFlag::Dml as u32 != 0 {
60                // skip dummy source for dml fragment
61                continue;
62            }
63
64            for actor in &fragment.actors {
65                if let Some(ConnectorSplits { splits }) = actor_splits.remove(&actor.actor_id) {
66                    let num_splits = splits.len();
67                    let splits = splits
68                        .iter()
69                        .map(|split| SplitImpl::try_from(split).unwrap())
70                        .map(|split| split.id())
71                        .collect_vec()
72                        .join(",");
73                    actor_splits_map.insert(actor.actor_id, (num_splits, splits));
74                }
75            }
76        }
77    }
78
79    // print in the second iteration. Otherwise we don't have upstream splits info
80    for table_fragment in &table_fragments {
81        if table_fragment.actor_splits.is_empty() {
82            continue;
83        }
84        if ignore_id {
85            println!("Table");
86        } else {
87            println!("Table #{}", table_fragment.table_id);
88        }
89        for fragment in table_fragment
90            .fragments
91            .values()
92            .sorted_by_key(|f| f.fragment_id)
93        {
94            let fragment_type_mask = fragment.fragment_type_mask;
95            if fragment_type_mask & FragmentTypeFlag::Source as u32 == 0
96                && fragment_type_mask & FragmentTypeFlag::SourceScan as u32 == 0
97            {
98                // no source or source backfill
99                continue;
100            }
101            if fragment_type_mask & FragmentTypeFlag::Dml as u32 != 0 {
102                // skip dummy source for dml fragment
103                continue;
104            }
105
106            println!(
107                "\tFragment{} ({})",
108                if ignore_id {
109                    "".to_owned()
110                } else {
111                    format!(" #{}", fragment.fragment_id)
112                },
113                if fragment_type_mask == FragmentTypeFlag::Source as u32 {
114                    "Source"
115                } else {
116                    "SourceScan"
117                }
118            );
119            for actor in fragment.actors.iter().sorted_by_key(|a| a.actor_id) {
120                if let Some((split_count, splits)) = actor_splits_map.get(&actor.actor_id) {
121                    println!(
122                        "\t\tActor{} ({} splits): [{}]",
123                        if ignore_id {
124                            "".to_owned()
125                        } else {
126                            format!(" #{:<3}", actor.actor_id,)
127                        },
128                        split_count,
129                        splits,
130                    );
131                } else {
132                    println!(
133                        "\t\tError: Actor #{:<3} (not found in actor_splits)",
134                        actor.actor_id,
135                    )
136                }
137            }
138        }
139    }
140
141    Ok(())
142}
143
144pub async fn cluster_info(context: &CtlContext) -> anyhow::Result<()> {
145    let GetClusterInfoResponse {
146        worker_nodes,
147        table_fragments,
148        actor_splits: _,
149        source_infos: _,
150        revision,
151    } = get_cluster_info(context).await?;
152
153    // Fragment ID -> [Worker ID -> [Actor ID]]
154    let mut fragments = BTreeMap::new();
155    // Fragment ID -> Table Fragments' State
156    let mut fragment_states = HashMap::new();
157
158    for table_fragment in &table_fragments {
159        for (&id, fragment) in &table_fragment.fragments {
160            for actor in &fragment.actors {
161                let worker_id = table_fragment
162                    .actor_status
163                    .get(&actor.actor_id)
164                    .unwrap()
165                    .worker_id();
166
167                fragments
168                    .entry(id)
169                    .or_insert_with(BTreeMap::new)
170                    .entry(worker_id)
171                    .or_insert(BTreeSet::new())
172                    .insert(actor.actor_id);
173            }
174            fragment_states.insert(id, table_fragment.state());
175        }
176    }
177
178    let mut table = Table::new();
179
180    let cross_out_if_creating = |cell: Cell, fid: u32| -> Cell {
181        match fragment_states[&fid] {
182            State::Unspecified => unreachable!(),
183            State::Creating => cell.add_attribute(Attribute::CrossedOut),
184            State::Created | State::Initial => cell,
185        }
186    };
187
188    // Compute Node, Frag 1, Frag 2, ..., Frag N
189    table.set_header({
190        let mut row = Row::new();
191        row.add_cell("Compute Node".into());
192        for &fid in fragments.keys() {
193            let cell = Cell::new(format!("Frag {fid}"));
194            let cell = cross_out_if_creating(cell, fid);
195            row.add_cell(cell);
196        }
197        row
198    });
199
200    let mut last_worker_id = None;
201    for worker in worker_nodes {
202        // Compute Node,  Actor 1, Actor 11, -, ..., Actor N
203        let mut row = Row::new();
204        row.add_cell(if last_worker_id == Some(worker.id) {
205            "".into()
206        } else {
207            last_worker_id = Some(worker.id);
208            let cordoned = if worker.get_property().map_or(true, |p| p.is_unschedulable) {
209                " (cordoned)"
210            } else {
211                ""
212            };
213            Cell::new(format!(
214                "{}@{}{}",
215                worker.id,
216                HostAddr::from(worker.get_host().unwrap()),
217                cordoned,
218            ))
219            .add_attribute(Attribute::Bold)
220        });
221        for (&fragment_id, worker_actors) in &fragments {
222            let cell = if let Some(actors) = worker_actors.get(&worker.id) {
223                actors
224                    .iter()
225                    .map(|actor| format!("{}", actor))
226                    .join(",")
227                    .into()
228            } else {
229                "-".into()
230            };
231            let cell = cross_out_if_creating(cell, fragment_id);
232            row.add_cell(cell);
233        }
234        table.add_row(row);
235    }
236
237    println!("{table}");
238    println!("Revision: {}", revision);
239
240    Ok(())
241}