risingwave_ctl/cmd_impl/meta/
cluster_info.rs1use std::collections::{BTreeMap, BTreeSet, HashMap};
16
17use comfy_table::{Attribute, Cell, Row, Table};
18use itertools::Itertools;
19use risingwave_common::catalog::FragmentTypeFlag;
20use risingwave_common::util::addr::HostAddr;
21use risingwave_connector::source::{SplitImpl, SplitMetaData};
22use risingwave_pb::id::{ActorId, FragmentId};
23use risingwave_pb::meta::GetClusterInfoResponse;
24use risingwave_pb::meta::table_fragments::State;
25use risingwave_pb::source::ConnectorSplits;
26
27use crate::CtlContext;
28
29pub async fn get_cluster_info(context: &CtlContext) -> anyhow::Result<GetClusterInfoResponse> {
30 let meta_client = context.meta_client().await?;
31 let response = meta_client.get_cluster_info().await?;
32 Ok(response)
33}
34
35pub async fn source_split_info(context: &CtlContext, ignore_id: bool) -> anyhow::Result<()> {
36 let GetClusterInfoResponse {
37 worker_nodes: _,
38 source_infos: _,
39 table_fragments,
40 mut actor_splits,
41 revision: _,
42 } = get_cluster_info(context).await?;
43
44 let mut actor_splits_map: BTreeMap<ActorId, (usize, String)> = BTreeMap::new();
45
46 for table_fragment in &table_fragments {
48 for fragment in table_fragment.fragments.values() {
49 let fragment_type_mask = fragment.fragment_type_mask;
50 if fragment_type_mask & FragmentTypeFlag::Source as u32 == 0
51 && fragment_type_mask & FragmentTypeFlag::SourceScan as u32 == 0
52 {
53 continue;
55 }
56 if fragment_type_mask & FragmentTypeFlag::Dml as u32 != 0 {
57 continue;
59 }
60
61 for actor in &fragment.actors {
62 if let Some(ConnectorSplits { splits }) = actor_splits.remove(&actor.actor_id) {
63 let num_splits = splits.len();
64 let splits = splits
65 .iter()
66 .map(|split| SplitImpl::try_from(split).unwrap())
67 .map(|split| split.id())
68 .collect_vec()
69 .join(",");
70 actor_splits_map.insert(actor.actor_id, (num_splits, splits));
71 }
72 }
73 }
74 }
75
76 for table_fragment in &table_fragments {
78 if ignore_id {
79 println!("Table");
80 } else {
81 println!("Table #{}", table_fragment.table_id);
82 }
83 for fragment in table_fragment
84 .fragments
85 .values()
86 .sorted_by_key(|f| f.fragment_id)
87 {
88 let fragment_type_mask = fragment.fragment_type_mask;
89 if fragment_type_mask & FragmentTypeFlag::Source as u32 == 0
90 && fragment_type_mask & FragmentTypeFlag::SourceScan as u32 == 0
91 {
92 continue;
94 }
95 if fragment_type_mask & FragmentTypeFlag::Dml as u32 != 0 {
96 continue;
98 }
99
100 println!(
101 "\tFragment{} ({})",
102 if ignore_id {
103 "".to_owned()
104 } else {
105 format!(" #{}", fragment.fragment_id)
106 },
107 if fragment_type_mask == FragmentTypeFlag::Source as u32 {
108 "Source"
109 } else {
110 "SourceScan"
111 }
112 );
113 for actor in fragment.actors.iter().sorted_by_key(|a| a.actor_id) {
114 if let Some((split_count, splits)) = actor_splits_map.get(&actor.actor_id) {
115 println!(
116 "\t\tActor{} ({} splits): [{}]",
117 if ignore_id {
118 "".to_owned()
119 } else {
120 format!(" #{:<3}", actor.actor_id,)
121 },
122 split_count,
123 splits,
124 );
125 } else {
126 println!(
127 "\t\tError: Actor #{:<3} (not found in actor_splits)",
128 actor.actor_id,
129 )
130 }
131 }
132 }
133 }
134
135 Ok(())
136}
137
138pub async fn cluster_info(context: &CtlContext) -> anyhow::Result<()> {
139 let GetClusterInfoResponse {
140 worker_nodes,
141 table_fragments,
142 actor_splits: _,
143 source_infos: _,
144 revision,
145 } = get_cluster_info(context).await?;
146
147 let mut fragments = BTreeMap::new();
149 let mut fragment_states = HashMap::new();
151
152 for table_fragment in &table_fragments {
153 for (&id, fragment) in &table_fragment.fragments {
154 for actor in &fragment.actors {
155 let worker_id = table_fragment
156 .actor_status
157 .get(&actor.actor_id)
158 .unwrap()
159 .worker_id();
160
161 fragments
162 .entry(id)
163 .or_insert_with(BTreeMap::new)
164 .entry(worker_id)
165 .or_insert(BTreeSet::new())
166 .insert(actor.actor_id);
167 }
168 fragment_states.insert(id, table_fragment.state());
169 }
170 }
171
172 let mut table = Table::new();
173
174 let cross_out_if_creating = |cell: Cell, fid: FragmentId| -> Cell {
175 match fragment_states[&fid] {
176 State::Unspecified => unreachable!(),
177 State::Creating => cell.add_attribute(Attribute::CrossedOut),
178 State::Created | State::Initial => cell,
179 }
180 };
181
182 table.set_header({
184 let mut row = Row::new();
185 row.add_cell("Compute Node".into());
186 for &fid in fragments.keys() {
187 let cell = Cell::new(format!("Frag {fid}"));
188 let cell = cross_out_if_creating(cell, fid);
189 row.add_cell(cell);
190 }
191 row
192 });
193
194 let mut last_worker_id = None;
195 for worker in worker_nodes {
196 let mut row = Row::new();
198 row.add_cell(if last_worker_id == Some(worker.id) {
199 "".into()
200 } else {
201 last_worker_id = Some(worker.id);
202 let cordoned = if worker.get_property().map_or(true, |p| p.is_unschedulable) {
203 " (cordoned)"
204 } else {
205 ""
206 };
207 Cell::new(format!(
208 "{}@{}{}",
209 worker.id,
210 HostAddr::from(worker.get_host().unwrap()),
211 cordoned,
212 ))
213 .add_attribute(Attribute::Bold)
214 });
215 for (&fragment_id, worker_actors) in &fragments {
216 let cell = if let Some(actors) = worker_actors.get(&worker.id) {
217 actors
218 .iter()
219 .map(|actor| format!("{}", actor))
220 .join(",")
221 .into()
222 } else {
223 "-".into()
224 };
225 let cell = cross_out_if_creating(cell, fragment_id);
226 row.add_cell(cell);
227 }
228 table.add_row(row);
229 }
230
231 println!("{table}");
232 println!("Revision: {}", revision);
233
234 Ok(())
235}