risingwave_simulation/cluster.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![cfg_attr(not(madsim), allow(unused_imports))]

use std::cmp::max;
use std::collections::HashMap;
use std::future::Future;
use std::io::Write;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{Result, anyhow, bail};
use cfg_or_panic::cfg_or_panic;
use clap::Parser;
use futures::channel::{mpsc, oneshot};
use futures::future::join_all;
use futures::{SinkExt, StreamExt};
use itertools::Itertools;
#[cfg(madsim)]
use madsim::runtime::{Handle, NodeHandle};
use rand::Rng;
use rand::seq::IteratorRandom;
use risingwave_common::util::tokio_util::sync::CancellationToken;
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
#[cfg(madsim)]
use risingwave_object_store::object::sim::SimServer as ObjectStoreSimServer;
use risingwave_pb::common::WorkerNode;
use sqllogictest::AsyncDB;
use tempfile::NamedTempFile;
#[cfg(not(madsim))]
use tokio::runtime::Handle;
use uuid::Uuid;

use crate::client::RisingWave;

/// The path to the configuration file for the cluster.
#[derive(Clone, Debug)]
pub enum ConfigPath {
    /// A regular path pointing to an external configuration file.
    Regular(String),
    /// A temporary path pointing to a configuration file created at runtime.
    Temp(Arc<tempfile::TempPath>),
}

impl ConfigPath {
    pub fn as_str(&self) -> &str {
        match self {
            ConfigPath::Regular(s) => s,
            ConfigPath::Temp(p) => p.as_os_str().to_str().unwrap(),
        }
    }
}

/// RisingWave cluster configuration.
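///
/// # Example
///
/// A minimal sketch of overriding a few fields on top of the defaults (the
/// values here are illustrative, not recommendations):
///
/// ```ignore
/// let conf = Configuration {
///     compute_nodes: 3,
///     compute_node_cores: 2,
///     ..Default::default()
/// };
/// ```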
#[derive(Debug, Clone)]
pub struct Configuration {
    /// The path to the configuration file.
    ///
    /// Empty string means using the default config.
    pub config_path: ConfigPath,

    /// The number of frontend nodes.
    pub frontend_nodes: usize,

    /// The number of compute nodes.
    pub compute_nodes: usize,

    /// The number of meta nodes.
    pub meta_nodes: usize,

    /// The number of compactor nodes.
    pub compactor_nodes: usize,

    /// The number of CPU cores for each compute node.
    ///
    /// This determines `worker_node_parallelism`.
    pub compute_node_cores: usize,

    /// Queries to run per session.
    pub per_session_queries: Arc<Vec<String>>,

    /// Resource groups for compute nodes.
    pub compute_resource_groups: HashMap<usize, String>,
}

impl Default for Configuration {
    fn default() -> Self {
        let config_path = {
            let mut file =
                tempfile::NamedTempFile::new().expect("failed to create temp config file");

            let config_data = r#"
[server]
telemetry_enabled = false
metrics_level = "Disabled"
"#
            .to_owned();
            file.write_all(config_data.as_bytes())
                .expect("failed to write config file");
            file.into_temp_path()
        };

        Configuration {
            config_path: ConfigPath::Temp(config_path.into()),
            frontend_nodes: 1,
            compute_nodes: 1,
            meta_nodes: 1,
            compactor_nodes: 1,
            compute_node_cores: 1,
            per_session_queries: vec![].into(),
            compute_resource_groups: Default::default(),
        }
    }
}

impl Configuration {
    /// Returns the configuration for the scale test.
    pub fn for_scale() -> Self {
        // Embed the config file and create a temporary file at runtime. The file will be deleted
        // automatically when it's dropped.
        let config_path = {
            let mut file =
                tempfile::NamedTempFile::new().expect("failed to create temp config file");
            file.write_all(include_bytes!("risingwave-scale.toml"))
                .expect("failed to write config file");
            file.into_temp_path()
        };

        Configuration {
            config_path: ConfigPath::Temp(config_path.into()),
            frontend_nodes: 2,
            compute_nodes: 3,
            meta_nodes: 1,
            compactor_nodes: 2,
            compute_node_cores: 2,
            ..Default::default()
        }
    }

    /// Returns the configuration for the scale test with arrangement backfill disabled,
    /// so that table scans will use `no_shuffle` backfill.
    pub fn for_scale_no_shuffle() -> Self {
        let mut conf = Self::for_scale();
        conf.per_session_queries =
            vec!["SET STREAMING_USE_ARRANGEMENT_BACKFILL = false;".into()].into();
        conf
    }

    pub fn for_scale_shared_source() -> Self {
        let mut conf = Self::for_scale();
        conf.per_session_queries = vec!["SET STREAMING_USE_SHARED_SOURCE = true;".into()].into();
        conf
    }

    pub fn for_auto_parallelism(
        max_heartbeat_interval_secs: u64,
        enable_auto_parallelism: bool,
    ) -> Self {
        let disable_automatic_parallelism_control = !enable_auto_parallelism;

        let config_path = {
            let mut file =
                tempfile::NamedTempFile::new().expect("failed to create temp config file");

            let config_data = format!(
                r#"[meta]
max_heartbeat_interval_secs = {max_heartbeat_interval_secs}
disable_automatic_parallelism_control = {disable_automatic_parallelism_control}
parallelism_control_trigger_first_delay_sec = 0
parallelism_control_batch_size = 10
parallelism_control_trigger_period_sec = 10

[system]
barrier_interval_ms = 250
checkpoint_frequency = 4

[server]
telemetry_enabled = false
metrics_level = "Disabled"
"#
            );
            file.write_all(config_data.as_bytes())
                .expect("failed to write config file");
            file.into_temp_path()
        };

        Configuration {
            config_path: ConfigPath::Temp(config_path.into()),
            frontend_nodes: 1,
            compute_nodes: 3,
            meta_nodes: 1,
            compactor_nodes: 1,
            compute_node_cores: 2,
            per_session_queries: vec![
                "create view if not exists table_parallelism as select t.name, tf.parallelism from rw_tables t, rw_table_fragments tf where t.id = tf.table_id;".into(),
                "create view if not exists mview_parallelism as select m.name, tf.parallelism from rw_materialized_views m, rw_table_fragments tf where m.id = tf.table_id;".into(),
            ]
                .into(),
            ..Default::default()
        }
    }

    pub fn for_default_parallelism(default_parallelism: usize) -> Self {
        let config_path = {
            let mut file =
                tempfile::NamedTempFile::new().expect("failed to create temp config file");

            let config_data = format!(
                r#"
[server]
telemetry_enabled = false
metrics_level = "Disabled"
[meta]
default_parallelism = {default_parallelism}
"#
            )
            .to_owned();
            file.write_all(config_data.as_bytes())
                .expect("failed to write config file");
            file.into_temp_path()
        };

        Configuration {
            config_path: ConfigPath::Temp(config_path.into()),
            frontend_nodes: 1,
            compute_nodes: 1,
            meta_nodes: 1,
            compactor_nodes: 1,
            compute_node_cores: default_parallelism * 2,
            per_session_queries: vec![].into(),
            compute_resource_groups: Default::default(),
        }
    }

    /// Returns the config for the backfill test.
    pub fn for_backfill() -> Self {
        // Embed the config file and create a temporary file at runtime. The file will be deleted
        // automatically when it's dropped.
        let config_path = {
            let mut file =
                tempfile::NamedTempFile::new().expect("failed to create temp config file");
            file.write_all(include_bytes!("backfill.toml"))
                .expect("failed to write config file");
            file.into_temp_path()
        };

        Configuration {
            config_path: ConfigPath::Temp(config_path.into()),
            frontend_nodes: 1,
            compute_nodes: 1,
            meta_nodes: 1,
            compactor_nodes: 1,
            compute_node_cores: 4,
            ..Default::default()
        }
    }

    pub fn for_arrangement_backfill() -> Self {
        // Embed the config file and create a temporary file at runtime. The file will be deleted
        // automatically when it's dropped.
        let config_path = {
            let mut file =
                tempfile::NamedTempFile::new().expect("failed to create temp config file");
            file.write_all(include_bytes!("arrangement_backfill.toml"))
                .expect("failed to write config file");
            file.into_temp_path()
        };

        Configuration {
            config_path: ConfigPath::Temp(config_path.into()),
            frontend_nodes: 1,
            compute_nodes: 3,
            meta_nodes: 1,
            compactor_nodes: 1,
            compute_node_cores: 1,
            per_session_queries: vec!["SET STREAMING_USE_ARRANGEMENT_BACKFILL = true;".into()]
                .into(),
            ..Default::default()
        }
    }

    pub fn for_background_ddl() -> Self {
        // Embed the config file and create a temporary file at runtime. The file will be deleted
        // automatically when it's dropped.
        let config_path = {
            let mut file =
                tempfile::NamedTempFile::new().expect("failed to create temp config file");
            file.write_all(include_bytes!("background_ddl.toml"))
                .expect("failed to write config file");
            file.into_temp_path()
        };

        Configuration {
            config_path: ConfigPath::Temp(config_path.into()),
            // NOTE(kwannoel): The cancel test depends on `processlist`,
            // which will cancel a stream job within the process.
            // So we cannot have multiple frontend nodes: a new session spawned
            // to cancel the job could be routed to a different frontend node
            // in a different process.
            frontend_nodes: 1,
            compute_nodes: 3,
            meta_nodes: 1,
            compactor_nodes: 2,
            compute_node_cores: 2,
            ..Default::default()
        }
    }

    pub fn enable_arrangement_backfill() -> Self {
        let config_path = {
            let mut file =
                tempfile::NamedTempFile::new().expect("failed to create temp config file");
            file.write_all(include_bytes!("disable_arrangement_backfill.toml"))
                .expect("failed to write config file");
            file.into_temp_path()
        };
        Configuration {
            config_path: ConfigPath::Temp(config_path.into()),
            frontend_nodes: 1,
            compute_nodes: 1,
            meta_nodes: 1,
            compactor_nodes: 1,
            compute_node_cores: 1,
            per_session_queries: vec![].into(),
            ..Default::default()
        }
    }
}

/// A RisingWave cluster.
///
/// # Nodes
///
/// | Name             | IP            |
/// | ---------------- | ------------- |
/// | meta-x           | 192.168.1.x   |
/// | frontend-x       | 192.168.2.x   |
/// | compute-x        | 192.168.3.x   |
/// | compactor-x      | 192.168.4.x   |
/// | kafka-broker     | 192.168.11.1  |
/// | kafka-producer   | 192.168.11.2  |
/// | object_store_sim | 192.168.12.1  |
/// | client           | 192.168.100.1 |
/// | ctl              | 192.168.101.1 |
pub struct Cluster {
    config: Configuration,
    handle: Handle,
    #[cfg(madsim)]
    pub(crate) client: NodeHandle,
    #[cfg(madsim)]
    pub(crate) ctl: NodeHandle,
    #[cfg(madsim)]
    pub(crate) sqlite_file_handle: NamedTempFile,
}

impl Cluster {
    /// Start a RisingWave cluster for testing.
    ///
    /// This function should be called exactly once in a test.
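    ///
    /// # Example
    ///
    /// A minimal sketch, assuming the test body runs inside the madsim
    /// runtime (e.g. under a `#[madsim::test]`-style attribute):
    ///
    /// ```ignore
    /// let mut cluster = Cluster::start(Configuration::for_scale()).await?;
    /// let result = cluster.run("SELECT 1;").await?;
    /// assert_eq!(result, "1");
    /// ```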
    #[cfg_or_panic(madsim)]
    pub async fn start(conf: Configuration) -> Result<Self> {
        use madsim::net::ipvs::*;

        let handle = madsim::runtime::Handle::current();
        println!("seed = {}", handle.seed());
        println!("{:#?}", conf);

        // TODO: support multiple meta nodes
        assert_eq!(conf.meta_nodes, 1);

        // set up DNS and load balancing
        let net = madsim::net::NetSim::current();
        for i in 1..=conf.meta_nodes {
            net.add_dns_record(
                &format!("meta-{i}"),
                format!("192.168.1.{i}").parse().unwrap(),
            );
        }

        net.add_dns_record("frontend", "192.168.2.0".parse().unwrap());
        net.add_dns_record("message_queue", "192.168.11.1".parse().unwrap());
        net.global_ipvs().add_service(
            ServiceAddr::Tcp("192.168.2.0:4566".into()),
            Scheduler::RoundRobin,
        );
        for i in 1..=conf.frontend_nodes {
            net.global_ipvs().add_server(
                ServiceAddr::Tcp("192.168.2.0:4566".into()),
                &format!("192.168.2.{i}:4566"),
            )
        }

        // kafka broker
        handle
            .create_node()
            .name("kafka-broker")
            .ip("192.168.11.1".parse().unwrap())
            .init(move || async move {
                rdkafka::SimBroker::default()
                    .serve("0.0.0.0:29092".parse().unwrap())
                    .await
            })
            .build();

        // object_store_sim
        handle
            .create_node()
            .name("object_store_sim")
            .ip("192.168.12.1".parse().unwrap())
            .init(move || async move {
                ObjectStoreSimServer::builder()
                    .serve("0.0.0.0:9301".parse().unwrap())
                    .await
            })
            .build();

        // wait for the service to be ready
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;

        let mut meta_addrs = vec![];
        for i in 1..=conf.meta_nodes {
            meta_addrs.push(format!("http://meta-{i}:5690"));
        }
        unsafe { std::env::set_var("RW_META_ADDR", meta_addrs.join(",")) };

        let sqlite_file_handle: NamedTempFile = NamedTempFile::new().unwrap();
        let file_path = sqlite_file_handle.path().display().to_string();
        tracing::info!(?file_path, "sqlite_file_path");
        let sql_endpoint = format!("sqlite://{}?mode=rwc", file_path);
        let backend_args = vec!["--backend", "sql", "--sql-endpoint", &sql_endpoint];

        // meta node
        for i in 1..=conf.meta_nodes {
            let args = [
                "meta-node",
                "--config-path",
                conf.config_path.as_str(),
                "--listen-addr",
                "0.0.0.0:5690",
                "--advertise-addr",
                &format!("meta-{i}:5690"),
                "--state-store",
                "hummock+sim://hummockadmin:hummockadmin@192.168.12.1:9301/hummock001",
                "--data-directory",
                "hummock_001",
                "--temp-secret-file-dir",
                &format!("./secrets/meta-{i}"),
            ];
            let args = args.into_iter().chain(backend_args.clone().into_iter());
            let opts = risingwave_meta_node::MetaNodeOpts::parse_from(args);
            handle
                .create_node()
                .name(format!("meta-{i}"))
                .ip([192, 168, 1, i as u8].into())
                .init(move || {
                    risingwave_meta_node::start(
                        opts.clone(),
                        CancellationToken::new(), // dummy
                    )
                })
                .build();
        }

        // wait for the service to be ready
        tokio::time::sleep(std::time::Duration::from_secs(15)).await;

        // frontend node
        for i in 1..=conf.frontend_nodes {
            let opts = risingwave_frontend::FrontendOpts::parse_from([
                "frontend-node",
                "--config-path",
                conf.config_path.as_str(),
                "--listen-addr",
                "0.0.0.0:4566",
                "--health-check-listener-addr",
                "0.0.0.0:6786",
                "--advertise-addr",
                &format!("192.168.2.{i}:4566"),
                "--temp-secret-file-dir",
                &format!("./secrets/frontend-{i}"),
            ]);
            handle
                .create_node()
                .name(format!("frontend-{i}"))
                .ip([192, 168, 2, i as u8].into())
                .init(move || {
                    risingwave_frontend::start(
                        opts.clone(),
                        CancellationToken::new(), // dummy
                    )
                })
                .build();
        }

        // compute node
        for i in 1..=conf.compute_nodes {
            let opts = risingwave_compute::ComputeNodeOpts::parse_from([
                "compute-node",
                "--config-path",
                conf.config_path.as_str(),
                "--listen-addr",
                "0.0.0.0:5688",
                "--advertise-addr",
                &format!("192.168.3.{i}:5688"),
                "--total-memory-bytes",
                "6979321856",
                "--parallelism",
                &conf.compute_node_cores.to_string(),
                "--temp-secret-file-dir",
                &format!("./secrets/compute-{i}"),
                "--resource-group",
                &conf
                    .compute_resource_groups
                    .get(&i)
                    .cloned()
                    .unwrap_or(DEFAULT_RESOURCE_GROUP.to_string()),
            ]);
            handle
                .create_node()
                .name(format!("compute-{i}"))
                .ip([192, 168, 3, i as u8].into())
                .cores(conf.compute_node_cores)
                .init(move || {
                    risingwave_compute::start(
                        opts.clone(),
                        CancellationToken::new(), // dummy
                    )
                })
                .build();
        }

        // compactor node
        for i in 1..=conf.compactor_nodes {
            let opts = risingwave_compactor::CompactorOpts::parse_from([
                "compactor-node",
                "--config-path",
                conf.config_path.as_str(),
                "--listen-addr",
                "0.0.0.0:6660",
                "--advertise-addr",
                &format!("192.168.4.{i}:6660"),
            ]);
            handle
                .create_node()
                .name(format!("compactor-{i}"))
                .ip([192, 168, 4, i as u8].into())
                .init(move || {
                    risingwave_compactor::start(
                        opts.clone(),
                        CancellationToken::new(), // dummy
                    )
                })
                .build();
        }

        // wait for the service to be ready
        tokio::time::sleep(Duration::from_secs(15)).await;

        // client
        let client = handle
            .create_node()
            .name("client")
            .ip([192, 168, 100, 1].into())
            .build();

        // risectl
        let ctl = handle
            .create_node()
            .name("ctl")
            .ip([192, 168, 101, 1].into())
            .build();

        Ok(Self {
            config: conf,
            handle,
            client,
            ctl,
            sqlite_file_handle,
        })
    }

    #[cfg_or_panic(madsim)]
    fn per_session_queries(&self) -> Arc<Vec<String>> {
        self.config.per_session_queries.clone()
    }

    /// Start a SQL session on the client node.
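    ///
    /// # Example
    ///
    /// A sketch of running several statements on one session so that session
    /// variables persist between them (the statements are illustrative):
    ///
    /// ```ignore
    /// let mut session = cluster.start_session();
    /// session
    ///     .run("SET STREAMING_USE_ARRANGEMENT_BACKFILL = true;")
    ///     .await?;
    /// session.run("CREATE TABLE t (v int);").await?;
    /// ```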
    #[cfg_or_panic(madsim)]
    pub fn start_session(&mut self) -> Session {
        let (query_tx, mut query_rx) = mpsc::channel::<SessionRequest>(0);
        let per_session_queries = self.per_session_queries();

        self.client.spawn(async move {
            let mut client = RisingWave::connect("frontend".into(), "dev".into()).await?;

            for sql in per_session_queries.as_ref() {
                client.run(sql).await?;
            }
            drop(per_session_queries);

            while let Some((sql, tx)) = query_rx.next().await {
                let result = client
                    .run(&sql)
                    .await
                    .map(|output| match output {
                        sqllogictest::DBOutput::Rows { rows, .. } => rows
                            .into_iter()
                            .map(|row| {
                                row.into_iter()
                                    .map(|v| v.to_string())
                                    .collect::<Vec<_>>()
                                    .join(" ")
                            })
                            .collect::<Vec<_>>()
                            .join("\n"),
                        _ => "".to_string(),
                    })
                    .map_err(Into::into);

                let _ = tx.send(result);
            }

            Ok::<_, anyhow::Error>(())
        });

        Session { query_tx }
    }

    /// Run a SQL query on a **new** session of the client node.
    ///
    /// This is a convenience method that creates a new session and runs the query on it. If you
    /// want to run multiple queries on the same session, use `start_session` and `Session::run`.
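    ///
    /// # Example
    ///
    /// A sketch (the table name `t` is illustrative):
    ///
    /// ```ignore
    /// // Runs on a fresh session; session variables do not persist across calls.
    /// let count = cluster.run("SELECT count(*) FROM t;").await?;
    /// ```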
    pub async fn run(&mut self, sql: impl Into<String>) -> Result<String> {
        self.start_session().run(sql).await
    }

    /// Run a future on the client node.
    #[cfg_or_panic(madsim)]
    pub async fn run_on_client<F>(&self, future: F) -> F::Output
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        self.client.spawn(future).await.unwrap()
    }

    pub async fn get_random_worker_nodes(&self, n: usize) -> Result<Vec<WorkerNode>> {
        let worker_nodes = self.get_cluster_info().await?.get_worker_nodes().clone();
        if worker_nodes.len() < n {
            return Err(anyhow!("cannot choose more worker nodes than are present"));
        }
        let rand_nodes = worker_nodes
            .iter()
            .choose_multiple(&mut rand::rng(), n)
            .to_vec();
        Ok(rand_nodes.iter().cloned().cloned().collect_vec())
    }

    /// Run a SQL query from the client and wait until the condition is met.
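    ///
    /// # Example
    ///
    /// A sketch of polling until a count reaches a target (the query and the
    /// expected value are illustrative):
    ///
    /// ```ignore
    /// cluster
    ///     .wait_until(
    ///         "SELECT count(*) FROM t;",
    ///         |result| result.trim() == "10",
    ///         Duration::from_secs(1),  // poll interval
    ///         Duration::from_secs(60), // timeout
    ///     )
    ///     .await?;
    /// ```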
    pub async fn wait_until(
        &mut self,
        sql: impl Into<String> + Clone,
        mut p: impl FnMut(&str) -> bool,
        interval: Duration,
        timeout: Duration,
    ) -> Result<String> {
        let fut = async move {
            let mut interval = tokio::time::interval(interval);
            loop {
                interval.tick().await;
                let result = self.run(sql.clone()).await?;
                if p(&result) {
                    return Ok::<_, anyhow::Error>(result);
                }
            }
        };

        match tokio::time::timeout(timeout, fut).await {
            Ok(r) => Ok(r?),
            Err(_) => bail!("wait_until timeout"),
        }
    }

    /// Run a SQL query from the client and wait until the returned result is not empty.
    pub async fn wait_until_non_empty(
        &mut self,
        sql: &str,
        interval: Duration,
        timeout: Duration,
    ) -> Result<String> {
        self.wait_until(sql, |r| !r.trim().is_empty(), interval, timeout)
            .await
    }

    /// Generate a list of random worker nodes to kill according to `opts`, then call `kill_nodes`
    /// to kill and restart them.
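    ///
    /// # Example
    ///
    /// A sketch using the predefined options and waiting for recovery:
    ///
    /// ```ignore
    /// cluster.kill_node(&KillOpts::ALL).await;
    /// cluster.wait_for_recovery().await?;
    /// ```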
    pub async fn kill_node(&self, opts: &KillOpts) {
        let mut nodes = vec![];
        if opts.kill_meta {
            let rand = rand::rng().random_range(0..3);
            for i in 1..=self.config.meta_nodes {
                match rand {
                    0 => break,                                     // kill none
                    1 => {}                                         // kill all
                    _ if !rand::rng().random_bool(0.5) => continue, // kill at random
                    _ => {}
                }
                nodes.push(format!("meta-{}", i));
            }
            // don't kill all meta services
            if nodes.len() == self.config.meta_nodes {
                nodes.truncate(1);
            }
        }
        if opts.kill_frontend {
            let rand = rand::rng().random_range(0..3);
            for i in 1..=self.config.frontend_nodes {
                match rand {
                    0 => break,                                     // kill none
                    1 => {}                                         // kill all
                    _ if !rand::rng().random_bool(0.5) => continue, // kill at random
                    _ => {}
                }
                nodes.push(format!("frontend-{}", i));
            }
        }
        if opts.kill_compute {
            let rand = rand::rng().random_range(0..3);
            for i in 1..=self.config.compute_nodes {
                match rand {
                    0 => break,                                     // kill none
                    1 => {}                                         // kill all
                    _ if !rand::rng().random_bool(0.5) => continue, // kill at random
                    _ => {}
                }
                nodes.push(format!("compute-{}", i));
            }
        }
        if opts.kill_compactor {
            let rand = rand::rng().random_range(0..3);
            for i in 1..=self.config.compactor_nodes {
                match rand {
                    0 => break,                                     // kill none
                    1 => {}                                         // kill all
                    _ if !rand::rng().random_bool(0.5) => continue, // kill at random
                    _ => {}
                }
                nodes.push(format!("compactor-{}", i));
            }
        }

        self.kill_nodes(nodes, opts.restart_delay_secs).await
    }

    /// Kill the given nodes by name and restart them after a short random delay. With a
    /// probability of 0.1, the restart is further delayed by `restart_delay_secs` so that the
    /// node expires and is removed from the cluster.
    #[cfg_or_panic(madsim)]
    pub async fn kill_nodes(
        &self,
        nodes: impl IntoIterator<Item = impl AsRef<str>>,
        restart_delay_secs: u32,
    ) {
        join_all(nodes.into_iter().map(|name| async move {
            let name = name.as_ref();
            let t = rand::rng().random_range(Duration::from_secs(0)..Duration::from_secs(1));
            tokio::time::sleep(t).await;
            tracing::info!("kill {name}");
            Handle::current().kill(name);

            let mut t = rand::rng().random_range(Duration::from_secs(0)..Duration::from_secs(1));
            // has a small chance to restart after a long time
            // so that the node is expired and removed from the cluster
            if rand::rng().random_bool(0.1) {
                // max_heartbeat_interval_secs = 15
                t += Duration::from_secs(restart_delay_secs as u64);
            }
            tokio::time::sleep(t).await;
            tracing::info!("restart {name}");
            Handle::current().restart(name);
        }))
        .await;
    }

    #[cfg_or_panic(madsim)]
    pub async fn kill_nodes_and_restart(
        &self,
        nodes: impl IntoIterator<Item = impl AsRef<str>>,
        restart_delay_secs: u32,
    ) {
        join_all(nodes.into_iter().map(|name| async move {
            let name = name.as_ref();
            tracing::info!("kill {name}");
            Handle::current().kill(name);
            tokio::time::sleep(Duration::from_secs(restart_delay_secs as u64)).await;
            tracing::info!("restart {name}");
            Handle::current().restart(name);
        }))
        .await;
    }

    #[cfg_or_panic(madsim)]
    pub async fn simple_kill_nodes(&self, nodes: impl IntoIterator<Item = impl AsRef<str>>) {
        join_all(nodes.into_iter().map(|name| async move {
            let name = name.as_ref();
            tracing::info!("kill {name}");
            Handle::current().kill(name);
        }))
        .await;
    }

    #[cfg_or_panic(madsim)]
    pub async fn simple_restart_nodes(&self, nodes: impl IntoIterator<Item = impl AsRef<str>>) {
        join_all(nodes.into_iter().map(|name| async move {
            let name = name.as_ref();
            tracing::info!("restart {name}");
            Handle::current().restart(name);
        }))
        .await;
    }

    /// Create a node for the Kafka producer and prepare data.
    #[cfg_or_panic(madsim)]
    pub async fn create_kafka_producer(&self, datadir: &str) {
        self.handle
            .create_node()
            .name("kafka-producer")
            .ip("192.168.11.2".parse().unwrap())
            .build()
            .spawn(crate::kafka::producer(
                "192.168.11.1:29092",
                datadir.to_string(),
            ))
            .await
            .unwrap();
    }

    /// Create Kafka topics.
    #[cfg_or_panic(madsim)]
    pub fn create_kafka_topics(&self, topics: HashMap<String, i32>) {
        self.handle
            .create_node()
            .name("kafka-topic-create")
            .ip("192.168.11.3".parse().unwrap())
            .build()
            .spawn(crate::kafka::create_topics("192.168.11.1:29092", topics));
    }

    pub fn config(&self) -> Configuration {
        self.config.clone()
    }

    pub fn handle(&self) -> &Handle {
        &self.handle
    }

    /// Gracefully shut down all RisingWave nodes.
    #[cfg_or_panic(madsim)]
    pub async fn graceful_shutdown(&self) {
        let mut nodes = vec![];
        let mut metas = vec![];
        for i in 1..=self.config.meta_nodes {
            metas.push(format!("meta-{i}"));
        }
        for i in 1..=self.config.frontend_nodes {
            nodes.push(format!("frontend-{i}"));
        }
        for i in 1..=self.config.compute_nodes {
            nodes.push(format!("compute-{i}"));
        }
        for i in 1..=self.config.compactor_nodes {
            nodes.push(format!("compactor-{i}"));
        }

        tracing::info!("graceful shutdown");
        let waiting_time = Duration::from_secs(10);
        // shut down frontends, computes, and compactors
        for node in &nodes {
            self.handle.send_ctrl_c(node);
        }
        tokio::time::sleep(waiting_time).await;
        // shut down metas
        for meta in &metas {
            self.handle.send_ctrl_c(meta);
        }
        tokio::time::sleep(waiting_time).await;

        // check that all nodes have exited
        for node in nodes.iter().chain(metas.iter()) {
            if !self.handle.is_exit(node) {
                panic!("failed to gracefully shut down {node} in {waiting_time:?}");
            }
        }
    }

    pub async fn wait_for_recovery(&mut self) -> Result<()> {
        let timeout = Duration::from_secs(200);
        let mut session = self.start_session();
        tokio::time::timeout(timeout, async {
            loop {
                if let Ok(result) = session.run("select rw_recovery_status()").await
                    && result == "RUNNING"
                {
                    break;
                }
                tokio::time::sleep(Duration::from_nanos(10)).await;
            }
        })
        .await?;
        Ok(())
    }
    /// This function only works if all actors in the cluster follow adaptive scaling.
    pub async fn wait_for_scale(&mut self, parallelism: usize) -> Result<()> {
        let timeout = Duration::from_secs(200);
        let mut session = self.start_session();
        tokio::time::timeout(timeout, async {
            loop {
                let parallelism_sql = format!(
                    "select count(parallelism) filter (where parallelism != {parallelism})\
                from (select count(*) parallelism from rw_actors group by fragment_id);"
                );
                if let Ok(result) = session.run(&parallelism_sql).await
                    && result == "0"
                {
                    break;
                }
                tokio::time::sleep(Duration::from_nanos(10)).await;
            }
        })
        .await?;
        Ok(())
    }
}

type SessionRequest = (
    String,                          // query sql
    oneshot::Sender<Result<String>>, // channel to send result back
);

/// A SQL session on the simulated client node.
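///
/// # Example
///
/// A sketch of running a batch of statements in order on one session (the
/// statements are illustrative):
///
/// ```ignore
/// let mut session = cluster.start_session();
/// session
///     .run_all(vec!["CREATE TABLE t (v int);", "FLUSH;"])
///     .await?;
/// ```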
#[derive(Debug, Clone)]
pub struct Session {
    query_tx: mpsc::Sender<SessionRequest>,
}

impl Session {
    /// Run the given SQL statements on the session, in order.
    pub async fn run_all(&mut self, sqls: Vec<impl Into<String>>) -> Result<Vec<String>> {
        let mut results = Vec::with_capacity(sqls.len());
        for sql in sqls {
            let result = self.run(sql).await?;
            results.push(result);
        }
        Ok(results)
    }

    /// Run the given SQL query on the session.
    pub async fn run(&mut self, sql: impl Into<String>) -> Result<String> {
        let (tx, rx) = oneshot::channel();
        self.query_tx.send((sql.into(), tx)).await?;
        rx.await?
    }

    /// Run `FLUSH` on the session.
    pub async fn flush(&mut self) -> Result<()> {
        self.run("FLUSH").await?;
        Ok(())
    }

    pub async fn is_arrangement_backfill_enabled(&mut self) -> Result<bool> {
        let result = self.run("show streaming_use_arrangement_backfill").await?;
        Ok(result == "true")
    }
}

/// Options for killing nodes.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct KillOpts {
    pub kill_rate: f32,
    pub kill_meta: bool,
    pub kill_frontend: bool,
    pub kill_compute: bool,
    pub kill_compactor: bool,
    pub restart_delay_secs: u32,
}

impl KillOpts {
    /// Kill all kinds of nodes.
    pub const ALL: Self = KillOpts {
        kill_rate: 1.0,
        kill_meta: false, // FIXME: make it true when multiple meta nodes are supported
        kill_frontend: true,
        kill_compute: true,
        kill_compactor: true,
        restart_delay_secs: 20,
    };
    pub const ALL_FAST: Self = KillOpts {
        kill_rate: 1.0,
        kill_meta: false, // FIXME: make it true when multiple meta nodes are supported
        kill_frontend: true,
        kill_compute: true,
        kill_compactor: true,
        restart_delay_secs: 2,
    };
}