risingwave_simulation/cluster.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![cfg_attr(not(madsim), allow(unused_imports))]

use std::cmp::max;
use std::collections::HashMap;
use std::future::Future;
use std::io::Write;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{Result, anyhow, bail};
use cfg_or_panic::cfg_or_panic;
use clap::Parser;
use futures::channel::{mpsc, oneshot};
use futures::future::join_all;
use futures::{SinkExt, StreamExt};
use itertools::Itertools;
#[cfg(madsim)]
use madsim::runtime::{Handle, NodeHandle};
use rand::Rng;
use rand::seq::IteratorRandom;
use risingwave_common::util::tokio_util::sync::CancellationToken;
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
#[cfg(madsim)]
use risingwave_object_store::object::sim::SimServer as ObjectStoreSimServer;
use risingwave_pb::common::WorkerNode;
use sqllogictest::AsyncDB;
use tempfile::NamedTempFile;
#[cfg(not(madsim))]
use tokio::runtime::Handle;
use uuid::Uuid;

use crate::client::RisingWave;

/// The path to the configuration file for the cluster.
#[derive(Clone, Debug)]
pub enum ConfigPath {
    /// A regular path pointing to an external configuration file.
    Regular(String),
    /// A temporary path pointing to a configuration file created at runtime.
    Temp(Arc<tempfile::TempPath>),
}

impl ConfigPath {
    pub fn as_str(&self) -> &str {
        match self {
            ConfigPath::Regular(s) => s,
            ConfigPath::Temp(p) => p.as_os_str().to_str().unwrap(),
        }
    }
}

/// RisingWave cluster configuration.
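///
/// A minimal sketch of constructing a custom configuration; the field values here
/// are illustrative, not recommendations:
///
/// ```ignore
/// let conf = Configuration {
///     compute_nodes: 2,
///     compute_node_cores: 4,
///     ..Default::default()
/// };
/// ```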
#[derive(Debug, Clone)]
pub struct Configuration {
    /// The path to the configuration file.
    ///
    /// Empty string means using the default config.
    pub config_path: ConfigPath,

    /// The number of frontend nodes.
    pub frontend_nodes: usize,

    /// The number of compute nodes.
    pub compute_nodes: usize,

    /// The number of meta nodes.
    pub meta_nodes: usize,

    /// The number of compactor nodes.
    pub compactor_nodes: usize,

    /// The number of CPU cores for each compute node.
    ///
    /// This determines `worker_node_parallelism`.
    pub compute_node_cores: usize,

    /// Queries to run at the start of each new session.
    pub per_session_queries: Arc<Vec<String>>,

    /// Resource groups for compute nodes.
    pub compute_resource_groups: HashMap<usize, String>,
}

impl Default for Configuration {
    fn default() -> Self {
        let config_path = {
            let mut file =
                tempfile::NamedTempFile::new().expect("failed to create temp config file");

            let config_data = r#"
[server]
telemetry_enabled = false
metrics_level = "Disabled"
"#;
            file.write_all(config_data.as_bytes())
                .expect("failed to write config file");
            file.into_temp_path()
        };

        Configuration {
            config_path: ConfigPath::Temp(config_path.into()),
            frontend_nodes: 1,
            compute_nodes: 1,
            meta_nodes: 1,
            compactor_nodes: 1,
            compute_node_cores: 1,
            per_session_queries: vec![].into(),
            compute_resource_groups: Default::default(),
        }
    }
}

impl Configuration {
    /// Returns the configuration for the scale test.
    pub fn for_scale() -> Self {
        // Embed the config file and create a temporary file at runtime. The file will be deleted
        // automatically when it's dropped.
        let config_path = {
            let mut file =
                tempfile::NamedTempFile::new().expect("failed to create temp config file");
            file.write_all(include_bytes!("risingwave-scale.toml"))
                .expect("failed to write config file");
            file.into_temp_path()
        };

        Configuration {
            config_path: ConfigPath::Temp(config_path.into()),
            frontend_nodes: 2,
            compute_nodes: 3,
            meta_nodes: 1,
            compactor_nodes: 2,
            compute_node_cores: 2,
            ..Default::default()
        }
    }

    /// Returns the scale-test configuration with arrangement backfill disabled,
    /// so table scans will use `no_shuffle`.
    pub fn for_scale_no_shuffle() -> Self {
        let mut conf = Self::for_scale();
        conf.per_session_queries =
            vec!["SET STREAMING_USE_ARRANGEMENT_BACKFILL = false;".into()].into();
        conf
    }

    /// Returns the scale-test configuration with shared source enabled for each session.
    pub fn for_scale_shared_source() -> Self {
        let mut conf = Self::for_scale();
        conf.per_session_queries = vec!["SET STREAMING_USE_SHARED_SOURCE = true;".into()].into();
        conf
    }

    /// Returns a configuration for exercising automatic parallelism control,
    /// parameterized by the meta heartbeat interval and whether the control loop is enabled.
    pub fn for_auto_parallelism(
        max_heartbeat_interval_secs: u64,
        enable_auto_parallelism: bool,
    ) -> Self {
        let disable_automatic_parallelism_control = !enable_auto_parallelism;

        let config_path = {
            let mut file =
                tempfile::NamedTempFile::new().expect("failed to create temp config file");

            let config_data = format!(
                r#"[meta]
max_heartbeat_interval_secs = {max_heartbeat_interval_secs}
disable_automatic_parallelism_control = {disable_automatic_parallelism_control}
parallelism_control_trigger_first_delay_sec = 0
parallelism_control_batch_size = 10
parallelism_control_trigger_period_sec = 10

[system]
barrier_interval_ms = 250
checkpoint_frequency = 4

[server]
telemetry_enabled = false
metrics_level = "Disabled"
"#
            );
            file.write_all(config_data.as_bytes())
                .expect("failed to write config file");
            file.into_temp_path()
        };

        Configuration {
            config_path: ConfigPath::Temp(config_path.into()),
            frontend_nodes: 1,
            compute_nodes: 3,
            meta_nodes: 1,
            compactor_nodes: 1,
            compute_node_cores: 2,
            per_session_queries: vec![
                "create view if not exists table_parallelism as select t.name, tf.parallelism from rw_tables t, rw_table_fragments tf where t.id = tf.table_id;".into(),
                "create view if not exists mview_parallelism as select m.name, tf.parallelism from rw_materialized_views m, rw_table_fragments tf where m.id = tf.table_id;".into(),
            ]
                .into(),
            ..Default::default()
        }
    }

    /// Returns a configuration with the given default parallelism for streaming jobs.
    pub fn for_default_parallelism(default_parallelism: usize) -> Self {
        let config_path = {
            let mut file =
                tempfile::NamedTempFile::new().expect("failed to create temp config file");

            let config_data = format!(
                r#"
[server]
telemetry_enabled = false
metrics_level = "Disabled"
[meta]
default_parallelism = {default_parallelism}
"#
            );
            file.write_all(config_data.as_bytes())
                .expect("failed to write config file");
            file.into_temp_path()
        };

        Configuration {
            config_path: ConfigPath::Temp(config_path.into()),
            frontend_nodes: 1,
            compute_nodes: 1,
            meta_nodes: 1,
            compactor_nodes: 1,
            compute_node_cores: default_parallelism * 2,
            per_session_queries: vec![].into(),
            compute_resource_groups: Default::default(),
        }
    }

    /// Returns the configuration for the backfill test.
    pub fn for_backfill() -> Self {
        // Embed the config file and create a temporary file at runtime. The file will be deleted
        // automatically when it's dropped.
        let config_path = {
            let mut file =
                tempfile::NamedTempFile::new().expect("failed to create temp config file");
            file.write_all(include_bytes!("backfill.toml"))
                .expect("failed to write config file");
            file.into_temp_path()
        };

        Configuration {
            config_path: ConfigPath::Temp(config_path.into()),
            frontend_nodes: 1,
            compute_nodes: 1,
            meta_nodes: 1,
            compactor_nodes: 1,
            compute_node_cores: 4,
            ..Default::default()
        }
    }

    /// Returns a configuration with arrangement backfill enabled for each session.
    pub fn for_arrangement_backfill() -> Self {
        // Embed the config file and create a temporary file at runtime. The file will be deleted
        // automatically when it's dropped.
        let config_path = {
            let mut file =
                tempfile::NamedTempFile::new().expect("failed to create temp config file");
            file.write_all(include_bytes!("arrangement_backfill.toml"))
                .expect("failed to write config file");
            file.into_temp_path()
        };

        Configuration {
            config_path: ConfigPath::Temp(config_path.into()),
            frontend_nodes: 1,
            compute_nodes: 3,
            meta_nodes: 1,
            compactor_nodes: 1,
            compute_node_cores: 1,
            per_session_queries: vec!["SET STREAMING_USE_ARRANGEMENT_BACKFILL = true;".into()]
                .into(),
            ..Default::default()
        }
    }

    /// Returns the configuration for the background DDL test.
    pub fn for_background_ddl() -> Self {
        // Embed the config file and create a temporary file at runtime. The file will be deleted
        // automatically when it's dropped.
        let config_path = {
            let mut file =
                tempfile::NamedTempFile::new().expect("failed to create temp config file");
            file.write_all(include_bytes!("background_ddl.toml"))
                .expect("failed to write config file");
            file.into_temp_path()
        };

        Configuration {
            config_path: ConfigPath::Temp(config_path.into()),
            // NOTE(kwannoel): The cancel test depends on `processlist`,
            // which will cancel a stream job within the process.
            // So we cannot have multiple frontend nodes: a new session spawned
            // to cancel the job could be routed to a different frontend node
            // in a different process.
            frontend_nodes: 1,
            compute_nodes: 3,
            meta_nodes: 1,
            compactor_nodes: 2,
            compute_node_cores: 2,
            ..Default::default()
        }
    }

    pub fn enable_arrangement_backfill() -> Self {
        let config_path = {
            let mut file =
                tempfile::NamedTempFile::new().expect("failed to create temp config file");
            file.write_all(include_bytes!("disable_arrangement_backfill.toml"))
                .expect("failed to write config file");
            file.into_temp_path()
        };
        Configuration {
            config_path: ConfigPath::Temp(config_path.into()),
            frontend_nodes: 1,
            compute_nodes: 1,
            meta_nodes: 1,
            compactor_nodes: 1,
            compute_node_cores: 1,
            per_session_queries: vec![].into(),
            ..Default::default()
        }
    }

    /// Returns the total number of streaming CPU cores across all compute nodes.
    pub fn total_streaming_cores(&self) -> u32 {
        (self.compute_nodes * self.compute_node_cores) as u32
    }
}

/// A RisingWave cluster.
///
/// # Nodes
///
/// | Name             | IP            |
/// | ---------------- | ------------- |
/// | meta-x           | 192.168.1.x   |
/// | frontend-x       | 192.168.2.x   |
/// | compute-x        | 192.168.3.x   |
/// | compactor-x      | 192.168.4.x   |
/// | kafka-broker     | 192.168.11.1  |
/// | kafka-producer   | 192.168.11.2  |
/// | object_store_sim | 192.168.12.1  |
/// | client           | 192.168.100.1 |
/// | ctl              | 192.168.101.1 |
pub struct Cluster {
    config: Configuration,
    handle: Handle,
    #[cfg(madsim)]
    pub(crate) client: NodeHandle,
    #[cfg(madsim)]
    pub(crate) ctl: NodeHandle,
    #[cfg(madsim)]
    pub(crate) sqlite_file_handle: NamedTempFile,
}

impl Cluster {
    /// Start a RisingWave cluster for testing.
    ///
    /// This function should be called exactly once in a test.
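    ///
    /// A minimal sketch of booting a cluster in a madsim simulation test (error
    /// handling and test harness elided):
    ///
    /// ```ignore
    /// let mut cluster = Cluster::start(Configuration::for_scale()).await?;
    /// let result = cluster.run("select 1").await?;
    /// assert_eq!(result, "1");
    /// ```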
    #[cfg_or_panic(madsim)]
    pub async fn start(conf: Configuration) -> Result<Self> {
        use madsim::net::ipvs::*;

        let handle = madsim::runtime::Handle::current();
        println!("seed = {}", handle.seed());
        println!("{:#?}", conf);

        // TODO: support multiple meta nodes
        assert_eq!(conf.meta_nodes, 1);

        // setup DNS and load balance
        let net = madsim::net::NetSim::current();
        for i in 1..=conf.meta_nodes {
            net.add_dns_record(
                &format!("meta-{i}"),
                format!("192.168.1.{i}").parse().unwrap(),
            );
        }

        net.add_dns_record("frontend", "192.168.2.0".parse().unwrap());
        net.add_dns_record("message_queue", "192.168.11.1".parse().unwrap());
        net.global_ipvs().add_service(
            ServiceAddr::Tcp("192.168.2.0:4566".into()),
            Scheduler::RoundRobin,
        );
        for i in 1..=conf.frontend_nodes {
            net.global_ipvs().add_server(
                ServiceAddr::Tcp("192.168.2.0:4566".into()),
                &format!("192.168.2.{i}:4566"),
            )
        }

        // kafka broker
        handle
            .create_node()
            .name("kafka-broker")
            .ip("192.168.11.1".parse().unwrap())
            .init(move || async move {
                rdkafka::SimBroker::default()
                    .serve("0.0.0.0:29092".parse().unwrap())
                    .await
            })
            .build();

        // object_store_sim
        handle
            .create_node()
            .name("object_store_sim")
            .ip("192.168.12.1".parse().unwrap())
            .init(move || async move {
                ObjectStoreSimServer::builder()
                    .serve("0.0.0.0:9301".parse().unwrap())
                    .await
            })
            .build();

        // wait for the services to be ready
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;

        let mut meta_addrs = vec![];
        for i in 1..=conf.meta_nodes {
            meta_addrs.push(format!("http://meta-{i}:5690"));
        }
        unsafe { std::env::set_var("RW_META_ADDR", meta_addrs.join(",")) };

        let sqlite_file_handle: NamedTempFile = NamedTempFile::new().unwrap();
        let file_path = sqlite_file_handle.path().display().to_string();
        tracing::info!(?file_path, "sqlite_file_path");
        let sql_endpoint = format!("sqlite://{}?mode=rwc", file_path);
        let backend_args = vec!["--backend", "sql", "--sql-endpoint", &sql_endpoint];

        // meta node
        for i in 1..=conf.meta_nodes {
            let args = [
                "meta-node",
                "--config-path",
                conf.config_path.as_str(),
                "--listen-addr",
                "0.0.0.0:5690",
                "--advertise-addr",
                &format!("meta-{i}:5690"),
                "--state-store",
                "hummock+sim://hummockadmin:hummockadmin@192.168.12.1:9301/hummock001",
                "--data-directory",
                "hummock_001",
                "--temp-secret-file-dir",
                &format!("./secrets/meta-{i}"),
            ];
            let args = args.into_iter().chain(backend_args.clone());
            let opts = risingwave_meta_node::MetaNodeOpts::parse_from(args);
            handle
                .create_node()
                .name(format!("meta-{i}"))
                .ip([192, 168, 1, i as u8].into())
                .init(move || {
                    risingwave_meta_node::start(
                        opts.clone(),
                        CancellationToken::new(), // dummy
                    )
                })
                .build();
        }

        // wait for the services to be ready
        tokio::time::sleep(std::time::Duration::from_secs(15)).await;

        // frontend node
        for i in 1..=conf.frontend_nodes {
            let opts = risingwave_frontend::FrontendOpts::parse_from([
                "frontend-node",
                "--config-path",
                conf.config_path.as_str(),
                "--listen-addr",
                "0.0.0.0:4566",
                "--health-check-listener-addr",
                "0.0.0.0:6786",
                "--advertise-addr",
                &format!("192.168.2.{i}:4566"),
                "--temp-secret-file-dir",
                &format!("./secrets/frontend-{i}"),
            ]);
            handle
                .create_node()
                .name(format!("frontend-{i}"))
                .ip([192, 168, 2, i as u8].into())
                .init(move || {
                    risingwave_frontend::start(
                        opts.clone(),
                        CancellationToken::new(), // dummy
                    )
                })
                .build();
        }

        // compute node
        for i in 1..=conf.compute_nodes {
            let opts = risingwave_compute::ComputeNodeOpts::parse_from([
                "compute-node",
                "--config-path",
                conf.config_path.as_str(),
                "--listen-addr",
                "0.0.0.0:5688",
                "--advertise-addr",
                &format!("192.168.3.{i}:5688"),
                "--total-memory-bytes",
                "6979321856",
                "--parallelism",
                &conf.compute_node_cores.to_string(),
                "--temp-secret-file-dir",
                &format!("./secrets/compute-{i}"),
                "--resource-group",
                &conf
                    .compute_resource_groups
                    .get(&i)
                    .cloned()
                    .unwrap_or_else(|| DEFAULT_RESOURCE_GROUP.to_string()),
            ]);
            handle
                .create_node()
                .name(format!("compute-{i}"))
                .ip([192, 168, 3, i as u8].into())
                .cores(conf.compute_node_cores)
                .init(move || {
                    risingwave_compute::start(
                        opts.clone(),
                        CancellationToken::new(), // dummy
                    )
                })
                .build();
        }

        // compactor node
        for i in 1..=conf.compactor_nodes {
            let opts = risingwave_compactor::CompactorOpts::parse_from([
                "compactor-node",
                "--config-path",
                conf.config_path.as_str(),
                "--listen-addr",
                "0.0.0.0:6660",
                "--advertise-addr",
                &format!("192.168.4.{i}:6660"),
            ]);
            handle
                .create_node()
                .name(format!("compactor-{i}"))
                .ip([192, 168, 4, i as u8].into())
                .init(move || {
                    risingwave_compactor::start(
                        opts.clone(),
                        CancellationToken::new(), // dummy
                    )
                })
                .build();
        }

        // wait for the services to be ready
        tokio::time::sleep(Duration::from_secs(15)).await;

        // client
        let client = handle
            .create_node()
            .name("client")
            .ip([192, 168, 100, 1].into())
            .build();

        // risectl
        let ctl = handle
            .create_node()
            .name("ctl")
            .ip([192, 168, 101, 1].into())
            .build();

        Ok(Self {
            config: conf,
            handle,
            client,
            ctl,
            sqlite_file_handle,
        })
    }

    #[cfg_or_panic(madsim)]
    fn per_session_queries(&self) -> Arc<Vec<String>> {
        self.config.per_session_queries.clone()
    }

    /// Start a SQL session on the client node.
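    ///
    /// A minimal sketch of driving a session (assumes a running cluster in a madsim
    /// test; error handling elided):
    ///
    /// ```ignore
    /// let mut session = cluster.start_session();
    /// session.run("create table t (v int)").await?;
    /// session.run("insert into t values (1), (2)").await?;
    /// session.flush().await?;
    /// ```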
    #[cfg_or_panic(madsim)]
    pub fn start_session(&mut self) -> Session {
        let (query_tx, mut query_rx) = mpsc::channel::<SessionRequest>(0);
        let per_session_queries = self.per_session_queries();

        self.client.spawn(async move {
            let mut client = RisingWave::connect("frontend".into(), "dev".into()).await?;

            for sql in per_session_queries.as_ref() {
                client.run(sql).await?;
            }
            drop(per_session_queries);

            while let Some((sql, tx)) = query_rx.next().await {
                let result = client
                    .run(&sql)
                    .await
                    .map(|output| match output {
                        sqllogictest::DBOutput::Rows { rows, .. } => rows
                            .into_iter()
                            .map(|row| {
                                row.into_iter()
                                    .map(|v| v.to_string())
                                    .collect::<Vec<_>>()
                                    .join(" ")
                            })
                            .collect::<Vec<_>>()
                            .join("\n"),
                        _ => "".to_string(),
                    })
                    .map_err(Into::into);

                let _ = tx.send(result);
            }

            Ok::<_, anyhow::Error>(())
        });

        Session { query_tx }
    }

    /// Run a SQL query on a **new** session of the client node.
    ///
    /// This is a convenience method that creates a new session and runs the query on it. If you
    /// want to run multiple queries on the same session, use `start_session` and `Session::run`.
    pub async fn run(&mut self, sql: impl Into<String>) -> Result<String> {
        self.start_session().run(sql).await
    }

    /// Run a future on the client node.
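    ///
    /// A minimal sketch (hypothetical usage; the future must be `Send + 'static`):
    ///
    /// ```ignore
    /// let answer = cluster
    ///     .run_on_client(async { 21 * 2 }) // runs on the simulated client node
    ///     .await;
    /// assert_eq!(answer, 42);
    /// ```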
    #[cfg_or_panic(madsim)]
    pub async fn run_on_client<F>(&self, future: F) -> F::Output
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        self.client.spawn(future).await.unwrap()
    }

    /// Pick `n` distinct worker nodes at random from the cluster.
    pub async fn get_random_worker_nodes(&self, n: usize) -> Result<Vec<WorkerNode>> {
        let worker_nodes = self.get_cluster_info().await?.get_worker_nodes().clone();
        if worker_nodes.len() < n {
            return Err(anyhow!("cannot choose more worker nodes than present"));
        }
        let rand_nodes = worker_nodes.iter().choose_multiple(&mut rand::rng(), n);
        Ok(rand_nodes.into_iter().cloned().collect_vec())
    }

    /// Run a SQL query from the client and wait until the condition is met.
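    ///
    /// A minimal sketch (the query and expected value are illustrative; polls every
    /// second for up to a minute):
    ///
    /// ```ignore
    /// let count = cluster
    ///     .wait_until(
    ///         "select count(*) from t",
    ///         |rows| rows.trim() == "100",
    ///         Duration::from_secs(1),
    ///         Duration::from_secs(60),
    ///     )
    ///     .await?;
    /// ```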
    pub async fn wait_until(
        &mut self,
        sql: impl Into<String> + Clone,
        mut p: impl FnMut(&str) -> bool,
        interval: Duration,
        timeout: Duration,
    ) -> Result<String> {
        let fut = async move {
            let mut interval = tokio::time::interval(interval);
            loop {
                interval.tick().await;
                let result = self.run(sql.clone()).await?;
                if p(&result) {
                    return Ok::<_, anyhow::Error>(result);
                }
            }
        };

        match tokio::time::timeout(timeout, fut).await {
            Ok(r) => Ok(r?),
            Err(_) => bail!("wait_until timeout"),
        }
    }

    /// Run a SQL query from the client and wait until the returned result is not empty.
    pub async fn wait_until_non_empty(
        &mut self,
        sql: &str,
        interval: Duration,
        timeout: Duration,
    ) -> Result<String> {
        self.wait_until(sql, |r| !r.trim().is_empty(), interval, timeout)
            .await
    }

    /// Generate a list of random worker nodes to kill according to `opts`, then call
    /// `kill_nodes` to kill and restart them.
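    ///
    /// A minimal sketch (assumes a chaos-style madsim test):
    ///
    /// ```ignore
    /// // Randomly kill and restart frontend, compute, and compactor nodes.
    /// cluster.kill_node(&KillOpts::ALL).await;
    /// cluster.wait_for_recovery().await?;
    /// ```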
    pub async fn kill_node(&self, opts: &KillOpts) {
        let mut nodes = vec![];
        if opts.kill_meta {
            let rand = rand::rng().random_range(0..3);
            for i in 1..=self.config.meta_nodes {
                match rand {
                    0 => break,                                     // kill none
                    1 => {}                                         // kill all
                    _ if !rand::rng().random_bool(0.5) => continue, // kill randomly
                    _ => {}
                }
                nodes.push(format!("meta-{}", i));
            }
            // don't kill all meta services
            if nodes.len() == self.config.meta_nodes {
                nodes.truncate(1);
            }
        }
        if opts.kill_frontend {
            let rand = rand::rng().random_range(0..3);
            for i in 1..=self.config.frontend_nodes {
                match rand {
                    0 => break,                                     // kill none
                    1 => {}                                         // kill all
                    _ if !rand::rng().random_bool(0.5) => continue, // kill randomly
                    _ => {}
                }
                nodes.push(format!("frontend-{}", i));
            }
        }
        if opts.kill_compute {
            let rand = rand::rng().random_range(0..3);
            for i in 1..=self.config.compute_nodes {
                match rand {
                    0 => break,                                     // kill none
                    1 => {}                                         // kill all
                    _ if !rand::rng().random_bool(0.5) => continue, // kill randomly
                    _ => {}
                }
                nodes.push(format!("compute-{}", i));
            }
        }
        if opts.kill_compactor {
            let rand = rand::rng().random_range(0..3);
            for i in 1..=self.config.compactor_nodes {
                match rand {
                    0 => break,                                     // kill none
                    1 => {}                                         // kill all
                    _ if !rand::rng().random_bool(0.5) => continue, // kill randomly
                    _ => {}
                }
                nodes.push(format!("compactor-{}", i));
            }
        }

        self.kill_nodes(nodes, opts.restart_delay_secs).await
    }

    /// Kill the given nodes by name after a random delay of up to 1s, then restart them
    /// after another random delay of up to 1s. With a probability of 0.1, the restart is
    /// further delayed by `restart_delay_secs` so that the node expires and is removed
    /// from the cluster.
    #[cfg_or_panic(madsim)]
    pub async fn kill_nodes(
        &self,
        nodes: impl IntoIterator<Item = impl AsRef<str>>,
        restart_delay_secs: u32,
    ) {
        join_all(nodes.into_iter().map(|name| async move {
            let name = name.as_ref();
            let t = rand::rng().random_range(Duration::from_secs(0)..Duration::from_secs(1));
            tokio::time::sleep(t).await;
            tracing::info!("kill {name}");
            Handle::current().kill(name);

            let mut t = rand::rng().random_range(Duration::from_secs(0)..Duration::from_secs(1));
            // has a small chance to restart after a long time,
            // so that the node expires and is removed from the cluster
            if rand::rng().random_bool(0.1) {
                // max_heartbeat_interval_secs = 15
                t += Duration::from_secs(restart_delay_secs as u64);
            }
            tokio::time::sleep(t).await;
            tracing::info!("restart {name}");
            Handle::current().restart(name);
        }))
        .await;
    }

    /// Kill the given nodes by name, then restart each of them after exactly
    /// `restart_delay_secs` seconds.
    #[cfg_or_panic(madsim)]
    pub async fn kill_nodes_and_restart(
        &self,
        nodes: impl IntoIterator<Item = impl AsRef<str>>,
        restart_delay_secs: u32,
    ) {
        join_all(nodes.into_iter().map(|name| async move {
            let name = name.as_ref();
            tracing::info!("kill {name}");
            Handle::current().kill(name);
            tokio::time::sleep(Duration::from_secs(restart_delay_secs as u64)).await;
            tracing::info!("restart {name}");
            Handle::current().restart(name);
        }))
        .await;
    }

    /// Kill the given nodes by name without restarting them.
    #[cfg_or_panic(madsim)]
    pub async fn simple_kill_nodes(&self, nodes: impl IntoIterator<Item = impl AsRef<str>>) {
        join_all(nodes.into_iter().map(|name| async move {
            let name = name.as_ref();
            tracing::info!("kill {name}");
            Handle::current().kill(name);
        }))
        .await;
    }

    /// Restart the given nodes by name.
    #[cfg_or_panic(madsim)]
    pub async fn simple_restart_nodes(&self, nodes: impl IntoIterator<Item = impl AsRef<str>>) {
        join_all(nodes.into_iter().map(|name| async move {
            let name = name.as_ref();
            tracing::info!("restart {name}");
            Handle::current().restart(name);
        }))
        .await;
    }

    /// Create a node for the kafka producer and prepare data.
    #[cfg_or_panic(madsim)]
    pub async fn create_kafka_producer(&self, datadir: &str) {
        self.handle
            .create_node()
            .name("kafka-producer")
            .ip("192.168.11.2".parse().unwrap())
            .build()
            .spawn(crate::kafka::producer(
                "192.168.11.1:29092",
                datadir.to_string(),
            ))
            .await
            .unwrap();
    }
    /// Create Kafka topics.
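    ///
    /// A minimal sketch (the topic name is illustrative; the `i32` value follows
    /// whatever `crate::kafka::create_topics` expects, e.g. a partition count):
    ///
    /// ```ignore
    /// cluster.create_kafka_topics(HashMap::from([("test_topic".to_owned(), 3)]));
    /// ```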
    #[cfg_or_panic(madsim)]
    pub fn create_kafka_topics(&self, topics: HashMap<String, i32>) {
        self.handle
            .create_node()
            .name("kafka-topic-create")
            .ip("192.168.11.3".parse().unwrap())
            .build()
            .spawn(crate::kafka::create_topics("192.168.11.1:29092", topics));
    }

    /// Returns a clone of the cluster configuration.
    pub fn config(&self) -> Configuration {
        self.config.clone()
    }

    /// Returns the runtime handle of the simulation.
    pub fn handle(&self) -> &Handle {
        &self.handle
    }

    /// Gracefully shut down all RisingWave nodes.
    #[cfg_or_panic(madsim)]
    pub async fn graceful_shutdown(&self) {
        let mut nodes = vec![];
        let mut metas = vec![];
        for i in 1..=self.config.meta_nodes {
            metas.push(format!("meta-{i}"));
        }
        for i in 1..=self.config.frontend_nodes {
            nodes.push(format!("frontend-{i}"));
        }
        for i in 1..=self.config.compute_nodes {
            nodes.push(format!("compute-{i}"));
        }
        for i in 1..=self.config.compactor_nodes {
            nodes.push(format!("compactor-{i}"));
        }

        tracing::info!("graceful shutdown");
        let waiting_time = Duration::from_secs(10);
        // shut down frontends, computes, and compactors first
        for node in &nodes {
            self.handle.send_ctrl_c(node);
        }
        tokio::time::sleep(waiting_time).await;
        // then shut down metas
        for meta in &metas {
            self.handle.send_ctrl_c(meta);
        }
        tokio::time::sleep(waiting_time).await;

        // check that all nodes have exited
        for node in nodes.iter().chain(metas.iter()) {
            if !self.handle.is_exit(node) {
                panic!("failed to gracefully shut down {node} in {waiting_time:?}");
            }
        }
    }

    /// Wait until the cluster has recovered, i.e. `rw_recovery_status()` reports `RUNNING`.
    pub async fn wait_for_recovery(&mut self) -> Result<()> {
        let timeout = Duration::from_secs(200);
        let mut session = self.start_session();
        tokio::time::timeout(timeout, async {
            loop {
                if let Ok(result) = session.run("select rw_recovery_status()").await
                    && result == "RUNNING"
                {
                    break;
                }
                tokio::time::sleep(Duration::from_nanos(10)).await;
            }
        })
        .await?;
        Ok(())
    }
    /// Wait until every fragment runs with the given `parallelism`.
    ///
    /// This function only works if all actors in the cluster follow adaptive scaling.
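    ///
    /// A minimal sketch (assumes adaptive scaling, so the expected parallelism is the
    /// total number of streaming cores):
    ///
    /// ```ignore
    /// let total = cluster.config().total_streaming_cores() as usize;
    /// cluster.wait_for_scale(total).await?;
    /// ```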
    pub async fn wait_for_scale(&mut self, parallelism: usize) -> Result<()> {
        let timeout = Duration::from_secs(200);
        let mut session = self.start_session();
        tokio::time::timeout(timeout, async {
            loop {
                let parallelism_sql = format!(
                    "select count(parallelism) filter (where parallelism != {parallelism}) \
                    from (select count(*) parallelism from rw_actors group by fragment_id);"
                );
                if let Ok(result) = session.run(&parallelism_sql).await
                    && result == "0"
                {
                    break;
                }
                tokio::time::sleep(Duration::from_nanos(10)).await;
            }
        })
        .await?;
        Ok(())
    }
}

type SessionRequest = (
    String,                          // query sql
    oneshot::Sender<Result<String>>, // channel to send result back
);

/// A SQL session on the simulated client node.
#[derive(Debug, Clone)]
pub struct Session {
    query_tx: mpsc::Sender<SessionRequest>,
}

impl Session {
    /// Run the given SQL statements on the session, returning one result per statement.
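    ///
    /// A minimal sketch (table name is illustrative):
    ///
    /// ```ignore
    /// let results = session
    ///     .run_all(vec!["create table t (v int)", "insert into t values (1)", "flush"])
    ///     .await?;
    /// assert_eq!(results.len(), 3);
    /// ```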
    pub async fn run_all(&mut self, sqls: Vec<impl Into<String>>) -> Result<Vec<String>> {
        let mut results = Vec::with_capacity(sqls.len());
        for sql in sqls {
            let result = self.run(sql).await?;
            results.push(result);
        }
        Ok(results)
    }

    /// Run the given SQL query on the session.
    pub async fn run(&mut self, sql: impl Into<String>) -> Result<String> {
        let (tx, rx) = oneshot::channel();
        self.query_tx.send((sql.into(), tx)).await?;
        rx.await?
    }

    /// Run `FLUSH` on the session.
    pub async fn flush(&mut self) -> Result<()> {
        self.run("FLUSH").await?;
        Ok(())
    }

    /// Returns whether arrangement backfill is enabled for the session.
    pub async fn is_arrangement_backfill_enabled(&mut self) -> Result<bool> {
        let result = self.run("show streaming_use_arrangement_backfill").await?;
        Ok(result == "true")
    }
}

/// Options for killing nodes.
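///
/// A minimal sketch of customizing a preset (field values are illustrative):
///
/// ```ignore
/// let opts = KillOpts {
///     kill_compactor: false,
///     ..KillOpts::ALL
/// };
/// cluster.kill_node(&opts).await;
/// ```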
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct KillOpts {
    pub kill_rate: f32,
    pub kill_meta: bool,
    pub kill_frontend: bool,
    pub kill_compute: bool,
    pub kill_compactor: bool,
    pub restart_delay_secs: u32,
}

impl KillOpts {
    /// Kill all kinds of nodes.
    pub const ALL: Self = KillOpts {
        kill_rate: 1.0,
        kill_meta: false, // FIXME: make it true when multiple meta nodes are supported
        kill_frontend: true,
        kill_compute: true,
        kill_compactor: true,
        restart_delay_secs: 20,
    };

    /// Like [`Self::ALL`], but with a much shorter restart delay.
    pub const ALL_FAST: Self = KillOpts {
        kill_rate: 1.0,
        kill_meta: false, // FIXME: make it true when multiple meta nodes are supported
        kill_frontend: true,
        kill_compute: true,
        kill_compactor: true,
        restart_delay_secs: 2,
    };
}