risingwave_simulation/
ctl_ext.rs1use std::collections::{BTreeMap, HashMap, HashSet};
16use std::ffi::OsString;
17use std::sync::Arc;
18
19use anyhow::{Result, anyhow};
20use cfg_or_panic::cfg_or_panic;
21use clap::Parser;
22use itertools::Itertools;
23use rand::seq::IteratorRandom;
24use rand::{Rng, rng as thread_rng};
25use risingwave_common::catalog::TableId;
26use risingwave_common::hash::WorkerSlotId;
27use risingwave_common::id::WorkerId;
28use risingwave_connector::source::{SplitImpl, SplitMetaData};
29use risingwave_hummock_sdk::CompactionGroupId;
30use risingwave_pb::id::{ActorId, FragmentId};
31use risingwave_pb::meta::GetClusterInfoResponse;
32use risingwave_pb::meta::table_fragments::PbFragment;
33use risingwave_pb::stream_plan::StreamNode;
34
35use self::predicate::BoxedPredicate;
36use crate::cluster::Cluster;
37
38pub mod predicate {
40 use risingwave_pb::stream_plan::DispatcherType;
41 use risingwave_pb::stream_plan::stream_node::NodeBody;
42
43 use super::*;
44
45 trait Predicate = Fn(&PbFragment) -> bool + Send + 'static;
46 pub type BoxedPredicate = Box<dyn Predicate>;
47
48 fn root(fragment: &PbFragment) -> &StreamNode {
49 fragment.nodes.as_ref().unwrap()
50 }
51
52 fn count(root: &StreamNode, p: &impl Fn(&StreamNode) -> bool) -> usize {
53 let child = root.input.iter().map(|n| count(n, p)).sum::<usize>();
54 child + if p(root) { 1 } else { 0 }
55 }
56
57 fn any(root: &StreamNode, p: &impl Fn(&StreamNode) -> bool) -> bool {
58 p(root) || root.input.iter().any(|n| any(n, p))
59 }
60
61 fn all(root: &StreamNode, p: &impl Fn(&StreamNode) -> bool) -> bool {
62 p(root) && root.input.iter().all(|n| all(n, p))
63 }
64
65 pub fn identity_contains_n(n: usize, s: impl Into<String>) -> BoxedPredicate {
67 let s: String = s.into();
68 let p = move |f: &PbFragment| {
69 count(root(f), &|n| {
70 n.identity.to_lowercase().contains(&s.to_lowercase())
71 }) == n
72 };
73 Box::new(p)
74 }
75
76 pub fn identity_contains(s: impl Into<String>) -> BoxedPredicate {
78 let s: String = s.into();
79 let p = move |f: &PbFragment| {
80 any(root(f), &|n| {
81 n.identity.to_lowercase().contains(&s.to_lowercase())
82 })
83 };
84 Box::new(p)
85 }
86
87 pub fn no_identity_contains(s: impl Into<String>) -> BoxedPredicate {
89 let s: String = s.into();
90 let p = move |f: &PbFragment| {
91 all(root(f), &|n| {
92 !n.identity.to_lowercase().contains(&s.to_lowercase())
93 })
94 };
95 Box::new(p)
96 }
97
98 pub fn can_reschedule() -> BoxedPredicate {
100 let p = |f: &PbFragment| {
101 !any(root(f), &|n| {
104 let Some(NodeBody::Merge(merge)) = &n.node_body else {
105 return false;
106 };
107 merge.upstream_dispatcher_type() == DispatcherType::NoShuffle
108 })
109 };
110 Box::new(p)
111 }
112
113 pub fn id(id: u32) -> BoxedPredicate {
115 let p = move |f: &PbFragment| f.fragment_id == id;
116 Box::new(p)
117 }
118}
119
120#[derive(Debug)]
121pub struct Fragment {
122 pub inner: risingwave_pb::meta::table_fragments::Fragment,
123
124 r: Arc<GetClusterInfoResponse>,
125}
126
127impl Fragment {
128 pub fn id(&self) -> FragmentId {
130 self.inner.fragment_id
131 }
132
133 pub fn all_worker_count(&self) -> HashMap<WorkerId, usize> {
134 self.r
135 .worker_nodes
136 .iter()
137 .map(|w| (w.id, w.compute_node_parallelism()))
138 .collect()
139 }
140
141 pub fn all_worker_slots(&self) -> HashSet<WorkerSlotId> {
142 self.all_worker_count()
143 .into_iter()
144 .flat_map(|(k, v)| (0..v).map(move |idx| WorkerSlotId::new(k, idx as _)))
145 .collect()
146 }
147
148 pub fn parallelism(&self) -> usize {
149 self.inner.actors.len()
150 }
151
152 pub fn used_worker_count(&self) -> HashMap<WorkerId, usize> {
153 let actor_to_worker: HashMap<_, _> = self
154 .r
155 .table_fragments
156 .iter()
157 .flat_map(|tf| {
158 tf.actor_status
159 .iter()
160 .map(|(&actor_id, status)| (actor_id, status.worker_id()))
161 })
162 .collect();
163
164 self.inner
165 .actors
166 .iter()
167 .map(|a| actor_to_worker[&a.actor_id])
168 .fold(HashMap::<WorkerId, usize>::new(), |mut acc, num| {
169 *acc.entry(num).or_insert(0) += 1;
170 acc
171 })
172 }
173
174 pub fn used_worker_slots(&self) -> HashSet<WorkerSlotId> {
175 self.used_worker_count()
176 .into_iter()
177 .flat_map(|(k, v)| (0..v).map(move |idx| WorkerSlotId::new(k, idx as _)))
178 .collect()
179 }
180}
181
182impl Cluster {
183 #[cfg_or_panic(madsim)]
185 pub async fn locate_fragments(
186 &mut self,
187 predicates: impl IntoIterator<Item = BoxedPredicate>,
188 ) -> Result<Vec<Fragment>> {
189 let predicates = predicates.into_iter().collect_vec();
190
191 let fragments = self
192 .ctl
193 .spawn(async move {
194 let r: Arc<_> = risingwave_ctl::cmd_impl::meta::get_cluster_info(
195 &risingwave_ctl::common::CtlContext::default(),
196 )
197 .await?
198 .into();
199
200 let mut results = vec![];
201 for tf in &r.table_fragments {
202 for f in tf.fragments.values() {
203 let selected = predicates.iter().all(|p| p(f));
204 if selected {
205 results.push(Fragment {
206 inner: f.clone(),
207 r: r.clone(),
208 });
209 }
210 }
211 }
212
213 Ok::<_, anyhow::Error>(results)
214 })
215 .await??;
216
217 Ok(fragments)
218 }
219
220 pub async fn locate_one_fragment(
222 &mut self,
223 predicates: impl IntoIterator<Item = BoxedPredicate>,
224 ) -> Result<Fragment> {
225 let [fragment]: [_; 1] = self
226 .locate_fragments(predicates)
227 .await?
228 .try_into()
229 .map_err(|fs| anyhow!("not exactly one fragment: {fs:#?}"))?;
230 Ok(fragment)
231 }
232
233 pub async fn locate_random_fragment(&mut self) -> Result<Fragment> {
235 self.locate_fragments([predicate::can_reschedule()])
236 .await?
237 .into_iter()
238 .choose(&mut thread_rng())
239 .ok_or_else(|| anyhow!("no reschedulable fragment"))
240 }
241
242 pub async fn locate_random_fragments(&mut self) -> Result<Vec<Fragment>> {
244 let fragments = self.locate_fragments([predicate::can_reschedule()]).await?;
245 let len = thread_rng().random_range(1..=fragments.len());
246 let selected = fragments
247 .into_iter()
248 .choose_multiple(&mut thread_rng(), len);
249 Ok(selected)
250 }
251
252 pub async fn locate_fragment_by_id(&mut self, id: FragmentId) -> Result<Fragment> {
254 self.locate_one_fragment([predicate::id(id.as_raw_id())])
255 .await
256 }
257
258 #[cfg_or_panic(madsim)]
259 pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
260 let response = self
261 .ctl
262 .spawn(async move {
263 risingwave_ctl::cmd_impl::meta::get_cluster_info(
264 &risingwave_ctl::common::CtlContext::default(),
265 )
266 .await
267 })
268 .await??;
269 Ok(response)
270 }
271
272 pub async fn list_source_splits(&self) -> Result<BTreeMap<ActorId, String>> {
274 let info = self.get_cluster_info().await?;
275 let mut res = BTreeMap::new();
276
277 for (actor_id, splits) in info.actor_splits {
278 let splits = splits
279 .splits
280 .iter()
281 .map(|split| SplitImpl::try_from(split).unwrap())
282 .map(|split| split.id())
283 .collect_vec()
284 .join(",");
285 res.insert(actor_id, splits);
286 }
287
288 Ok(res)
289 }
290
291 #[cfg_or_panic(madsim)]
293 pub async fn pause(&mut self) -> Result<()> {
294 self.ctl.spawn(start_ctl(["meta", "pause"])).await??;
295 Ok(())
296 }
297
298 #[cfg_or_panic(madsim)]
300 pub async fn resume(&mut self) -> Result<()> {
301 self.ctl.spawn(start_ctl(["meta", "resume"])).await??;
302 Ok(())
303 }
304
305 #[cfg_or_panic(madsim)]
307 pub async fn throttle_mv(&mut self, table_id: TableId, rate_limit: Option<u32>) -> Result<()> {
308 self.ctl
309 .spawn(async move {
310 let mut command: Vec<String> = vec![
311 "throttle".into(),
312 "mv".into(),
313 "--id".into(),
314 table_id.to_string(),
315 "--throttle-type".into(),
316 "backfill".into(),
317 ];
318 if let Some(rate_limit) = rate_limit {
319 command.push("--rate".into());
320 command.push(rate_limit.to_string());
321 }
322 start_ctl(command).await
323 })
324 .await??;
325 Ok(())
326 }
327
328 #[cfg_or_panic(madsim)]
329 pub async fn split_compaction_group(
330 &mut self,
331 compaction_group_id: CompactionGroupId,
332 table_id: TableId,
333 ) -> Result<()> {
334 self.ctl
335 .spawn(async move {
336 let mut command: Vec<String> = vec![
337 "hummock".into(),
338 "split-compaction-group".into(),
339 "--compaction-group-id".into(),
340 compaction_group_id.to_string(),
341 "--table-ids".into(),
342 table_id.to_string(),
343 ];
344 start_ctl(command).await
345 })
346 .await??;
347 Ok(())
348 }
349
350 #[cfg_or_panic(madsim)]
351 pub async fn trigger_manual_compaction(
352 &mut self,
353 compaction_group_id: CompactionGroupId,
354 level_id: u32,
355 ) -> Result<()> {
356 self.ctl
357 .spawn(async move {
358 let mut command: Vec<String> = vec![
359 "hummock".into(),
360 "trigger-manual-compaction".into(),
361 "--compaction-group-id".into(),
362 compaction_group_id.to_string(),
363 "--level".into(),
364 level_id.to_string(),
365 ];
366 start_ctl(command).await
367 })
368 .await??;
369 Ok(())
370 }
371}
372
373#[cfg_attr(not(madsim), allow(dead_code))]
374pub(crate) async fn start_ctl<S, I>(args: I) -> Result<()>
375where
376 S: Into<OsString>,
377 I: IntoIterator<Item = S>,
378{
379 let args = std::iter::once("ctl".into()).chain(args.into_iter().map(|s| s.into()));
380 let opts = risingwave_ctl::CliOpts::parse_from(args);
381 let context = risingwave_ctl::common::CtlContext::default();
382 risingwave_ctl::start_fallible(opts, &context).await
383}