risingwave_ctl/cmd_impl/meta/
cluster_info.rs1use std::collections::{BTreeMap, BTreeSet, HashMap};
16
17use comfy_table::{Attribute, Cell, Row, Table};
18use itertools::Itertools;
19use risingwave_common::util::addr::HostAddr;
20use risingwave_connector::source::{SplitImpl, SplitMetaData};
21use risingwave_pb::meta::GetClusterInfoResponse;
22use risingwave_pb::meta::table_fragments::State;
23use risingwave_pb::source::ConnectorSplits;
24use risingwave_pb::stream_plan::FragmentTypeFlag;
25
26use crate::CtlContext;
27
28pub async fn get_cluster_info(context: &CtlContext) -> anyhow::Result<GetClusterInfoResponse> {
29 let meta_client = context.meta_client().await?;
30 let response = meta_client.get_cluster_info().await?;
31 Ok(response)
32}
33
34pub async fn source_split_info(context: &CtlContext, ignore_id: bool) -> anyhow::Result<()> {
35 let GetClusterInfoResponse {
36 worker_nodes: _,
37 source_infos: _,
38 table_fragments,
39 mut actor_splits,
40 revision: _,
41 } = get_cluster_info(context).await?;
42
43 let mut actor_splits_map: BTreeMap<u32, (usize, String)> = BTreeMap::new();
44
45 for table_fragment in &table_fragments {
47 if table_fragment.actor_splits.is_empty() {
48 continue;
49 }
50
51 for fragment in table_fragment.fragments.values() {
52 let fragment_type_mask = fragment.fragment_type_mask;
53 if fragment_type_mask & FragmentTypeFlag::Source as u32 == 0
54 && fragment_type_mask & FragmentTypeFlag::SourceScan as u32 == 0
55 {
56 continue;
58 }
59 if fragment_type_mask & FragmentTypeFlag::Dml as u32 != 0 {
60 continue;
62 }
63
64 for actor in &fragment.actors {
65 if let Some(ConnectorSplits { splits }) = actor_splits.remove(&actor.actor_id) {
66 let num_splits = splits.len();
67 let splits = splits
68 .iter()
69 .map(|split| SplitImpl::try_from(split).unwrap())
70 .map(|split| split.id())
71 .collect_vec()
72 .join(",");
73 actor_splits_map.insert(actor.actor_id, (num_splits, splits));
74 }
75 }
76 }
77 }
78
79 for table_fragment in &table_fragments {
81 if table_fragment.actor_splits.is_empty() {
82 continue;
83 }
84 if ignore_id {
85 println!("Table");
86 } else {
87 println!("Table #{}", table_fragment.table_id);
88 }
89 for fragment in table_fragment
90 .fragments
91 .values()
92 .sorted_by_key(|f| f.fragment_id)
93 {
94 let fragment_type_mask = fragment.fragment_type_mask;
95 if fragment_type_mask & FragmentTypeFlag::Source as u32 == 0
96 && fragment_type_mask & FragmentTypeFlag::SourceScan as u32 == 0
97 {
98 continue;
100 }
101 if fragment_type_mask & FragmentTypeFlag::Dml as u32 != 0 {
102 continue;
104 }
105
106 println!(
107 "\tFragment{} ({})",
108 if ignore_id {
109 "".to_owned()
110 } else {
111 format!(" #{}", fragment.fragment_id)
112 },
113 if fragment_type_mask == FragmentTypeFlag::Source as u32 {
114 "Source"
115 } else {
116 "SourceScan"
117 }
118 );
119 for actor in fragment.actors.iter().sorted_by_key(|a| a.actor_id) {
120 if let Some((split_count, splits)) = actor_splits_map.get(&actor.actor_id) {
121 println!(
122 "\t\tActor{} ({} splits): [{}]",
123 if ignore_id {
124 "".to_owned()
125 } else {
126 format!(" #{:<3}", actor.actor_id,)
127 },
128 split_count,
129 splits,
130 );
131 } else {
132 println!(
133 "\t\tError: Actor #{:<3} (not found in actor_splits)",
134 actor.actor_id,
135 )
136 }
137 }
138 }
139 }
140
141 Ok(())
142}
143
144pub async fn cluster_info(context: &CtlContext) -> anyhow::Result<()> {
145 let GetClusterInfoResponse {
146 worker_nodes,
147 table_fragments,
148 actor_splits: _,
149 source_infos: _,
150 revision,
151 } = get_cluster_info(context).await?;
152
153 let mut fragments = BTreeMap::new();
155 let mut fragment_states = HashMap::new();
157
158 for table_fragment in &table_fragments {
159 for (&id, fragment) in &table_fragment.fragments {
160 for actor in &fragment.actors {
161 let worker_id = table_fragment
162 .actor_status
163 .get(&actor.actor_id)
164 .unwrap()
165 .worker_id();
166
167 fragments
168 .entry(id)
169 .or_insert_with(BTreeMap::new)
170 .entry(worker_id)
171 .or_insert(BTreeSet::new())
172 .insert(actor.actor_id);
173 }
174 fragment_states.insert(id, table_fragment.state());
175 }
176 }
177
178 let mut table = Table::new();
179
180 let cross_out_if_creating = |cell: Cell, fid: u32| -> Cell {
181 match fragment_states[&fid] {
182 State::Unspecified => unreachable!(),
183 State::Creating => cell.add_attribute(Attribute::CrossedOut),
184 State::Created | State::Initial => cell,
185 }
186 };
187
188 table.set_header({
190 let mut row = Row::new();
191 row.add_cell("Compute Node".into());
192 for &fid in fragments.keys() {
193 let cell = Cell::new(format!("Frag {fid}"));
194 let cell = cross_out_if_creating(cell, fid);
195 row.add_cell(cell);
196 }
197 row
198 });
199
200 let mut last_worker_id = None;
201 for worker in worker_nodes {
202 let mut row = Row::new();
204 row.add_cell(if last_worker_id == Some(worker.id) {
205 "".into()
206 } else {
207 last_worker_id = Some(worker.id);
208 let cordoned = if worker.get_property().map_or(true, |p| p.is_unschedulable) {
209 " (cordoned)"
210 } else {
211 ""
212 };
213 Cell::new(format!(
214 "{}@{}{}",
215 worker.id,
216 HostAddr::from(worker.get_host().unwrap()),
217 cordoned,
218 ))
219 .add_attribute(Attribute::Bold)
220 });
221 for (&fragment_id, worker_actors) in &fragments {
222 let cell = if let Some(actors) = worker_actors.get(&worker.id) {
223 actors
224 .iter()
225 .map(|actor| format!("{}", actor))
226 .join(",")
227 .into()
228 } else {
229 "-".into()
230 };
231 let cell = cross_out_if_creating(cell, fragment_id);
232 row.add_cell(cell);
233 }
234 table.add_row(row);
235 }
236
237 println!("{table}");
238 println!("Revision: {}", revision);
239
240 Ok(())
241}