risingwave_simulation/
cluster.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![cfg_attr(not(madsim), allow(unused_imports))]

use std::cmp::max;
use std::collections::HashMap;
use std::future::Future;
use std::io::Write;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{Result, anyhow, bail};
use cfg_or_panic::cfg_or_panic;
use clap::Parser;
use futures::channel::{mpsc, oneshot};
use futures::future::join_all;
use futures::{SinkExt, StreamExt};
use itertools::Itertools;
#[cfg(madsim)]
use madsim::runtime::{Handle, NodeHandle};
use rand::Rng;
use rand::seq::IteratorRandom;
use risingwave_common::util::tokio_util::sync::CancellationToken;
use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
#[cfg(madsim)]
use risingwave_object_store::object::sim::SimServer as ObjectStoreSimServer;
use risingwave_pb::common::WorkerNode;
use sqllogictest::AsyncDB;
use tempfile::NamedTempFile;
#[cfg(not(madsim))]
use tokio::runtime::Handle;
use uuid::Uuid;

use crate::client::RisingWave;

/// The path to the configuration file for the cluster.
#[derive(Clone, Debug)]
pub enum ConfigPath {
    /// A regular path pointing to an external configuration file.
    Regular(String),
    /// A temporary path pointing to a configuration file created at runtime.
    Temp(Arc<tempfile::TempPath>),
}

impl ConfigPath {
    pub fn as_str(&self) -> &str {
        match self {
            ConfigPath::Regular(s) => s,
            ConfigPath::Temp(p) => p.as_os_str().to_str().unwrap(),
        }
    }
}

/// RisingWave cluster configuration.
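///
/// A minimal sketch of overriding a few fields while keeping the defaults for the rest
/// (the values below are illustrative, not recommendations):
///
/// ```ignore
/// let conf = Configuration {
///     compute_nodes: 3,
///     compute_node_cores: 2,
///     ..Default::default()
/// };
/// ```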
#[derive(Debug, Clone)]
pub struct Configuration {
    /// The path to the configuration file.
    ///
    /// An empty string means using the default config.
    pub config_path: ConfigPath,

    /// The number of frontend nodes.
    pub frontend_nodes: usize,

    /// The number of compute nodes.
    pub compute_nodes: usize,

    /// The number of meta nodes.
    pub meta_nodes: usize,

    /// The number of compactor nodes.
    pub compactor_nodes: usize,

    /// The number of CPU cores for each compute node.
    ///
    /// This determines `worker_node_parallelism`.
    pub compute_node_cores: usize,

    /// Queries to run per session.
    pub per_session_queries: Arc<Vec<String>>,

    /// Resource groups for compute nodes.
    pub compute_resource_groups: HashMap<usize, String>,
}

impl Default for Configuration {
    fn default() -> Self {
        let config_path = {
            let mut file =
                tempfile::NamedTempFile::new().expect("failed to create temp config file");

            let config_data = r#"
[server]
telemetry_enabled = false
metrics_level = "Disabled"
"#
            .to_owned();
            file.write_all(config_data.as_bytes())
                .expect("failed to write config file");
            file.into_temp_path()
        };

        Configuration {
            config_path: ConfigPath::Temp(config_path.into()),
            frontend_nodes: 1,
            compute_nodes: 1,
            meta_nodes: 1,
            compactor_nodes: 1,
            compute_node_cores: 1,
            per_session_queries: vec![].into(),
            compute_resource_groups: Default::default(),
        }
    }
}

impl Configuration {
    /// Returns the configuration for the scale test.
    pub fn for_scale() -> Self {
        // Embed the config file and create a temporary file at runtime. The file will be deleted
        // automatically when it's dropped.
        let config_path = {
            let mut file =
                tempfile::NamedTempFile::new().expect("failed to create temp config file");
            file.write_all(include_bytes!("risingwave-scale.toml"))
                .expect("failed to write config file");
            file.into_temp_path()
        };

        Configuration {
            config_path: ConfigPath::Temp(config_path.into()),
            frontend_nodes: 2,
            compute_nodes: 3,
            meta_nodes: 1,
            compactor_nodes: 2,
            compute_node_cores: 2,
            ..Default::default()
        }
    }

    /// Provides a configuration for the scale test which ensures that arrangement backfill is disabled,
    /// so table scans will use `no_shuffle`.
    pub fn for_scale_no_shuffle() -> Self {
        let mut conf = Self::for_scale();
        conf.per_session_queries =
            vec!["SET STREAMING_USE_ARRANGEMENT_BACKFILL = false;".into()].into();
        conf
    }

    pub fn for_scale_shared_source() -> Self {
        let mut conf = Self::for_scale();
        conf.per_session_queries = vec!["SET STREAMING_USE_SHARED_SOURCE = true;".into()].into();
        conf
    }

    pub fn for_auto_parallelism(
        max_heartbeat_interval_secs: u64,
        enable_auto_parallelism: bool,
    ) -> Self {
        let disable_automatic_parallelism_control = !enable_auto_parallelism;

        let config_path = {
            let mut file =
                tempfile::NamedTempFile::new().expect("failed to create temp config file");

            let config_data = format!(
                r#"[meta]
max_heartbeat_interval_secs = {max_heartbeat_interval_secs}
disable_automatic_parallelism_control = {disable_automatic_parallelism_control}
parallelism_control_trigger_first_delay_sec = 0
parallelism_control_batch_size = 10
parallelism_control_trigger_period_sec = 10

[system]
barrier_interval_ms = 250
checkpoint_frequency = 4

[server]
telemetry_enabled = false
metrics_level = "Disabled"
"#
            );
            file.write_all(config_data.as_bytes())
                .expect("failed to write config file");
            file.into_temp_path()
        };

        Configuration {
            config_path: ConfigPath::Temp(config_path.into()),
            frontend_nodes: 1,
            compute_nodes: 3,
            meta_nodes: 1,
            compactor_nodes: 1,
            compute_node_cores: 2,
            per_session_queries: vec![
                "create view if not exists table_parallelism as select t.name, tf.parallelism from rw_tables t, rw_table_fragments tf where t.id = tf.table_id;".into(),
                "create view if not exists mview_parallelism as select m.name, tf.parallelism from rw_materialized_views m, rw_table_fragments tf where m.id = tf.table_id;".into(),
            ]
                .into(),
            ..Default::default()
        }
    }

    pub fn for_default_parallelism(default_parallelism: usize) -> Self {
        let config_path = {
            let mut file =
                tempfile::NamedTempFile::new().expect("failed to create temp config file");

            let config_data = format!(
                r#"
[server]
telemetry_enabled = false
metrics_level = "Disabled"
[meta]
default_parallelism = {default_parallelism}
"#
            )
            .to_owned();
            file.write_all(config_data.as_bytes())
                .expect("failed to write config file");
            file.into_temp_path()
        };

        Configuration {
            config_path: ConfigPath::Temp(config_path.into()),
            frontend_nodes: 1,
            compute_nodes: 1,
            meta_nodes: 1,
            compactor_nodes: 1,
            compute_node_cores: default_parallelism * 2,
            per_session_queries: vec![].into(),
            compute_resource_groups: Default::default(),
        }
    }

    /// Returns the config for the backfill test.
    pub fn for_backfill() -> Self {
        // Embed the config file and create a temporary file at runtime. The file will be deleted
        // automatically when it's dropped.
        let config_path = {
            let mut file =
                tempfile::NamedTempFile::new().expect("failed to create temp config file");
            file.write_all(include_bytes!("backfill.toml"))
                .expect("failed to write config file");
            file.into_temp_path()
        };

        Configuration {
            config_path: ConfigPath::Temp(config_path.into()),
            frontend_nodes: 1,
            compute_nodes: 1,
            meta_nodes: 1,
            compactor_nodes: 1,
            compute_node_cores: 4,
            ..Default::default()
        }
    }

    pub fn for_arrangement_backfill() -> Self {
        // Embed the config file and create a temporary file at runtime. The file will be deleted
        // automatically when it's dropped.
        let config_path = {
            let mut file =
                tempfile::NamedTempFile::new().expect("failed to create temp config file");
            file.write_all(include_bytes!("arrangement_backfill.toml"))
                .expect("failed to write config file");
            file.into_temp_path()
        };

        Configuration {
            config_path: ConfigPath::Temp(config_path.into()),
            frontend_nodes: 1,
            compute_nodes: 3,
            meta_nodes: 1,
            compactor_nodes: 1,
            compute_node_cores: 1,
            per_session_queries: vec!["SET STREAMING_USE_ARRANGEMENT_BACKFILL = true;".into()]
                .into(),
            ..Default::default()
        }
    }

    pub fn for_background_ddl() -> Self {
        // Embed the config file and create a temporary file at runtime. The file will be deleted
        // automatically when it's dropped.
        let config_path = {
            let mut file =
                tempfile::NamedTempFile::new().expect("failed to create temp config file");
            file.write_all(include_bytes!("background_ddl.toml"))
                .expect("failed to write config file");
            file.into_temp_path()
        };

        Configuration {
            config_path: ConfigPath::Temp(config_path.into()),
            // NOTE(kwannoel): The cancel test depends on `processlist`,
            // which cancels a stream job within the same process.
            // So we cannot have multiple frontend nodes, since a new session spawned
            // to cancel the job could be routed to a different frontend node
            // in a different process.
            frontend_nodes: 1,
            compute_nodes: 3,
            meta_nodes: 1,
            compactor_nodes: 2,
            compute_node_cores: 2,
            ..Default::default()
        }
    }

    pub fn enable_arrangement_backfill() -> Self {
        let config_path = {
            let mut file =
                tempfile::NamedTempFile::new().expect("failed to create temp config file");
            file.write_all(include_bytes!("disable_arrangement_backfill.toml"))
                .expect("failed to write config file");
            file.into_temp_path()
        };
        Configuration {
            config_path: ConfigPath::Temp(config_path.into()),
            frontend_nodes: 1,
            compute_nodes: 1,
            meta_nodes: 1,
            compactor_nodes: 1,
            compute_node_cores: 1,
            per_session_queries: vec![].into(),
            ..Default::default()
        }
    }
}

/// A RisingWave cluster.
///
/// # Nodes
///
/// | Name             | IP            |
/// | ---------------- | ------------- |
/// | meta-x           | 192.168.1.x   |
/// | frontend-x       | 192.168.2.x   |
/// | compute-x        | 192.168.3.x   |
/// | compactor-x      | 192.168.4.x   |
/// | kafka-broker     | 192.168.11.1  |
/// | kafka-producer   | 192.168.11.2  |
/// | object_store_sim | 192.168.12.1  |
/// | client           | 192.168.100.1 |
/// | ctl              | 192.168.101.1 |
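///
/// A cluster is normally created via [`Cluster::start`]; see that method for a usage sketch.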
pub struct Cluster {
    config: Configuration,
    handle: Handle,
    #[cfg(madsim)]
    pub(crate) client: NodeHandle,
    #[cfg(madsim)]
    pub(crate) ctl: NodeHandle,
    #[cfg(madsim)]
    pub(crate) sqlite_file_handle: NamedTempFile,
}

impl Cluster {
    /// Start a RisingWave cluster for testing.
    ///
    /// This function should be called exactly once in a test.
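    ///
    /// A minimal usage sketch, assuming a madsim test context (the query is illustrative):
    ///
    /// ```ignore
    /// let mut cluster = Cluster::start(Configuration::default()).await?;
    /// let result = cluster.run("SELECT 1;").await?;
    /// assert_eq!(result, "1");
    /// ```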
    #[cfg_or_panic(madsim)]
    pub async fn start(conf: Configuration) -> Result<Self> {
        use madsim::net::ipvs::*;

        let handle = madsim::runtime::Handle::current();
        println!("seed = {}", handle.seed());
        println!("{:#?}", conf);

        // TODO: support multiple meta nodes
        assert_eq!(conf.meta_nodes, 1);

        // setup DNS and load balance
        let net = madsim::net::NetSim::current();
        for i in 1..=conf.meta_nodes {
            net.add_dns_record(
                &format!("meta-{i}"),
                format!("192.168.1.{i}").parse().unwrap(),
            );
        }

        net.add_dns_record("frontend", "192.168.2.0".parse().unwrap());
        net.add_dns_record("message_queue", "192.168.11.1".parse().unwrap());
        net.global_ipvs().add_service(
            ServiceAddr::Tcp("192.168.2.0:4566".into()),
            Scheduler::RoundRobin,
        );
        for i in 1..=conf.frontend_nodes {
            net.global_ipvs().add_server(
                ServiceAddr::Tcp("192.168.2.0:4566".into()),
                &format!("192.168.2.{i}:4566"),
            )
        }

        // kafka broker
        handle
            .create_node()
            .name("kafka-broker")
            .ip("192.168.11.1".parse().unwrap())
            .init(move || async move {
                rdkafka::SimBroker::default()
                    .serve("0.0.0.0:29092".parse().unwrap())
                    .await
            })
            .build();

        // object_store_sim
        handle
            .create_node()
            .name("object_store_sim")
            .ip("192.168.12.1".parse().unwrap())
            .init(move || async move {
                ObjectStoreSimServer::builder()
                    .serve("0.0.0.0:9301".parse().unwrap())
                    .await
            })
            .build();

        // wait for the service to be ready
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;

        let mut meta_addrs = vec![];
        for i in 1..=conf.meta_nodes {
            meta_addrs.push(format!("http://meta-{i}:5690"));
        }
        std::env::set_var("RW_META_ADDR", meta_addrs.join(","));

        let sqlite_file_handle: NamedTempFile = NamedTempFile::new().unwrap();
        let file_path = sqlite_file_handle.path().display().to_string();
        tracing::info!(?file_path, "sqlite_file_path");
        let sql_endpoint = format!("sqlite://{}?mode=rwc", file_path);
        let backend_args = vec!["--backend", "sql", "--sql-endpoint", &sql_endpoint];

        // meta node
        for i in 1..=conf.meta_nodes {
            let args = [
                "meta-node",
                "--config-path",
                conf.config_path.as_str(),
                "--listen-addr",
                "0.0.0.0:5690",
                "--advertise-addr",
                &format!("meta-{i}:5690"),
                "--state-store",
                "hummock+sim://hummockadmin:hummockadmin@192.168.12.1:9301/hummock001",
                "--data-directory",
                "hummock_001",
                "--temp-secret-file-dir",
                &format!("./secrets/meta-{i}"),
            ];
            let args = args.into_iter().chain(backend_args.clone().into_iter());
            let opts = risingwave_meta_node::MetaNodeOpts::parse_from(args);
            handle
                .create_node()
                .name(format!("meta-{i}"))
                .ip([192, 168, 1, i as u8].into())
                .init(move || {
                    risingwave_meta_node::start(
                        opts.clone(),
                        CancellationToken::new(), // dummy
                    )
                })
                .build();
        }

        // wait for the service to be ready
        tokio::time::sleep(std::time::Duration::from_secs(15)).await;

        // frontend node
        for i in 1..=conf.frontend_nodes {
            let opts = risingwave_frontend::FrontendOpts::parse_from([
                "frontend-node",
                "--config-path",
                conf.config_path.as_str(),
                "--listen-addr",
                "0.0.0.0:4566",
                "--advertise-addr",
                &format!("192.168.2.{i}:4566"),
                "--temp-secret-file-dir",
                &format!("./secrets/frontend-{i}"),
            ]);
            handle
                .create_node()
                .name(format!("frontend-{i}"))
                .ip([192, 168, 2, i as u8].into())
                .init(move || {
                    risingwave_frontend::start(
                        opts.clone(),
                        CancellationToken::new(), // dummy
                    )
                })
                .build();
        }

        // compute node
        for i in 1..=conf.compute_nodes {
            let opts = risingwave_compute::ComputeNodeOpts::parse_from([
                "compute-node",
                "--config-path",
                conf.config_path.as_str(),
                "--listen-addr",
                "0.0.0.0:5688",
                "--advertise-addr",
                &format!("192.168.3.{i}:5688"),
                "--total-memory-bytes",
                "6979321856",
                "--parallelism",
                &conf.compute_node_cores.to_string(),
                "--temp-secret-file-dir",
                &format!("./secrets/compute-{i}"),
                "--resource-group",
                &conf
                    .compute_resource_groups
                    .get(&i)
                    .cloned()
                    .unwrap_or(DEFAULT_RESOURCE_GROUP.to_string()),
            ]);
            handle
                .create_node()
                .name(format!("compute-{i}"))
                .ip([192, 168, 3, i as u8].into())
                .cores(conf.compute_node_cores)
                .init(move || {
                    risingwave_compute::start(
                        opts.clone(),
                        CancellationToken::new(), // dummy
                    )
                })
                .build();
        }

        // compactor node
        for i in 1..=conf.compactor_nodes {
            let opts = risingwave_compactor::CompactorOpts::parse_from([
                "compactor-node",
                "--config-path",
                conf.config_path.as_str(),
                "--listen-addr",
                "0.0.0.0:6660",
                "--advertise-addr",
                &format!("192.168.4.{i}:6660"),
            ]);
            handle
                .create_node()
                .name(format!("compactor-{i}"))
                .ip([192, 168, 4, i as u8].into())
                .init(move || {
                    risingwave_compactor::start(
                        opts.clone(),
                        CancellationToken::new(), // dummy
                    )
                })
                .build();
        }

        // wait for the service to be ready
        tokio::time::sleep(Duration::from_secs(15)).await;

        // client
        let client = handle
            .create_node()
            .name("client")
            .ip([192, 168, 100, 1].into())
            .build();

        // risectl
        let ctl = handle
            .create_node()
            .name("ctl")
            .ip([192, 168, 101, 1].into())
            .build();

        Ok(Self {
            config: conf,
            handle,
            client,
            ctl,
            sqlite_file_handle,
        })
    }

    #[cfg_or_panic(madsim)]
    fn per_session_queries(&self) -> Arc<Vec<String>> {
        self.config.per_session_queries.clone()
    }

    /// Start a SQL session on the client node.
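    ///
    /// A sketch of reusing one session for multiple statements (the statements are illustrative):
    ///
    /// ```ignore
    /// let mut session = cluster.start_session();
    /// session.run("CREATE TABLE t (v int);").await?;
    /// session.run("INSERT INTO t VALUES (1), (2);").await?;
    /// session.flush().await?;
    /// ```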
    #[cfg_or_panic(madsim)]
    pub fn start_session(&mut self) -> Session {
        let (query_tx, mut query_rx) = mpsc::channel::<SessionRequest>(0);
        let per_session_queries = self.per_session_queries();

        self.client.spawn(async move {
            let mut client = RisingWave::connect("frontend".into(), "dev".into()).await?;

            for sql in per_session_queries.as_ref() {
                client.run(sql).await?;
            }
            drop(per_session_queries);

            while let Some((sql, tx)) = query_rx.next().await {
                let result = client
                    .run(&sql)
                    .await
                    .map(|output| match output {
                        sqllogictest::DBOutput::Rows { rows, .. } => rows
                            .into_iter()
                            .map(|row| {
                                row.into_iter()
                                    .map(|v| v.to_string())
                                    .collect::<Vec<_>>()
                                    .join(" ")
                            })
                            .collect::<Vec<_>>()
                            .join("\n"),
                        _ => "".to_string(),
                    })
                    .map_err(Into::into);

                let _ = tx.send(result);
            }

            Ok::<_, anyhow::Error>(())
        });

        Session { query_tx }
    }

    /// Run a SQL query on a **new** session of the client node.
    ///
    /// This is a convenience method that creates a new session and runs the query on it. If you
    /// want to run multiple queries on the same session, use `start_session` and `Session::run`.
    pub async fn run(&mut self, sql: impl Into<String>) -> Result<String> {
        self.start_session().run(sql).await
    }

    /// Run a future on the client node.
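    ///
    /// A sketch (the body of the future is illustrative; it executes inside the simulated client node):
    ///
    /// ```ignore
    /// cluster
    ///     .run_on_client(async {
    ///         // arbitrary async work running on the client node
    ///     })
    ///     .await;
    /// ```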
    #[cfg_or_panic(madsim)]
    pub async fn run_on_client<F>(&self, future: F) -> F::Output
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        self.client.spawn(future).await.unwrap()
    }

    pub async fn get_random_worker_nodes(&self, n: usize) -> Result<Vec<WorkerNode>> {
        let worker_nodes = self.get_cluster_info().await?.get_worker_nodes().clone();
        if worker_nodes.len() < n {
            return Err(anyhow!("cannot remove more nodes than present"));
        }
        let rand_nodes = worker_nodes
            .iter()
            .choose_multiple(&mut rand::rng(), n)
            .to_vec();
        Ok(rand_nodes.iter().cloned().cloned().collect_vec())
    }

    /// Run a SQL query from the client and wait until the condition is met.
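    ///
    /// A sketch of polling until a predicate holds (the query, predicate, and timings are illustrative):
    ///
    /// ```ignore
    /// cluster
    ///     .wait_until(
    ///         "SELECT count(*) FROM t;",
    ///         |result| result.trim() == "2",
    ///         Duration::from_secs(1),
    ///         Duration::from_secs(60),
    ///     )
    ///     .await?;
    /// ```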
    pub async fn wait_until(
        &mut self,
        sql: impl Into<String> + Clone,
        mut p: impl FnMut(&str) -> bool,
        interval: Duration,
        timeout: Duration,
    ) -> Result<String> {
        let fut = async move {
            let mut interval = tokio::time::interval(interval);
            loop {
                interval.tick().await;
                let result = self.run(sql.clone()).await?;
                if p(&result) {
                    return Ok::<_, anyhow::Error>(result);
                }
            }
        };

        match tokio::time::timeout(timeout, fut).await {
            Ok(r) => Ok(r?),
            Err(_) => bail!("wait_until timeout"),
        }
    }

    /// Run a SQL query from the client and wait until the returned result is not empty.
    pub async fn wait_until_non_empty(
        &mut self,
        sql: &str,
        interval: Duration,
        timeout: Duration,
    ) -> Result<String> {
        self.wait_until(sql, |r| !r.trim().is_empty(), interval, timeout)
            .await
    }

    /// Generate a random list of nodes to kill according to `opts`, then call `kill_nodes` to kill and
    /// restart them.
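    ///
    /// For example, using one of the presets defined below (the choice is illustrative):
    ///
    /// ```ignore
    /// cluster.kill_node(&KillOpts::ALL).await;
    /// ```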
    pub async fn kill_node(&self, opts: &KillOpts) {
        let mut nodes = vec![];
        if opts.kill_meta {
            let rand = rand::rng().random_range(0..3);
            for i in 1..=self.config.meta_nodes {
                match rand {
                    0 => break,                                     // kill none
                    1 => {}                                         // kill all
                    _ if !rand::rng().random_bool(0.5) => continue, // kill at random
                    _ => {}
                }
                nodes.push(format!("meta-{}", i));
            }
            // don't kill all meta services
            if nodes.len() == self.config.meta_nodes {
                nodes.truncate(1);
            }
        }
        if opts.kill_frontend {
            let rand = rand::rng().random_range(0..3);
            for i in 1..=self.config.frontend_nodes {
                match rand {
                    0 => break,                                     // kill none
                    1 => {}                                         // kill all
                    _ if !rand::rng().random_bool(0.5) => continue, // kill at random
                    _ => {}
                }
                nodes.push(format!("frontend-{}", i));
            }
        }
        if opts.kill_compute {
            let rand = rand::rng().random_range(0..3);
            for i in 1..=self.config.compute_nodes {
                match rand {
                    0 => break,                                     // kill none
                    1 => {}                                         // kill all
                    _ if !rand::rng().random_bool(0.5) => continue, // kill at random
                    _ => {}
                }
                nodes.push(format!("compute-{}", i));
            }
        }
        if opts.kill_compactor {
            let rand = rand::rng().random_range(0..3);
            for i in 1..=self.config.compactor_nodes {
                match rand {
                    0 => break,                                     // kill none
                    1 => {}                                         // kill all
                    _ if !rand::rng().random_bool(0.5) => continue, // kill at random
                    _ => {}
                }
                nodes.push(format!("compactor-{}", i));
            }
        }

        self.kill_nodes(nodes, opts.restart_delay_secs).await
    }

    /// Kill the given nodes by their names and restart them after a short random delay. With a
    /// probability of 0.1, the restart is additionally delayed by `restart_delay_secs` so that the
    /// node expires and is removed from the cluster.
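    ///
    /// For example (the node names are illustrative; they follow the naming scheme of this module):
    ///
    /// ```ignore
    /// cluster.kill_nodes(["compute-1", "compute-2"], 20).await;
    /// ```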
    #[cfg_or_panic(madsim)]
    pub async fn kill_nodes(
        &self,
        nodes: impl IntoIterator<Item = impl AsRef<str>>,
        restart_delay_secs: u32,
    ) {
        join_all(nodes.into_iter().map(|name| async move {
            let name = name.as_ref();
            let t = rand::rng().random_range(Duration::from_secs(0)..Duration::from_secs(1));
            tokio::time::sleep(t).await;
            tracing::info!("kill {name}");
            Handle::current().kill(name);

            let mut t = rand::rng().random_range(Duration::from_secs(0)..Duration::from_secs(1));
            // has a small chance to restart after a long time
            // so that the node is expired and removed from the cluster
            if rand::rng().random_bool(0.1) {
                // max_heartbeat_interval_secs = 15
                t += Duration::from_secs(restart_delay_secs as u64);
            }
            tokio::time::sleep(t).await;
            tracing::info!("restart {name}");
            Handle::current().restart(name);
        }))
        .await;
    }

    #[cfg_or_panic(madsim)]
    pub async fn kill_nodes_and_restart(
        &self,
        nodes: impl IntoIterator<Item = impl AsRef<str>>,
        restart_delay_secs: u32,
    ) {
        join_all(nodes.into_iter().map(|name| async move {
            let name = name.as_ref();
            tracing::info!("kill {name}");
            Handle::current().kill(name);
            tokio::time::sleep(Duration::from_secs(restart_delay_secs as u64)).await;
            tracing::info!("restart {name}");
            Handle::current().restart(name);
        }))
        .await;
    }

    #[cfg_or_panic(madsim)]
    pub async fn simple_kill_nodes(&self, nodes: impl IntoIterator<Item = impl AsRef<str>>) {
        join_all(nodes.into_iter().map(|name| async move {
            let name = name.as_ref();
            tracing::info!("kill {name}");
            Handle::current().kill(name);
        }))
        .await;
    }

    #[cfg_or_panic(madsim)]
    pub async fn simple_restart_nodes(&self, nodes: impl IntoIterator<Item = impl AsRef<str>>) {
        join_all(nodes.into_iter().map(|name| async move {
            let name = name.as_ref();
            tracing::info!("restart {name}");
            Handle::current().restart(name);
        }))
        .await;
    }

    /// Create a node for the Kafka producer and prepare data.
    #[cfg_or_panic(madsim)]
    pub async fn create_kafka_producer(&self, datadir: &str) {
        self.handle
            .create_node()
            .name("kafka-producer")
            .ip("192.168.11.2".parse().unwrap())
            .build()
            .spawn(crate::kafka::producer(
                "192.168.11.1:29092",
                datadir.to_string(),
            ))
            .await
            .unwrap();
    }

    /// Create Kafka topics.
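    ///
    /// A sketch, assuming the map is topic name to partition count (the values are illustrative):
    ///
    /// ```ignore
    /// cluster.create_kafka_topics([("test_topic".to_owned(), 3)].into_iter().collect());
    /// ```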
    #[cfg_or_panic(madsim)]
    pub fn create_kafka_topics(&self, topics: HashMap<String, i32>) {
        self.handle
            .create_node()
            .name("kafka-topic-create")
            .ip("192.168.11.3".parse().unwrap())
            .build()
            .spawn(crate::kafka::create_topics("192.168.11.1:29092", topics));
    }

    pub fn config(&self) -> Configuration {
        self.config.clone()
    }

    pub fn handle(&self) -> &Handle {
        &self.handle
    }

    /// Gracefully shut down all RisingWave nodes.
    #[cfg_or_panic(madsim)]
    pub async fn graceful_shutdown(&self) {
        let mut nodes = vec![];
        let mut metas = vec![];
        for i in 1..=self.config.meta_nodes {
            metas.push(format!("meta-{i}"));
        }
        for i in 1..=self.config.frontend_nodes {
            nodes.push(format!("frontend-{i}"));
        }
        for i in 1..=self.config.compute_nodes {
            nodes.push(format!("compute-{i}"));
        }
        for i in 1..=self.config.compactor_nodes {
            nodes.push(format!("compactor-{i}"));
        }

        tracing::info!("graceful shutdown");
        let waiting_time = Duration::from_secs(10);
        // shutdown frontends, computes, compactors
        for node in &nodes {
            self.handle.send_ctrl_c(node);
        }
        tokio::time::sleep(waiting_time).await;
        // shutdown metas
        for meta in &metas {
            self.handle.send_ctrl_c(meta);
        }
        tokio::time::sleep(waiting_time).await;

        // check all nodes are exited
        for node in nodes.iter().chain(metas.iter()) {
            if !self.handle.is_exit(node) {
                panic!("failed to gracefully shut down {node} in {waiting_time:?}");
            }
        }
    }
}

type SessionRequest = (
    String,                          // query sql
    oneshot::Sender<Result<String>>, // channel to send result back
);

/// A SQL session on the simulated client node.
#[derive(Debug, Clone)]
pub struct Session {
    query_tx: mpsc::Sender<SessionRequest>,
}

impl Session {
    /// Run the given SQL query on the session.
    pub async fn run(&mut self, sql: impl Into<String>) -> Result<String> {
        let (tx, rx) = oneshot::channel();
        self.query_tx.send((sql.into(), tx)).await?;
        rx.await?
    }

    /// Run `FLUSH` on the session.
    pub async fn flush(&mut self) -> Result<()> {
        self.run("FLUSH").await?;
        Ok(())
    }

    pub async fn is_arrangement_backfill_enabled(&mut self) -> Result<bool> {
        let result = self.run("show streaming_use_arrangement_backfill").await?;
        Ok(result == "true")
    }
}

/// Options for killing nodes.
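///
/// A custom configuration can be built with struct update syntax; a sketch (the values are illustrative):
///
/// ```ignore
/// let opts = KillOpts {
///     kill_meta: false,
///     restart_delay_secs: 5,
///     ..KillOpts::ALL
/// };
/// ```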
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct KillOpts {
    pub kill_rate: f32,
    pub kill_meta: bool,
    pub kill_frontend: bool,
    pub kill_compute: bool,
    pub kill_compactor: bool,
    pub restart_delay_secs: u32,
}

impl KillOpts {
    /// Kill all kinds of nodes.
    pub const ALL: Self = KillOpts {
        kill_rate: 1.0,
        kill_meta: false, // FIXME: make it true when multiple meta nodes are supported
        kill_frontend: true,
        kill_compute: true,
        kill_compactor: true,
        restart_delay_secs: 20,
    };
    pub const ALL_FAST: Self = KillOpts {
        kill_rate: 1.0,
        kill_meta: false, // FIXME: make it true when multiple meta nodes are supported
        kill_frontend: true,
        kill_compute: true,
        kill_compactor: true,
        restart_delay_secs: 2,
    };
}