risingwave_simulation/
cluster.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![cfg_attr(not(madsim), allow(unused_imports))]
16
17use std::cmp::max;
18use std::collections::HashMap;
19use std::future::Future;
20use std::io::Write;
21use std::path::PathBuf;
22use std::sync::Arc;
23use std::time::Duration;
24
25use anyhow::{Result, anyhow, bail};
26use cfg_or_panic::cfg_or_panic;
27use clap::Parser;
28use futures::channel::{mpsc, oneshot};
29use futures::future::join_all;
30use futures::{SinkExt, StreamExt};
31use itertools::Itertools;
32#[cfg(madsim)]
33use madsim::runtime::{Handle, NodeHandle};
34use rand::Rng;
35use rand::seq::IteratorRandom;
36use risingwave_common::util::tokio_util::sync::CancellationToken;
37use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
38#[cfg(madsim)]
39use risingwave_object_store::object::sim::SimServer as ObjectStoreSimServer;
40use risingwave_pb::common::WorkerNode;
41use sqllogictest::AsyncDB;
42use tempfile::NamedTempFile;
43#[cfg(not(madsim))]
44use tokio::runtime::Handle;
45use uuid::Uuid;
46
47use crate::client::RisingWave;
48
/// The path to the configuration file for the cluster.
#[derive(Clone, Debug)]
pub enum ConfigPath {
    /// A regular path pointing to an external configuration file.
    Regular(String),
    /// A temporary path pointing to a configuration file created at runtime.
    ///
    /// Wrapped in `Arc` so that cloning a `Configuration` shares the temp file
    /// instead of dropping (and thus deleting) it early.
    Temp(Arc<tempfile::TempPath>),
}
57
58impl ConfigPath {
59    pub fn as_str(&self) -> &str {
60        match self {
61            ConfigPath::Regular(s) => s,
62            ConfigPath::Temp(p) => p.as_os_str().to_str().unwrap(),
63        }
64    }
65}
66
/// RisingWave cluster configuration.
#[derive(Debug, Clone)]
pub struct Configuration {
    /// The path to configuration file.
    ///
    /// Empty string means using the default config.
    pub config_path: ConfigPath,

    /// The number of frontend nodes.
    pub frontend_nodes: usize,

    /// The number of compute nodes.
    pub compute_nodes: usize,

    /// The number of meta nodes.
    pub meta_nodes: usize,

    /// The number of compactor nodes.
    pub compactor_nodes: usize,

    /// The number of CPU cores for each compute node.
    ///
    /// This determines `worker_node_parallelism`.
    pub compute_node_cores: usize,

    /// Queries to run per session.
    ///
    /// Shared via `Arc` so each new session can hold the list without copying it.
    pub per_session_queries: Arc<Vec<String>>,

    /// Resource groups for compute nodes.
    ///
    /// Keyed by the 1-based compute node index; nodes without an entry fall
    /// back to `DEFAULT_RESOURCE_GROUP` (see `Cluster::start`).
    pub compute_resource_groups: HashMap<usize, String>,
}
98
99impl Default for Configuration {
100    fn default() -> Self {
101        let config_path = {
102            let mut file =
103                tempfile::NamedTempFile::new().expect("failed to create temp config file");
104
105            let config_data = r#"
106[server]
107telemetry_enabled = false
108metrics_level = "Disabled"
109"#
110            .to_owned();
111            file.write_all(config_data.as_bytes())
112                .expect("failed to write config file");
113            file.into_temp_path()
114        };
115
116        Configuration {
117            config_path: ConfigPath::Temp(config_path.into()),
118            frontend_nodes: 1,
119            compute_nodes: 1,
120            meta_nodes: 1,
121            compactor_nodes: 1,
122            compute_node_cores: 1,
123            per_session_queries: vec![].into(),
124            compute_resource_groups: Default::default(),
125        }
126    }
127}
128
impl Configuration {
    /// Returns the configuration for scale test.
    ///
    /// 2 frontends, 3 compute nodes with 2 cores each, 2 compactors; config
    /// comes from the embedded `risingwave-scale.toml`.
    pub fn for_scale() -> Self {
        // Embed the config file and create a temporary file at runtime. The file will be deleted
        // automatically when it's dropped.
        let config_path = {
            let mut file =
                tempfile::NamedTempFile::new().expect("failed to create temp config file");
            file.write_all(include_bytes!("risingwave-scale.toml"))
                .expect("failed to write config file");
            file.into_temp_path()
        };

        Configuration {
            config_path: ConfigPath::Temp(config_path.into()),
            frontend_nodes: 2,
            compute_nodes: 3,
            meta_nodes: 1,
            compactor_nodes: 2,
            compute_node_cores: 2,
            // Reset parallelism strategies to their defaults for every session.
            per_session_queries: vec![
                "set streaming_parallelism_strategy_for_table = 'DEFAULT'".into(),
                "set streaming_parallelism_strategy_for_source = 'DEFAULT'".into(),
                "alter system set adaptive_parallelism_strategy to AUTO".into(),
            ]
            .into(),
            ..Default::default()
        }
    }

    /// Provides a configuration for scale test which ensures that the arrangement backfill is disabled,
    /// so table scan will use `no_shuffle`.
    pub fn for_scale_no_shuffle() -> Self {
        let mut conf = Self::for_scale();
        conf.per_session_queries = vec![
            "set streaming_parallelism_strategy_for_table = 'DEFAULT'".into(),
            "set streaming_parallelism_strategy_for_source = 'DEFAULT'".into(),
            "alter system set adaptive_parallelism_strategy to AUTO".into(),
            "SET STREAMING_USE_ARRANGEMENT_BACKFILL = false;".into(),
            "SET STREAMING_USE_SNAPSHOT_BACKFILL = false;".into(),
        ]
        .into();
        conf
    }

    /// Same as [`Self::for_scale`], but every session additionally enables
    /// shared sources.
    pub fn for_scale_shared_source() -> Self {
        let mut conf = Self::for_scale();
        conf.per_session_queries = vec![
            "set streaming_parallelism_strategy_for_table = 'DEFAULT'".into(),
            "set streaming_parallelism_strategy_for_source = 'DEFAULT'".into(),
            "alter system set adaptive_parallelism_strategy to AUTO".into(),
            "SET STREAMING_USE_SHARED_SOURCE = true;".into(),
        ]
        .into();
        conf
    }

    /// Configuration for auto-parallelism tests.
    ///
    /// `max_heartbeat_interval_secs` controls how quickly meta expires dead
    /// workers; `enable_auto_parallelism` toggles meta's automatic parallelism
    /// control. Every session also creates `table_parallelism` /
    /// `mview_parallelism` helper views for inspecting job parallelism.
    pub fn for_auto_parallelism(
        max_heartbeat_interval_secs: u64,
        enable_auto_parallelism: bool,
    ) -> Self {
        // The config key is the negation of the argument.
        let disable_automatic_parallelism_control = !enable_auto_parallelism;

        let config_path = {
            let mut file =
                tempfile::NamedTempFile::new().expect("failed to create temp config file");

            let config_data = format!(
                r#"[meta]
max_heartbeat_interval_secs = {max_heartbeat_interval_secs}
disable_automatic_parallelism_control = {disable_automatic_parallelism_control}
parallelism_control_trigger_first_delay_sec = 0
parallelism_control_batch_size = 10
parallelism_control_trigger_period_sec = 10

[system]
barrier_interval_ms = 250
checkpoint_frequency = 4

[server]
telemetry_enabled = false
metrics_level = "Disabled"
"#
            );
            file.write_all(config_data.as_bytes())
                .expect("failed to write config file");
            file.into_temp_path()
        };

        Configuration {
            config_path: ConfigPath::Temp(config_path.into()),
            frontend_nodes: 1,
            compute_nodes: 3,
            meta_nodes: 1,
            compactor_nodes: 1,
            compute_node_cores: 2,
            per_session_queries: vec![
                "set streaming_parallelism_strategy_for_table = 'DEFAULT'".into(),
                "set streaming_parallelism_strategy_for_source = 'DEFAULT'".into(),
                "alter system set adaptive_parallelism_strategy to AUTO".into(),
                "create view if not exists table_parallelism as select t.name, tf.parallelism from rw_tables t, rw_table_fragments tf where t.id = tf.table_id;".into(),
                "create view if not exists mview_parallelism as select m.name, tf.parallelism from rw_materialized_views m, rw_table_fragments tf where m.id = tf.table_id;".into(),
            ]
                .into(),
            ..Default::default()
        }
    }

    /// Single-node configuration whose meta `default_parallelism` is set to
    /// the given value; the compute node gets twice that many cores.
    pub fn for_default_parallelism(default_parallelism: usize) -> Self {
        let config_path = {
            let mut file =
                tempfile::NamedTempFile::new().expect("failed to create temp config file");

            let config_data = format!(
                r#"
[server]
telemetry_enabled = false
metrics_level = "Disabled"
[meta]
default_parallelism = {default_parallelism}
"#
            );
            file.write_all(config_data.as_bytes())
                .expect("failed to write config file");
            file.into_temp_path()
        };

        Configuration {
            config_path: ConfigPath::Temp(config_path.into()),
            frontend_nodes: 1,
            compute_nodes: 1,
            meta_nodes: 1,
            compactor_nodes: 1,
            compute_node_cores: default_parallelism * 2,
            per_session_queries: vec![].into(),
            compute_resource_groups: Default::default(),
        }
    }

    /// Returns the config for backfill test.
    ///
    /// Single 4-core compute node; config comes from the embedded `backfill.toml`.
    pub fn for_backfill() -> Self {
        // Embed the config file and create a temporary file at runtime. The file will be deleted
        // automatically when it's dropped.
        let config_path = {
            let mut file =
                tempfile::NamedTempFile::new().expect("failed to create temp config file");
            file.write_all(include_bytes!("backfill.toml"))
                .expect("failed to write config file");
            file.into_temp_path()
        };

        Configuration {
            config_path: ConfigPath::Temp(config_path.into()),
            frontend_nodes: 1,
            compute_nodes: 1,
            meta_nodes: 1,
            compactor_nodes: 1,
            compute_node_cores: 4,
            ..Default::default()
        }
    }

    /// Configuration for arrangement-backfill tests: 3 compute nodes and
    /// sessions forced to use arrangement (not snapshot) backfill.
    pub fn for_arrangement_backfill() -> Self {
        // Embed the config file and create a temporary file at runtime. The file will be deleted
        // automatically when it's dropped.
        let config_path = {
            let mut file =
                tempfile::NamedTempFile::new().expect("failed to create temp config file");
            file.write_all(include_bytes!("arrangement_backfill.toml"))
                .expect("failed to write config file");
            file.into_temp_path()
        };

        Configuration {
            config_path: ConfigPath::Temp(config_path.into()),
            frontend_nodes: 1,
            compute_nodes: 3,
            meta_nodes: 1,
            compactor_nodes: 1,
            compute_node_cores: 1,
            per_session_queries: vec![
                "SET STREAMING_USE_ARRANGEMENT_BACKFILL = true;".into(),
                "SET STREAMING_USE_SNAPSHOT_BACKFILL = false;".into(),
            ]
            .into(),
            ..Default::default()
        }
    }

    /// Configuration for background-DDL tests, using the embedded
    /// `background_ddl.toml`.
    pub fn for_background_ddl() -> Self {
        // Embed the config file and create a temporary file at runtime. The file will be deleted
        // automatically when it's dropped.
        let config_path = {
            let mut file =
                tempfile::NamedTempFile::new().expect("failed to create temp config file");
            file.write_all(include_bytes!("background_ddl.toml"))
                .expect("failed to write config file");
            file.into_temp_path()
        };

        Configuration {
            config_path: ConfigPath::Temp(config_path.into()),
            // NOTE(kwannoel): The cancel test depends on `processlist`,
            // which will cancel a stream job within the process.
            // so we cannot have multiple frontend node, since a new session spawned
            // to cancel the job could be routed to a different frontend node,
            // in a different process.
            frontend_nodes: 1,
            compute_nodes: 3,
            meta_nodes: 1,
            compactor_nodes: 2,
            compute_node_cores: 2,
            ..Default::default()
        }
    }

    /// Minimal single-node configuration using the embedded
    /// `disable_arrangement_backfill.toml`.
    ///
    /// NOTE(review): despite this method's name, the embedded file is called
    /// `disable_arrangement_backfill.toml` — confirm the file's contents
    /// actually enable arrangement backfill.
    pub fn enable_arrangement_backfill() -> Self {
        let config_path = {
            let mut file =
                tempfile::NamedTempFile::new().expect("failed to create temp config file");
            file.write_all(include_bytes!("disable_arrangement_backfill.toml"))
                .expect("failed to write config file");
            file.into_temp_path()
        };
        Configuration {
            config_path: ConfigPath::Temp(config_path.into()),
            frontend_nodes: 1,
            compute_nodes: 1,
            meta_nodes: 1,
            compactor_nodes: 1,
            compute_node_cores: 1,
            per_session_queries: vec![].into(),
            ..Default::default()
        }
    }

    /// Returns the total number of cores for streaming compute nodes.
    pub fn total_streaming_cores(&self) -> u32 {
        // usize -> u32 cast: node and core counts in tests are small, so this
        // will not truncate in practice.
        (self.compute_nodes * self.compute_node_cores) as u32
    }
}
370
/// A risingwave cluster.
///
/// # Nodes
///
/// | Name             | IP            |
/// | ---------------- | ------------- |
/// | meta-x           | 192.168.1.x   |
/// | frontend-x       | 192.168.2.x   |
/// | compute-x        | 192.168.3.x   |
/// | compactor-x      | 192.168.4.x   |
/// | kafka-broker     | 192.168.11.1  |
/// | kafka-producer   | 192.168.11.2  |
/// | object_store_sim | 192.168.12.1  |
/// | client           | 192.168.100.1 |
/// | ctl              | 192.168.101.1 |
pub struct Cluster {
    /// The configuration this cluster was started with.
    config: Configuration,
    /// Handle to the runtime (madsim handle under simulation).
    handle: Handle,
    /// Simulation node used to issue SQL queries against the frontends.
    #[cfg(madsim)]
    pub(crate) client: NodeHandle,
    /// Simulation node used for risectl-style operations.
    #[cfg(madsim)]
    pub(crate) ctl: NodeHandle,
    /// Keeps the SQLite meta-store backing file alive for the cluster's lifetime.
    #[cfg(madsim)]
    pub(crate) sqlite_file_handle: NamedTempFile,
}
396
397impl Cluster {
    /// Start a RisingWave cluster for testing.
    ///
    /// This function should be called exactly once in a test.
    ///
    /// Bring-up order: DNS + load-balancer entries, simulated Kafka broker and
    /// object store, meta node(s), then frontend / compute / compactor nodes,
    /// and finally the `client` and `ctl` helper nodes.
    #[cfg_or_panic(madsim)]
    pub async fn start(conf: Configuration) -> Result<Self> {
        use madsim::net::ipvs::*;

        let handle = madsim::runtime::Handle::current();
        // Print the seed so a failing run can be reproduced deterministically.
        println!("seed = {}", handle.seed());
        println!("{:#?}", conf);

        // TODO: support multi meta nodes
        assert_eq!(conf.meta_nodes, 1);

        // setup DNS and load balance
        let net = madsim::net::NetSim::current();
        for i in 1..=conf.meta_nodes {
            net.add_dns_record(
                &format!("meta-{i}"),
                format!("192.168.1.{i}").parse().unwrap(),
            );
        }

        // `frontend` resolves to a virtual IP that round-robins over all frontends.
        net.add_dns_record("frontend", "192.168.2.0".parse().unwrap());
        net.add_dns_record("message_queue", "192.168.11.1".parse().unwrap());
        net.global_ipvs().add_service(
            ServiceAddr::Tcp("192.168.2.0:4566".into()),
            Scheduler::RoundRobin,
        );
        for i in 1..=conf.frontend_nodes {
            net.global_ipvs().add_server(
                ServiceAddr::Tcp("192.168.2.0:4566".into()),
                &format!("192.168.2.{i}:4566"),
            )
        }

        // kafka broker
        handle
            .create_node()
            .name("kafka-broker")
            .ip("192.168.11.1".parse().unwrap())
            .init(move || async move {
                rdkafka::SimBroker::default()
                    .serve("0.0.0.0:29092".parse().unwrap())
                    .await
            })
            .build();

        // object_store_sim
        handle
            .create_node()
            .name("object_store_sim")
            .ip("192.168.12.1".parse().unwrap())
            .init(move || async move {
                ObjectStoreSimServer::builder()
                    .serve("0.0.0.0:9301".parse().unwrap())
                    .await
            })
            .build();

        // wait for the service to be ready
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;

        let mut meta_addrs = vec![];
        for i in 1..=conf.meta_nodes {
            meta_addrs.push(format!("http://meta-{i}:5690"));
        }
        // SAFETY: runs during single-threaded cluster setup before any service
        // node is started — NOTE(review): confirm no concurrent env access here.
        unsafe { std::env::set_var("RW_META_ADDR", meta_addrs.join(",")) };

        // SQLite file backing the SQL meta store; kept alive in the returned struct.
        let sqlite_file_handle: NamedTempFile = NamedTempFile::new().unwrap();
        let file_path = sqlite_file_handle.path().display().to_string();
        tracing::info!(?file_path, "sqlite_file_path");
        let sql_endpoint = format!("sqlite://{}?mode=rwc", file_path);
        let backend_args = vec!["--backend", "sql", "--sql-endpoint", &sql_endpoint];

        // meta node
        for i in 1..=conf.meta_nodes {
            let args = [
                "meta-node",
                "--config-path",
                conf.config_path.as_str(),
                "--listen-addr",
                "0.0.0.0:5690",
                "--advertise-addr",
                &format!("meta-{i}:5690"),
                "--state-store",
                "hummock+sim://hummockadmin:hummockadmin@192.168.12.1:9301/hummock001",
                "--data-directory",
                "hummock_001",
                "--temp-secret-file-dir",
                &format!("./secrets/meta-{i}"),
            ];
            let args = args.into_iter().chain(backend_args.clone().into_iter());
            let opts = risingwave_meta_node::MetaNodeOpts::parse_from(args);
            handle
                .create_node()
                .name(format!("meta-{i}"))
                .ip([192, 168, 1, i as u8].into())
                .init(move || {
                    risingwave_meta_node::start(
                        opts.clone(),
                        CancellationToken::new(), // dummy
                    )
                })
                .build();
        }

        // wait for the service to be ready
        tokio::time::sleep(std::time::Duration::from_secs(15)).await;

        // frontend node
        for i in 1..=conf.frontend_nodes {
            let opts = risingwave_frontend::FrontendOpts::parse_from([
                "frontend-node",
                "--config-path",
                conf.config_path.as_str(),
                "--listen-addr",
                "0.0.0.0:4566",
                "--health-check-listener-addr",
                "0.0.0.0:6786",
                "--advertise-addr",
                &format!("192.168.2.{i}:4566"),
                "--temp-secret-file-dir",
                &format!("./secrets/frontend-{i}"),
            ]);
            handle
                .create_node()
                .name(format!("frontend-{i}"))
                .ip([192, 168, 2, i as u8].into())
                .init(move || {
                    risingwave_frontend::start(
                        opts.clone(),
                        CancellationToken::new(), // dummy
                    )
                })
                .build();
        }

        // compute node
        for i in 1..=conf.compute_nodes {
            let opts = risingwave_compute::ComputeNodeOpts::parse_from([
                "compute-node",
                "--config-path",
                conf.config_path.as_str(),
                "--listen-addr",
                "0.0.0.0:5688",
                "--advertise-addr",
                &format!("192.168.3.{i}:5688"),
                "--total-memory-bytes",
                "6979321856",
                "--parallelism",
                &conf.compute_node_cores.to_string(),
                "--temp-secret-file-dir",
                &format!("./secrets/compute-{i}"),
                "--resource-group",
                // Per-node resource group; falls back to the default group.
                &conf
                    .compute_resource_groups
                    .get(&i)
                    .cloned()
                    .unwrap_or(DEFAULT_RESOURCE_GROUP.to_string()),
            ]);
            handle
                .create_node()
                .name(format!("compute-{i}"))
                .ip([192, 168, 3, i as u8].into())
                .cores(conf.compute_node_cores)
                .init(move || {
                    risingwave_compute::start(
                        opts.clone(),
                        CancellationToken::new(), // dummy
                    )
                })
                .build();
        }

        // compactor node
        for i in 1..=conf.compactor_nodes {
            let opts = risingwave_compactor::CompactorOpts::parse_from([
                "compactor-node",
                "--config-path",
                conf.config_path.as_str(),
                "--listen-addr",
                "0.0.0.0:6660",
                "--advertise-addr",
                &format!("192.168.4.{i}:6660"),
            ]);
            handle
                .create_node()
                .name(format!("compactor-{i}"))
                .ip([192, 168, 4, i as u8].into())
                .init(move || {
                    risingwave_compactor::start(
                        opts.clone(),
                        CancellationToken::new(), // dummy
                    )
                })
                .build();
        }

        // wait for the service to be ready
        tokio::time::sleep(Duration::from_secs(15)).await;

        // client
        let client = handle
            .create_node()
            .name("client")
            .ip([192, 168, 100, 1].into())
            .build();

        // risectl
        let ctl = handle
            .create_node()
            .name("ctl")
            .ip([192, 168, 101, 1].into())
            .build();

        Ok(Self {
            config: conf,
            handle,
            client,
            ctl,
            sqlite_file_handle,
        })
    }
622
623    #[cfg_or_panic(madsim)]
624    fn per_session_queries(&self) -> Arc<Vec<String>> {
625        self.config.per_session_queries.clone()
626    }
627
    /// Start a SQL session on the client node.
    ///
    /// Spawns a background task on the client node that connects to the
    /// `frontend` virtual address, runs the configured per-session setup
    /// queries, then serves requests arriving over the returned channel.
    #[cfg_or_panic(madsim)]
    pub fn start_session(&mut self) -> Session {
        // Zero-capacity channel: each request is handed over synchronously.
        let (query_tx, mut query_rx) = mpsc::channel::<SessionRequest>(0);
        let per_session_queries = self.per_session_queries();

        self.client.spawn(async move {
            let mut client = RisingWave::connect("frontend".into(), "dev".into()).await?;

            // Apply per-session setup queries before serving any request.
            for sql in per_session_queries.as_ref() {
                client.run(sql).await?;
            }
            drop(per_session_queries);

            // Serve until the `Session` (and thus its sender) is dropped.
            while let Some((sql, tx)) = query_rx.next().await {
                let result = client
                    .run(&sql)
                    .await
                    .map(|output| match output {
                        // Flatten rows into a newline-separated, space-joined string.
                        sqllogictest::DBOutput::Rows { rows, .. } => rows
                            .into_iter()
                            .map(|row| {
                                row.into_iter()
                                    .map(|v| v.to_string())
                                    .collect::<Vec<_>>()
                                    .join(" ")
                            })
                            .collect::<Vec<_>>()
                            .join("\n"),
                        _ => "".to_string(),
                    })
                    .map_err(Into::into);

                // The requester may have given up; ignore send failures.
                let _ = tx.send(result);
            }

            Ok::<_, anyhow::Error>(())
        });

        Session { query_tx }
    }
669
670    /// Run a SQL query on a **new** session of the client node.
671    ///
672    /// This is a convenience method that creates a new session and runs the query on it. If you
673    /// want to run multiple queries on the same session, use `start_session` and `Session::run`.
674    pub async fn run(&mut self, sql: impl Into<String>) -> Result<String> {
675        self.start_session().run(sql).await
676    }
677
678    /// Run a future on the client node.
679    #[cfg_or_panic(madsim)]
680    pub async fn run_on_client<F>(&self, future: F) -> F::Output
681    where
682        F: Future + Send + 'static,
683        F::Output: Send + 'static,
684    {
685        self.client.spawn(future).await.unwrap()
686    }
687
688    pub async fn get_random_worker_nodes(&self, n: usize) -> Result<Vec<WorkerNode>> {
689        let worker_nodes = self.get_cluster_info().await?.get_worker_nodes().clone();
690        if worker_nodes.len() < n {
691            return Err(anyhow!("cannot remove more nodes than present"));
692        }
693        let rand_nodes = worker_nodes
694            .iter()
695            .choose_multiple(&mut rand::rng(), n)
696            .clone();
697        Ok(rand_nodes.iter().cloned().cloned().collect_vec())
698    }
699
700    /// Run a SQL query from the client and wait until the condition is met.
701    pub async fn wait_until(
702        &mut self,
703        sql: impl Into<String> + Clone,
704        mut p: impl FnMut(&str) -> bool,
705        interval: Duration,
706        timeout: Duration,
707    ) -> Result<String> {
708        let fut = async move {
709            let mut interval = tokio::time::interval(interval);
710            loop {
711                interval.tick().await;
712                let result = self.run(sql.clone()).await?;
713                if p(&result) {
714                    return Ok::<_, anyhow::Error>(result);
715                }
716            }
717        };
718
719        match tokio::time::timeout(timeout, fut).await {
720            Ok(r) => Ok(r?),
721            Err(_) => bail!("wait_until timeout"),
722        }
723    }
724
725    /// Run a SQL query from the client and wait until the return result is not empty.
726    pub async fn wait_until_non_empty(
727        &mut self,
728        sql: &str,
729        interval: Duration,
730        timeout: Duration,
731    ) -> Result<String> {
732        self.wait_until(sql, |r| !r.trim().is_empty(), interval, timeout)
733            .await
734    }
735
736    /// Generate a list of random worker nodes to kill by `opts`, then call `kill_nodes` to kill and
737    /// restart them.
738    pub async fn kill_node(&self, opts: &KillOpts) {
739        let mut nodes = vec![];
740        if opts.kill_meta {
741            let rand = rand::rng().random_range(0..3);
742            for i in 1..=self.config.meta_nodes {
743                match rand {
744                    0 => break,                                     // no killed
745                    1 => {}                                         // all killed
746                    _ if !rand::rng().random_bool(0.5) => continue, // random killed
747                    _ => {}
748                }
749                nodes.push(format!("meta-{}", i));
750            }
751            // don't kill all meta services
752            if nodes.len() == self.config.meta_nodes {
753                nodes.truncate(1);
754            }
755        }
756        if opts.kill_frontend {
757            let rand = rand::rng().random_range(0..3);
758            for i in 1..=self.config.frontend_nodes {
759                match rand {
760                    0 => break,                                     // no killed
761                    1 => {}                                         // all killed
762                    _ if !rand::rng().random_bool(0.5) => continue, // random killed
763                    _ => {}
764                }
765                nodes.push(format!("frontend-{}", i));
766            }
767        }
768        if opts.kill_compute {
769            let rand = rand::rng().random_range(0..3);
770            for i in 1..=self.config.compute_nodes {
771                match rand {
772                    0 => break,                                     // no killed
773                    1 => {}                                         // all killed
774                    _ if !rand::rng().random_bool(0.5) => continue, // random killed
775                    _ => {}
776                }
777                nodes.push(format!("compute-{}", i));
778            }
779        }
780        if opts.kill_compactor {
781            let rand = rand::rng().random_range(0..3);
782            for i in 1..=self.config.compactor_nodes {
783                match rand {
784                    0 => break,                                     // no killed
785                    1 => {}                                         // all killed
786                    _ if !rand::rng().random_bool(0.5) => continue, // random killed
787                    _ => {}
788                }
789                nodes.push(format!("compactor-{}", i));
790            }
791        }
792
793        self.kill_nodes(nodes, opts.restart_delay_secs).await
794    }
795
    /// Kill the given nodes by their names. Each node is killed after a random
    /// delay of up to 1s and restarted after another random delay of up to 1s;
    /// with probability 0.1 the restart is additionally delayed by
    /// `restart_delay_secs` so the node may be expired and removed from the
    /// cluster by the meta service.
    #[cfg_or_panic(madsim)]
    pub async fn kill_nodes(
        &self,
        nodes: impl IntoIterator<Item = impl AsRef<str>>,
        restart_delay_secs: u32,
    ) {
        // All kill/restart cycles run concurrently, one task per node.
        join_all(nodes.into_iter().map(|name| async move {
            let name = name.as_ref();
            // Stagger the kills by a random delay of up to 1s.
            let t = rand::rng().random_range(Duration::from_secs(0)..Duration::from_secs(1));
            tokio::time::sleep(t).await;
            tracing::info!("kill {name}");
            Handle::current().kill(name);

            let mut t = rand::rng().random_range(Duration::from_secs(0)..Duration::from_secs(1));
            // has a small chance to restart after a long time
            // so that the node is expired and removed from the cluster
            if rand::rng().random_bool(0.1) {
                // max_heartbeat_interval_secs = 15
                t += Duration::from_secs(restart_delay_secs as u64);
            }
            tokio::time::sleep(t).await;
            tracing::info!("restart {name}");
            Handle::current().restart(name);
        }))
        .await;
    }
824
825    #[cfg_or_panic(madsim)]
826    pub async fn kill_nodes_and_restart(
827        &self,
828        nodes: impl IntoIterator<Item = impl AsRef<str>>,
829        restart_delay_secs: u32,
830    ) {
831        join_all(nodes.into_iter().map(|name| async move {
832            let name = name.as_ref();
833            tracing::info!("kill {name}");
834            Handle::current().kill(name);
835            tokio::time::sleep(Duration::from_secs(restart_delay_secs as u64)).await;
836            tracing::info!("restart {name}");
837            Handle::current().restart(name);
838        }))
839        .await;
840    }
841
842    #[cfg_or_panic(madsim)]
843    pub async fn simple_kill_nodes(&self, nodes: impl IntoIterator<Item = impl AsRef<str>>) {
844        join_all(nodes.into_iter().map(|name| async move {
845            let name = name.as_ref();
846            tracing::info!("kill {name}");
847            Handle::current().kill(name);
848        }))
849        .await;
850    }
851
852    #[cfg_or_panic(madsim)]
853    pub async fn simple_restart_nodes(&self, nodes: impl IntoIterator<Item = impl AsRef<str>>) {
854        join_all(nodes.into_iter().map(|name| async move {
855            let name = name.as_ref();
856            tracing::info!("restart {name}");
857            Handle::current().restart(name);
858        }))
859        .await;
860    }
861
862    /// Create a node for kafka producer and prepare data.
863    #[cfg_or_panic(madsim)]
864    pub async fn create_kafka_producer(&self, datadir: &str) {
865        self.handle
866            .create_node()
867            .name("kafka-producer")
868            .ip("192.168.11.2".parse().unwrap())
869            .build()
870            .spawn(crate::kafka::producer(
871                "192.168.11.1:29092",
872                datadir.to_string(),
873            ))
874            .await
875            .unwrap();
876    }
877
878    /// Create a kafka topic.
879    #[cfg_or_panic(madsim)]
880    pub fn create_kafka_topics(&self, topics: HashMap<String, i32>) {
881        self.handle
882            .create_node()
883            .name("kafka-topic-create")
884            .ip("192.168.11.3".parse().unwrap())
885            .build()
886            .spawn(crate::kafka::create_topics("192.168.11.1:29092", topics));
887    }
888
    /// Returns a clone of the configuration this cluster was built with.
    pub fn config(&self) -> Configuration {
        self.config.clone()
    }
892
    /// Returns a reference to the underlying simulation runtime handle.
    pub fn handle(&self) -> &Handle {
        &self.handle
    }
896
897    /// Graceful shutdown all RisingWave nodes.
898    #[cfg_or_panic(madsim)]
899    pub async fn graceful_shutdown(&self) {
900        let mut nodes = vec![];
901        let mut metas = vec![];
902        for i in 1..=self.config.meta_nodes {
903            metas.push(format!("meta-{i}"));
904        }
905        for i in 1..=self.config.frontend_nodes {
906            nodes.push(format!("frontend-{i}"));
907        }
908        for i in 1..=self.config.compute_nodes {
909            nodes.push(format!("compute-{i}"));
910        }
911        for i in 1..=self.config.compactor_nodes {
912            nodes.push(format!("compactor-{i}"));
913        }
914
915        tracing::info!("graceful shutdown");
916        let waiting_time = Duration::from_secs(10);
917        // shutdown frontends, computes, compactors
918        for node in &nodes {
919            self.handle.send_ctrl_c(node);
920        }
921        tokio::time::sleep(waiting_time).await;
922        // shutdown metas
923        for meta in &metas {
924            self.handle.send_ctrl_c(meta);
925        }
926        tokio::time::sleep(waiting_time).await;
927
928        // check all nodes are exited
929        for node in nodes.iter().chain(metas.iter()) {
930            if !self.handle.is_exit(node) {
931                panic!("failed to graceful shutdown {node} in {waiting_time:?}");
932            }
933        }
934    }
935
936    pub async fn wait_for_recovery(&mut self) -> Result<()> {
937        let timeout = Duration::from_secs(200);
938        let mut session = self.start_session();
939        tokio::time::timeout(timeout, async {
940            loop {
941                if let Ok(result) = session.run("select rw_recovery_status()").await
942                    && result == "RUNNING"
943                {
944                    break;
945                }
946                tokio::time::sleep(Duration::from_nanos(10)).await;
947            }
948        })
949        .await?;
950        Ok(())
951    }
952
953    /// This function only works if all actors in your cluster are following adaptive scaling.
954    pub async fn wait_for_scale(&mut self, parallelism: usize) -> Result<()> {
955        let timeout = Duration::from_secs(200);
956        let mut session = self.start_session();
957        tokio::time::timeout(timeout, async {
958            loop {
959                let parallelism_sql = format!(
960                    "select count(parallelism) filter (where parallelism != {parallelism})\
961                from (select count(*) parallelism from rw_actors group by fragment_id);"
962                );
963                if let Ok(result) = session.run(&parallelism_sql).await
964                    && result == "0"
965                {
966                    break;
967                }
968                tokio::time::sleep(Duration::from_nanos(10)).await;
969            }
970        })
971        .await?;
972        Ok(())
973    }
974}
975
/// A request sent from a [`Session`] to the client-node task: the SQL text to
/// run, paired with a one-shot channel on which the result is delivered back.
type SessionRequest = (
    String,                          // query sql
    oneshot::Sender<Result<String>>, // channel to send result back
);
980
/// A SQL session on the simulated client node.
#[derive(Debug, Clone)]
pub struct Session {
    // Sends (sql, result-channel) pairs to the task that owns the connection;
    // cloning the sender is what makes `Session` cheaply cloneable.
    query_tx: mpsc::Sender<SessionRequest>,
}
986
987impl Session {
988    /// Run the given SQLs on the session.
989    pub async fn run_all(&mut self, sqls: Vec<impl Into<String>>) -> Result<Vec<String>> {
990        let mut results = Vec::with_capacity(sqls.len());
991        for sql in sqls {
992            let result = self.run(sql).await?;
993            results.push(result);
994        }
995        Ok(results)
996    }
997
998    /// Run the given SQL query on the session.
999    pub async fn run(&mut self, sql: impl Into<String>) -> Result<String> {
1000        let (tx, rx) = oneshot::channel();
1001        self.query_tx.send((sql.into(), tx)).await?;
1002        rx.await?
1003    }
1004
1005    /// Run `FLUSH` on the session.
1006    pub async fn flush(&mut self) -> Result<()> {
1007        self.run("FLUSH").await?;
1008        Ok(())
1009    }
1010
1011    pub async fn is_arrangement_backfill_enabled(&mut self) -> Result<bool> {
1012        let result = self.run("show streaming_use_arrangement_backfill").await?;
1013        Ok(result == "true")
1014    }
1015}
1016
/// Options for killing nodes.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct KillOpts {
    // NOTE(review): presumably the probability/rate of selecting a node to kill
    // (1.0 in both presets below) — confirm against the selection logic.
    pub kill_rate: f32,
    /// Whether meta nodes may be killed.
    pub kill_meta: bool,
    /// Whether frontend nodes may be killed.
    pub kill_frontend: bool,
    /// Whether compute nodes may be killed.
    pub kill_compute: bool,
    /// Whether compactor nodes may be killed.
    pub kill_compactor: bool,
    /// Extra delay (seconds) that may be added before restarting a killed node,
    /// allowing it to expire from the cluster (see `Cluster::kill_nodes`).
    pub restart_delay_secs: u32,
}
1027
impl KillOpts {
    /// Killing all kind of nodes.
    pub const ALL: Self = KillOpts {
        kill_rate: 1.0,
        kill_meta: false, // FIXME: make it true when multiple meta nodes are supported
        kill_frontend: true,
        kill_compute: true,
        kill_compactor: true,
        restart_delay_secs: 20,
    };
    /// Same as [`KillOpts::ALL`], but with a much shorter restart delay (2s instead of 20s).
    pub const ALL_FAST: Self = KillOpts {
        kill_rate: 1.0,
        kill_meta: false, // FIXME: make it true when multiple meta nodes are supported
        kill_frontend: true,
        kill_compute: true,
        kill_compactor: true,
        restart_delay_secs: 2,
    };
}