risingwave_ctl/cmd_impl/meta/
cluster_info.rs1use std::collections::{BTreeMap, BTreeSet, HashMap};
16
17use comfy_table::{Attribute, Cell, Row, Table};
18use itertools::Itertools;
19use risingwave_common::catalog::FragmentTypeFlag;
20use risingwave_common::util::addr::HostAddr;
21use risingwave_connector::source::{SplitImpl, SplitMetaData};
22use risingwave_pb::meta::GetClusterInfoResponse;
23use risingwave_pb::meta::table_fragments::State;
24use risingwave_pb::source::ConnectorSplits;
25
26use crate::CtlContext;
27
28pub async fn get_cluster_info(context: &CtlContext) -> anyhow::Result<GetClusterInfoResponse> {
29 let meta_client = context.meta_client().await?;
30 let response = meta_client.get_cluster_info().await?;
31 Ok(response)
32}
33
34pub async fn source_split_info(context: &CtlContext, ignore_id: bool) -> anyhow::Result<()> {
35 let GetClusterInfoResponse {
36 worker_nodes: _,
37 source_infos: _,
38 table_fragments,
39 mut actor_splits,
40 revision: _,
41 } = get_cluster_info(context).await?;
42
43 let mut actor_splits_map: BTreeMap<u32, (usize, String)> = BTreeMap::new();
44
45 for table_fragment in &table_fragments {
47 for fragment in table_fragment.fragments.values() {
48 let fragment_type_mask = fragment.fragment_type_mask;
49 if fragment_type_mask & FragmentTypeFlag::Source as u32 == 0
50 && fragment_type_mask & FragmentTypeFlag::SourceScan as u32 == 0
51 {
52 continue;
54 }
55 if fragment_type_mask & FragmentTypeFlag::Dml as u32 != 0 {
56 continue;
58 }
59
60 for actor in &fragment.actors {
61 if let Some(ConnectorSplits { splits }) = actor_splits.remove(&actor.actor_id) {
62 let num_splits = splits.len();
63 let splits = splits
64 .iter()
65 .map(|split| SplitImpl::try_from(split).unwrap())
66 .map(|split| split.id())
67 .collect_vec()
68 .join(",");
69 actor_splits_map.insert(actor.actor_id, (num_splits, splits));
70 }
71 }
72 }
73 }
74
75 for table_fragment in &table_fragments {
77 if ignore_id {
78 println!("Table");
79 } else {
80 println!("Table #{}", table_fragment.table_id);
81 }
82 for fragment in table_fragment
83 .fragments
84 .values()
85 .sorted_by_key(|f| f.fragment_id)
86 {
87 let fragment_type_mask = fragment.fragment_type_mask;
88 if fragment_type_mask & FragmentTypeFlag::Source as u32 == 0
89 && fragment_type_mask & FragmentTypeFlag::SourceScan as u32 == 0
90 {
91 continue;
93 }
94 if fragment_type_mask & FragmentTypeFlag::Dml as u32 != 0 {
95 continue;
97 }
98
99 println!(
100 "\tFragment{} ({})",
101 if ignore_id {
102 "".to_owned()
103 } else {
104 format!(" #{}", fragment.fragment_id)
105 },
106 if fragment_type_mask == FragmentTypeFlag::Source as u32 {
107 "Source"
108 } else {
109 "SourceScan"
110 }
111 );
112 for actor in fragment.actors.iter().sorted_by_key(|a| a.actor_id) {
113 if let Some((split_count, splits)) = actor_splits_map.get(&actor.actor_id) {
114 println!(
115 "\t\tActor{} ({} splits): [{}]",
116 if ignore_id {
117 "".to_owned()
118 } else {
119 format!(" #{:<3}", actor.actor_id,)
120 },
121 split_count,
122 splits,
123 );
124 } else {
125 println!(
126 "\t\tError: Actor #{:<3} (not found in actor_splits)",
127 actor.actor_id,
128 )
129 }
130 }
131 }
132 }
133
134 Ok(())
135}
136
137pub async fn cluster_info(context: &CtlContext) -> anyhow::Result<()> {
138 let GetClusterInfoResponse {
139 worker_nodes,
140 table_fragments,
141 actor_splits: _,
142 source_infos: _,
143 revision,
144 } = get_cluster_info(context).await?;
145
146 let mut fragments = BTreeMap::new();
148 let mut fragment_states = HashMap::new();
150
151 for table_fragment in &table_fragments {
152 for (&id, fragment) in &table_fragment.fragments {
153 for actor in &fragment.actors {
154 let worker_id = table_fragment
155 .actor_status
156 .get(&actor.actor_id)
157 .unwrap()
158 .worker_id();
159
160 fragments
161 .entry(id)
162 .or_insert_with(BTreeMap::new)
163 .entry(worker_id)
164 .or_insert(BTreeSet::new())
165 .insert(actor.actor_id);
166 }
167 fragment_states.insert(id, table_fragment.state());
168 }
169 }
170
171 let mut table = Table::new();
172
173 let cross_out_if_creating = |cell: Cell, fid: u32| -> Cell {
174 match fragment_states[&fid] {
175 State::Unspecified => unreachable!(),
176 State::Creating => cell.add_attribute(Attribute::CrossedOut),
177 State::Created | State::Initial => cell,
178 }
179 };
180
181 table.set_header({
183 let mut row = Row::new();
184 row.add_cell("Compute Node".into());
185 for &fid in fragments.keys() {
186 let cell = Cell::new(format!("Frag {fid}"));
187 let cell = cross_out_if_creating(cell, fid);
188 row.add_cell(cell);
189 }
190 row
191 });
192
193 let mut last_worker_id = None;
194 for worker in worker_nodes {
195 let mut row = Row::new();
197 row.add_cell(if last_worker_id == Some(worker.id) {
198 "".into()
199 } else {
200 last_worker_id = Some(worker.id);
201 let cordoned = if worker.get_property().map_or(true, |p| p.is_unschedulable) {
202 " (cordoned)"
203 } else {
204 ""
205 };
206 Cell::new(format!(
207 "{}@{}{}",
208 worker.id,
209 HostAddr::from(worker.get_host().unwrap()),
210 cordoned,
211 ))
212 .add_attribute(Attribute::Bold)
213 });
214 for (&fragment_id, worker_actors) in &fragments {
215 let cell = if let Some(actors) = worker_actors.get(&worker.id) {
216 actors
217 .iter()
218 .map(|actor| format!("{}", actor))
219 .join(",")
220 .into()
221 } else {
222 "-".into()
223 };
224 let cell = cross_out_if_creating(cell, fragment_id);
225 row.add_cell(cell);
226 }
227 table.add_row(row);
228 }
229
230 println!("{table}");
231 println!("Revision: {}", revision);
232
233 Ok(())
234}