risingwave_simulation/
ctl_ext.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap, HashSet};
16use std::ffi::OsString;
17use std::sync::Arc;
18
19use anyhow::{Result, anyhow};
20use cfg_or_panic::cfg_or_panic;
21use clap::Parser;
22use itertools::Itertools;
23use rand::seq::IteratorRandom;
24use rand::{Rng, rng as thread_rng};
25use risingwave_common::catalog::TableId;
26use risingwave_common::hash::WorkerSlotId;
27use risingwave_common::id::WorkerId;
28use risingwave_connector::source::{SplitImpl, SplitMetaData};
29use risingwave_hummock_sdk::CompactionGroupId;
30use risingwave_pb::id::{ActorId, FragmentId};
31use risingwave_pb::meta::GetClusterInfoResponse;
32use risingwave_pb::meta::table_fragments::PbFragment;
33use risingwave_pb::stream_plan::StreamNode;
34
35use self::predicate::BoxedPredicate;
36use crate::cluster::Cluster;
37
38/// Predicates used for locating fragments.
39pub mod predicate {
40    use risingwave_pb::stream_plan::DispatcherType;
41    use risingwave_pb::stream_plan::stream_node::NodeBody;
42
43    use super::*;
44
45    trait Predicate = Fn(&PbFragment) -> bool + Send + 'static;
46    pub type BoxedPredicate = Box<dyn Predicate>;
47
48    fn root(fragment: &PbFragment) -> &StreamNode {
49        fragment.nodes.as_ref().unwrap()
50    }
51
52    fn count(root: &StreamNode, p: &impl Fn(&StreamNode) -> bool) -> usize {
53        let child = root.input.iter().map(|n| count(n, p)).sum::<usize>();
54        child + if p(root) { 1 } else { 0 }
55    }
56
57    fn any(root: &StreamNode, p: &impl Fn(&StreamNode) -> bool) -> bool {
58        p(root) || root.input.iter().any(|n| any(n, p))
59    }
60
61    fn all(root: &StreamNode, p: &impl Fn(&StreamNode) -> bool) -> bool {
62        p(root) && root.input.iter().all(|n| all(n, p))
63    }
64
65    /// There're exactly `n` operators whose identity contains `s` in the fragment.
66    pub fn identity_contains_n(n: usize, s: impl Into<String>) -> BoxedPredicate {
67        let s: String = s.into();
68        let p = move |f: &PbFragment| {
69            count(root(f), &|n| {
70                n.identity.to_lowercase().contains(&s.to_lowercase())
71            }) == n
72        };
73        Box::new(p)
74    }
75
76    /// There exists operators whose identity contains `s` in the fragment (case insensitive).
77    pub fn identity_contains(s: impl Into<String>) -> BoxedPredicate {
78        let s: String = s.into();
79        let p = move |f: &PbFragment| {
80            any(root(f), &|n| {
81                n.identity.to_lowercase().contains(&s.to_lowercase())
82            })
83        };
84        Box::new(p)
85    }
86
87    /// There does not exist any operator whose identity contains `s` in the fragment.
88    pub fn no_identity_contains(s: impl Into<String>) -> BoxedPredicate {
89        let s: String = s.into();
90        let p = move |f: &PbFragment| {
91            all(root(f), &|n| {
92                !n.identity.to_lowercase().contains(&s.to_lowercase())
93            })
94        };
95        Box::new(p)
96    }
97
98    /// The fragment is able to be rescheduled. Used for locating random fragment.
99    pub fn can_reschedule() -> BoxedPredicate {
100        let p = |f: &PbFragment| {
101            // The rescheduling of no-shuffle downstreams must be derived from the most upstream
102            // fragment. So if a fragment has no-shuffle upstreams, it cannot be rescheduled.
103            !any(root(f), &|n| {
104                let Some(NodeBody::Merge(merge)) = &n.node_body else {
105                    return false;
106                };
107                merge.upstream_dispatcher_type() == DispatcherType::NoShuffle
108            })
109        };
110        Box::new(p)
111    }
112
113    /// The fragment with the given id.
114    pub fn id(id: u32) -> BoxedPredicate {
115        let p = move |f: &PbFragment| f.fragment_id == id;
116        Box::new(p)
117    }
118}
119
/// A fragment located in the cluster, bundled with the cluster info it was selected from.
#[derive(Debug)]
pub struct Fragment {
    /// The protobuf definition of the fragment.
    pub inner: risingwave_pb::meta::table_fragments::Fragment,

    /// The cluster info response this fragment was taken from. It is consulted for worker
    /// nodes and actor-to-worker placement.
    r: Arc<GetClusterInfoResponse>,
}
126
127impl Fragment {
128    /// The fragment id.
129    pub fn id(&self) -> FragmentId {
130        self.inner.fragment_id
131    }
132
133    pub fn all_worker_count(&self) -> HashMap<WorkerId, usize> {
134        self.r
135            .worker_nodes
136            .iter()
137            .map(|w| (w.id, w.compute_node_parallelism()))
138            .collect()
139    }
140
141    pub fn all_worker_slots(&self) -> HashSet<WorkerSlotId> {
142        self.all_worker_count()
143            .into_iter()
144            .flat_map(|(k, v)| (0..v).map(move |idx| WorkerSlotId::new(k, idx as _)))
145            .collect()
146    }
147
148    pub fn parallelism(&self) -> usize {
149        self.inner.actors.len()
150    }
151
152    pub fn used_worker_count(&self) -> HashMap<WorkerId, usize> {
153        let actor_to_worker: HashMap<_, _> = self
154            .r
155            .table_fragments
156            .iter()
157            .flat_map(|tf| {
158                tf.actor_status
159                    .iter()
160                    .map(|(&actor_id, status)| (actor_id, status.worker_id()))
161            })
162            .collect();
163
164        self.inner
165            .actors
166            .iter()
167            .map(|a| actor_to_worker[&a.actor_id])
168            .fold(HashMap::<WorkerId, usize>::new(), |mut acc, num| {
169                *acc.entry(num).or_insert(0) += 1;
170                acc
171            })
172    }
173
174    pub fn used_worker_slots(&self) -> HashSet<WorkerSlotId> {
175        self.used_worker_count()
176            .into_iter()
177            .flat_map(|(k, v)| (0..v).map(move |idx| WorkerSlotId::new(k, idx as _)))
178            .collect()
179    }
180}
181
182impl Cluster {
183    /// Locate fragments that satisfy all the predicates.
184    #[cfg_or_panic(madsim)]
185    pub async fn locate_fragments(
186        &mut self,
187        predicates: impl IntoIterator<Item = BoxedPredicate>,
188    ) -> Result<Vec<Fragment>> {
189        let predicates = predicates.into_iter().collect_vec();
190
191        let fragments = self
192            .ctl
193            .spawn(async move {
194                let r: Arc<_> = risingwave_ctl::cmd_impl::meta::get_cluster_info(
195                    &risingwave_ctl::common::CtlContext::default(),
196                )
197                .await?
198                .into();
199
200                let mut results = vec![];
201                for tf in &r.table_fragments {
202                    for f in tf.fragments.values() {
203                        let selected = predicates.iter().all(|p| p(f));
204                        if selected {
205                            results.push(Fragment {
206                                inner: f.clone(),
207                                r: r.clone(),
208                            });
209                        }
210                    }
211                }
212
213                Ok::<_, anyhow::Error>(results)
214            })
215            .await??;
216
217        Ok(fragments)
218    }
219
220    /// Locate exactly one fragment that satisfies all the predicates.
221    pub async fn locate_one_fragment(
222        &mut self,
223        predicates: impl IntoIterator<Item = BoxedPredicate>,
224    ) -> Result<Fragment> {
225        let [fragment]: [_; 1] = self
226            .locate_fragments(predicates)
227            .await?
228            .try_into()
229            .map_err(|fs| anyhow!("not exactly one fragment: {fs:#?}"))?;
230        Ok(fragment)
231    }
232
233    /// Locate a random fragment that is reschedulable.
234    pub async fn locate_random_fragment(&mut self) -> Result<Fragment> {
235        self.locate_fragments([predicate::can_reschedule()])
236            .await?
237            .into_iter()
238            .choose(&mut thread_rng())
239            .ok_or_else(|| anyhow!("no reschedulable fragment"))
240    }
241
242    /// Locate some random fragments that are reschedulable.
243    pub async fn locate_random_fragments(&mut self) -> Result<Vec<Fragment>> {
244        let fragments = self.locate_fragments([predicate::can_reschedule()]).await?;
245        let len = thread_rng().random_range(1..=fragments.len());
246        let selected = fragments
247            .into_iter()
248            .choose_multiple(&mut thread_rng(), len);
249        Ok(selected)
250    }
251
252    /// Locate a fragment with the given id.
253    pub async fn locate_fragment_by_id(&mut self, id: FragmentId) -> Result<Fragment> {
254        self.locate_one_fragment([predicate::id(id.as_raw_id())])
255            .await
256    }
257
258    #[cfg_or_panic(madsim)]
259    pub async fn get_cluster_info(&self) -> Result<GetClusterInfoResponse> {
260        let response = self
261            .ctl
262            .spawn(async move {
263                risingwave_ctl::cmd_impl::meta::get_cluster_info(
264                    &risingwave_ctl::common::CtlContext::default(),
265                )
266                .await
267            })
268            .await??;
269        Ok(response)
270    }
271
272    /// `actor_id -> splits`
273    pub async fn list_source_splits(&self) -> Result<BTreeMap<ActorId, String>> {
274        let info = self.get_cluster_info().await?;
275        let mut res = BTreeMap::new();
276
277        for (actor_id, splits) in info.actor_splits {
278            let splits = splits
279                .splits
280                .iter()
281                .map(|split| SplitImpl::try_from(split).unwrap())
282                .map(|split| split.id())
283                .collect_vec()
284                .join(",");
285            res.insert(actor_id, splits);
286        }
287
288        Ok(res)
289    }
290
291    /// Pause all data sources in the cluster.
292    #[cfg_or_panic(madsim)]
293    pub async fn pause(&mut self) -> Result<()> {
294        self.ctl.spawn(start_ctl(["meta", "pause"])).await??;
295        Ok(())
296    }
297
298    /// Resume all data sources in the cluster.
299    #[cfg_or_panic(madsim)]
300    pub async fn resume(&mut self) -> Result<()> {
301        self.ctl.spawn(start_ctl(["meta", "resume"])).await??;
302        Ok(())
303    }
304
305    /// Throttle a Mv in the cluster
306    #[cfg_or_panic(madsim)]
307    pub async fn throttle_mv(&mut self, table_id: TableId, rate_limit: Option<u32>) -> Result<()> {
308        self.ctl
309            .spawn(async move {
310                let mut command: Vec<String> = vec![
311                    "throttle".into(),
312                    "mv".into(),
313                    "--id".into(),
314                    table_id.to_string(),
315                    "--throttle-type".into(),
316                    "backfill".into(),
317                ];
318                if let Some(rate_limit) = rate_limit {
319                    command.push("--rate".into());
320                    command.push(rate_limit.to_string());
321                }
322                start_ctl(command).await
323            })
324            .await??;
325        Ok(())
326    }
327
328    #[cfg_or_panic(madsim)]
329    pub async fn split_compaction_group(
330        &mut self,
331        compaction_group_id: CompactionGroupId,
332        table_id: TableId,
333    ) -> Result<()> {
334        self.ctl
335            .spawn(async move {
336                let mut command: Vec<String> = vec![
337                    "hummock".into(),
338                    "split-compaction-group".into(),
339                    "--compaction-group-id".into(),
340                    compaction_group_id.to_string(),
341                    "--table-ids".into(),
342                    table_id.to_string(),
343                ];
344                start_ctl(command).await
345            })
346            .await??;
347        Ok(())
348    }
349
350    #[cfg_or_panic(madsim)]
351    pub async fn trigger_manual_compaction(
352        &mut self,
353        compaction_group_id: CompactionGroupId,
354        level_id: u32,
355    ) -> Result<()> {
356        self.ctl
357            .spawn(async move {
358                let mut command: Vec<String> = vec![
359                    "hummock".into(),
360                    "trigger-manual-compaction".into(),
361                    "--compaction-group-id".into(),
362                    compaction_group_id.to_string(),
363                    "--level".into(),
364                    level_id.to_string(),
365                ];
366                start_ctl(command).await
367            })
368            .await??;
369        Ok(())
370    }
371}
372
373#[cfg_attr(not(madsim), allow(dead_code))]
374pub(crate) async fn start_ctl<S, I>(args: I) -> Result<()>
375where
376    S: Into<OsString>,
377    I: IntoIterator<Item = S>,
378{
379    let args = std::iter::once("ctl".into()).chain(args.into_iter().map(|s| s.into()));
380    let opts = risingwave_ctl::CliOpts::parse_from(args);
381    let context = risingwave_ctl::common::CtlContext::default();
382    risingwave_ctl::start_fallible(opts, &context).await
383}