risingwave_simulation/
cluster.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![cfg_attr(not(madsim), allow(unused_imports))]
16
17use std::cmp::max;
18use std::collections::HashMap;
19use std::future::Future;
20use std::io::Write;
21use std::path::PathBuf;
22use std::sync::Arc;
23use std::time::Duration;
24
25use anyhow::{Result, anyhow, bail};
26use cfg_or_panic::cfg_or_panic;
27use clap::Parser;
28use futures::channel::{mpsc, oneshot};
29use futures::future::join_all;
30use futures::{SinkExt, StreamExt};
31use itertools::Itertools;
32#[cfg(madsim)]
33use madsim::runtime::{Handle, NodeHandle};
34use rand::Rng;
35use rand::seq::IteratorRandom;
36use risingwave_common::util::tokio_util::sync::CancellationToken;
37use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
38#[cfg(madsim)]
39use risingwave_object_store::object::sim::SimServer as ObjectStoreSimServer;
40use risingwave_pb::common::WorkerNode;
41use sqllogictest::AsyncDB;
42use tempfile::NamedTempFile;
43#[cfg(not(madsim))]
44use tokio::runtime::Handle;
45use uuid::Uuid;
46
47use crate::client::RisingWave;
48
/// The path to the configuration file for the cluster.
#[derive(Clone, Debug)]
pub enum ConfigPath {
    /// A regular path pointing to a external configuration file.
    Regular(String),
    /// A temporary path pointing to a configuration file created at runtime.
    ///
    /// Wrapped in `Arc` so `Configuration` stays cheaply `Clone`; the
    /// underlying file is deleted when the last reference is dropped.
    Temp(Arc<tempfile::TempPath>),
}
57
58impl ConfigPath {
59    pub fn as_str(&self) -> &str {
60        match self {
61            ConfigPath::Regular(s) => s,
62            ConfigPath::Temp(p) => p.as_os_str().to_str().unwrap(),
63        }
64    }
65}
66
/// RisingWave cluster configuration.
#[derive(Debug, Clone)]
pub struct Configuration {
    /// The path to configuration file.
    ///
    /// Empty string means using the default config.
    pub config_path: ConfigPath,

    /// The number of frontend nodes.
    pub frontend_nodes: usize,

    /// The number of compute nodes.
    pub compute_nodes: usize,

    /// The number of meta nodes.
    pub meta_nodes: usize,

    /// The number of compactor nodes.
    pub compactor_nodes: usize,

    /// The number of CPU cores for each compute node.
    ///
    /// This determines `worker_node_parallelism`.
    pub compute_node_cores: usize,

    /// Queries to run per session.
    ///
    /// Executed once at the start of every new SQL session, before any user
    /// queries are served.
    pub per_session_queries: Arc<Vec<String>>,

    /// Resource groups for compute nodes, keyed by 1-indexed node number.
    /// Nodes without an entry use the default resource group.
    pub compute_resource_groups: HashMap<usize, String>,

    /// Roles for compute nodes (1-indexed). If not set, defaults to "both".
    /// Values: "serving", "streaming", "both".
    pub compute_node_roles: HashMap<usize, String>,
}
102
103impl Default for Configuration {
104    fn default() -> Self {
105        let config_path = {
106            let mut file =
107                tempfile::NamedTempFile::new().expect("failed to create temp config file");
108
109            let config_data = r#"
110[server]
111telemetry_enabled = false
112metrics_level = "Disabled"
113"#
114            .to_owned();
115            file.write_all(config_data.as_bytes())
116                .expect("failed to write config file");
117            file.into_temp_path()
118        };
119
120        Configuration {
121            config_path: ConfigPath::Temp(config_path.into()),
122            frontend_nodes: 1,
123            compute_nodes: 1,
124            meta_nodes: 1,
125            compactor_nodes: 1,
126            compute_node_cores: 1,
127            per_session_queries: vec![].into(),
128            compute_resource_groups: Default::default(),
129            compute_node_roles: Default::default(),
130        }
131    }
132}
133
134impl Configuration {
135    /// Returns the configuration for scale test.
136    pub fn for_scale() -> Self {
137        // Embed the config file and create a temporary file at runtime. The file will be deleted
138        // automatically when it's dropped.
139        let config_path = {
140            let mut file =
141                tempfile::NamedTempFile::new().expect("failed to create temp config file");
142            file.write_all(include_bytes!("risingwave-scale.toml"))
143                .expect("failed to write config file");
144            file.into_temp_path()
145        };
146
147        Configuration {
148            config_path: ConfigPath::Temp(config_path.into()),
149            frontend_nodes: 2,
150            compute_nodes: 3,
151            meta_nodes: 1,
152            compactor_nodes: 2,
153            compute_node_cores: 2,
154            per_session_queries: vec![].into(),
155            ..Default::default()
156        }
157    }
158
159    /// Provides a configuration for scale test which ensures that the arrangement backfill is disabled,
160    /// so table scan will use `no_shuffle`.
161    pub fn for_scale_no_shuffle() -> Self {
162        let mut conf = Self::for_scale();
163        conf.per_session_queries = vec![
164            "SET STREAMING_USE_ARRANGEMENT_BACKFILL = false;".into(),
165            "SET STREAMING_USE_SNAPSHOT_BACKFILL = false;".into(),
166        ]
167        .into();
168        conf
169    }
170
171    pub fn for_scale_shared_source() -> Self {
172        let mut conf = Self::for_scale();
173        conf.per_session_queries = vec!["SET STREAMING_USE_SHARED_SOURCE = true;".into()].into();
174        conf
175    }
176
177    pub fn for_auto_parallelism(
178        max_heartbeat_interval_secs: u64,
179        enable_auto_parallelism: bool,
180    ) -> Self {
181        let disable_automatic_parallelism_control = !enable_auto_parallelism;
182
183        let config_path = {
184            let mut file =
185                tempfile::NamedTempFile::new().expect("failed to create temp config file");
186
187            let config_data = format!(
188                r#"[meta]
189max_heartbeat_interval_secs = {max_heartbeat_interval_secs}
190disable_automatic_parallelism_control = {disable_automatic_parallelism_control}
191parallelism_control_trigger_first_delay_sec = 0
192parallelism_control_batch_size = 10
193parallelism_control_trigger_period_sec = 10
194
195[system]
196barrier_interval_ms = 250
197checkpoint_frequency = 4
198
199[server]
200telemetry_enabled = false
201metrics_level = "Disabled"
202"#
203            );
204            file.write_all(config_data.as_bytes())
205                .expect("failed to write config file");
206            file.into_temp_path()
207        };
208
209        Configuration {
210            config_path: ConfigPath::Temp(config_path.into()),
211            frontend_nodes: 1,
212            compute_nodes: 3,
213            meta_nodes: 1,
214            compactor_nodes: 1,
215            compute_node_cores: 2,
216            per_session_queries: vec![
217                "create view if not exists table_parallelism as select t.name, tf.parallelism from rw_tables t, rw_table_fragments tf where t.id = tf.table_id;".into(),
218                "create view if not exists mview_parallelism as select m.name, tf.parallelism from rw_materialized_views m, rw_table_fragments tf where m.id = tf.table_id;".into(),
219            ]
220                .into(),
221            ..Default::default()
222        }
223    }
224
225    pub fn for_default_parallelism(default_parallelism: usize) -> Self {
226        let config_path = {
227            let mut file =
228                tempfile::NamedTempFile::new().expect("failed to create temp config file");
229
230            let config_data = format!(
231                r#"
232[server]
233telemetry_enabled = false
234metrics_level = "Disabled"
235[meta]
236default_parallelism = {default_parallelism}
237"#
238            );
239            file.write_all(config_data.as_bytes())
240                .expect("failed to write config file");
241            file.into_temp_path()
242        };
243
244        Configuration {
245            config_path: ConfigPath::Temp(config_path.into()),
246            frontend_nodes: 1,
247            compute_nodes: 1,
248            meta_nodes: 1,
249            compactor_nodes: 1,
250            compute_node_cores: default_parallelism * 2,
251            per_session_queries: vec![].into(),
252            compute_resource_groups: Default::default(),
253            compute_node_roles: Default::default(),
254        }
255    }
256
257    /// Returns the config for backfill test.
258    pub fn for_backfill() -> Self {
259        // Embed the config file and create a temporary file at runtime. The file will be deleted
260        // automatically when it's dropped.
261        let config_path = {
262            let mut file =
263                tempfile::NamedTempFile::new().expect("failed to create temp config file");
264            file.write_all(include_bytes!("backfill.toml"))
265                .expect("failed to write config file");
266            file.into_temp_path()
267        };
268
269        Configuration {
270            config_path: ConfigPath::Temp(config_path.into()),
271            frontend_nodes: 1,
272            compute_nodes: 1,
273            meta_nodes: 1,
274            compactor_nodes: 1,
275            compute_node_cores: 4,
276            ..Default::default()
277        }
278    }
279
280    pub fn for_arrangement_backfill() -> Self {
281        // Embed the config file and create a temporary file at runtime. The file will be deleted
282        // automatically when it's dropped.
283        let config_path = {
284            let mut file =
285                tempfile::NamedTempFile::new().expect("failed to create temp config file");
286            file.write_all(include_bytes!("arrangement_backfill.toml"))
287                .expect("failed to write config file");
288            file.into_temp_path()
289        };
290
291        Configuration {
292            config_path: ConfigPath::Temp(config_path.into()),
293            frontend_nodes: 1,
294            compute_nodes: 3,
295            meta_nodes: 1,
296            compactor_nodes: 1,
297            compute_node_cores: 1,
298            per_session_queries: vec![
299                "SET STREAMING_USE_ARRANGEMENT_BACKFILL = true;".into(),
300                "SET STREAMING_USE_SNAPSHOT_BACKFILL = false;".into(),
301            ]
302            .into(),
303            ..Default::default()
304        }
305    }
306
307    pub fn for_background_ddl() -> Self {
308        // Embed the config file and create a temporary file at runtime. The file will be deleted
309        // automatically when it's dropped.
310        let config_path = {
311            let mut file =
312                tempfile::NamedTempFile::new().expect("failed to create temp config file");
313            file.write_all(include_bytes!("background_ddl.toml"))
314                .expect("failed to write config file");
315            file.into_temp_path()
316        };
317
318        Configuration {
319            config_path: ConfigPath::Temp(config_path.into()),
320            // NOTE(kwannoel): The cancel test depends on `processlist`,
321            // which will cancel a stream job within the process.
322            // so we cannot have multiple frontend node, since a new session spawned
323            // to cancel the job could be routed to a different frontend node,
324            // in a different process.
325            frontend_nodes: 1,
326            compute_nodes: 3,
327            meta_nodes: 1,
328            compactor_nodes: 2,
329            compute_node_cores: 2,
330            ..Default::default()
331        }
332    }
333
334    pub fn enable_arrangement_backfill() -> Self {
335        let config_path = {
336            let mut file =
337                tempfile::NamedTempFile::new().expect("failed to create temp config file");
338            file.write_all(include_bytes!("disable_arrangement_backfill.toml"))
339                .expect("failed to write config file");
340            file.into_temp_path()
341        };
342        Configuration {
343            config_path: ConfigPath::Temp(config_path.into()),
344            frontend_nodes: 1,
345            compute_nodes: 1,
346            meta_nodes: 1,
347            compactor_nodes: 1,
348            compute_node_cores: 1,
349            per_session_queries: vec![].into(),
350            ..Default::default()
351        }
352    }
353
354    /// Returns the total number of cores for streaming compute nodes.
355    pub fn total_streaming_cores(&self) -> u32 {
356        (self.compute_nodes * self.compute_node_cores) as u32
357    }
358}
359
/// A risingwave cluster.
///
/// # Nodes
///
/// | Name             | IP            |
/// | ---------------- | ------------- |
/// | meta-x           | 192.168.1.x   |
/// | frontend-x       | 192.168.2.x   |
/// | compute-x        | 192.168.3.x   |
/// | compactor-x      | 192.168.4.x   |
/// | kafka-broker     | 192.168.11.1  |
/// | kafka-producer   | 192.168.11.2  |
/// | object_store_sim | 192.168.12.1  |
/// | client           | 192.168.100.1 |
/// | ctl              | 192.168.101.1 |
pub struct Cluster {
    /// The configuration the cluster was started with.
    config: Configuration,
    /// Handle to the simulation runtime.
    handle: Handle,
    /// Node used to run client SQL sessions and arbitrary futures.
    #[cfg(madsim)]
    pub(crate) client: NodeHandle,
    /// Node used to run control (risectl-style) operations.
    #[cfg(madsim)]
    pub(crate) ctl: NodeHandle,
    /// Keeps the SQLite meta-store temp file alive for the cluster's lifetime.
    #[cfg(madsim)]
    pub(crate) sqlite_file_handle: NamedTempFile,
}
385
386impl Cluster {
    /// Start a RisingWave cluster for testing.
    ///
    /// This function should be called exactly once in a test.
    ///
    /// Sets up DNS records and IPVS round-robin load balancing for the
    /// frontend, then boots in order: a simulated Kafka broker, a simulated
    /// object store, meta nodes, frontend nodes, compute nodes, compactor
    /// nodes, and finally the `client` and `ctl` helper nodes.
    #[cfg_or_panic(madsim)]
    pub async fn start(conf: Configuration) -> Result<Self> {
        use madsim::net::ipvs::*;

        let handle = madsim::runtime::Handle::current();
        // Print the seed so a failing run can be reproduced deterministically.
        println!("seed = {}", handle.seed());
        println!("{:#?}", conf);

        // TODO: support multiple meta nodes
        assert_eq!(conf.meta_nodes, 1);

        // setup DNS and load balance
        let net = madsim::net::NetSim::current();
        for i in 1..=conf.meta_nodes {
            net.add_dns_record(
                &format!("meta-{i}"),
                format!("192.168.1.{i}").parse().unwrap(),
            );
        }

        // `frontend` resolves to a virtual IP that round-robins over all
        // frontend nodes via IPVS.
        net.add_dns_record("frontend", "192.168.2.0".parse().unwrap());
        net.add_dns_record("message_queue", "192.168.11.1".parse().unwrap());
        net.global_ipvs().add_service(
            ServiceAddr::Tcp("192.168.2.0:4566".into()),
            Scheduler::RoundRobin,
        );
        for i in 1..=conf.frontend_nodes {
            net.global_ipvs().add_server(
                ServiceAddr::Tcp("192.168.2.0:4566".into()),
                &format!("192.168.2.{i}:4566"),
            )
        }

        // kafka broker
        handle
            .create_node()
            .name("kafka-broker")
            .ip("192.168.11.1".parse().unwrap())
            .init(move || async move {
                rdkafka::SimBroker::default()
                    .serve("0.0.0.0:29092".parse().unwrap())
                    .await
            })
            .build();

        // object_store_sim
        handle
            .create_node()
            .name("object_store_sim")
            .ip("192.168.12.1".parse().unwrap())
            .init(move || async move {
                ObjectStoreSimServer::builder()
                    .serve("0.0.0.0:9301".parse().unwrap())
                    .await
            })
            .build();

        // wait for the service to be ready
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;

        let mut meta_addrs = vec![];
        for i in 1..=conf.meta_nodes {
            meta_addrs.push(format!("http://meta-{i}:5690"));
        }
        // Other nodes discover the meta service through this env var.
        unsafe { std::env::set_var("RW_META_ADDR", meta_addrs.join(",")) };

        // Meta state lives in a SQLite database backed by a temp file; the
        // handle is kept in `Cluster` so the file outlives the run.
        let sqlite_file_handle: NamedTempFile = NamedTempFile::new().unwrap();
        let file_path = sqlite_file_handle.path().display().to_string();
        tracing::info!(?file_path, "sqlite_file_path");
        let sql_endpoint = format!("sqlite://{}?mode=rwc", file_path);
        let backend_args = vec!["--backend", "sql", "--sql-endpoint", &sql_endpoint];

        // meta node
        for i in 1..=conf.meta_nodes {
            let args = [
                "meta-node",
                "--config-path",
                conf.config_path.as_str(),
                "--listen-addr",
                "0.0.0.0:5690",
                "--advertise-addr",
                &format!("meta-{i}:5690"),
                "--state-store",
                "hummock+sim://hummockadmin:hummockadmin@192.168.12.1:9301/hummock001",
                "--data-directory",
                "hummock_001",
                "--temp-secret-file-dir",
                &format!("./secrets/meta-{i}"),
            ];
            let args = args.into_iter().chain(backend_args.clone().into_iter());
            let opts = risingwave_meta_node::MetaNodeOpts::parse_from(args);
            handle
                .create_node()
                .name(format!("meta-{i}"))
                .ip([192, 168, 1, i as u8].into())
                .init(move || {
                    risingwave_meta_node::start(
                        opts.clone(),
                        CancellationToken::new(), // dummy
                    )
                })
                .build();
        }

        // wait for the service to be ready
        tokio::time::sleep(std::time::Duration::from_secs(15)).await;

        // frontend node
        for i in 1..=conf.frontend_nodes {
            let opts = risingwave_frontend::FrontendOpts::parse_from([
                "frontend-node",
                "--config-path",
                conf.config_path.as_str(),
                "--listen-addr",
                "0.0.0.0:4566",
                "--health-check-listener-addr",
                "0.0.0.0:6786",
                "--advertise-addr",
                &format!("192.168.2.{i}:4566"),
                "--temp-secret-file-dir",
                &format!("./secrets/frontend-{i}"),
            ]);
            handle
                .create_node()
                .name(format!("frontend-{i}"))
                .ip([192, 168, 2, i as u8].into())
                .init(move || {
                    risingwave_frontend::start(
                        opts.clone(),
                        CancellationToken::new(), // dummy
                    )
                })
                .build();
        }

        // compute node
        for i in 1..=conf.compute_nodes {
            let opts = risingwave_compute::ComputeNodeOpts::parse_from([
                "compute-node",
                "--config-path",
                conf.config_path.as_str(),
                "--listen-addr",
                "0.0.0.0:5688",
                "--advertise-addr",
                &format!("192.168.3.{i}:5688"),
                "--total-memory-bytes",
                "6979321856",
                "--parallelism",
                &conf.compute_node_cores.to_string(),
                "--temp-secret-file-dir",
                &format!("./secrets/compute-{i}"),
                // Per-node resource group / role fall back to the defaults
                // when not configured for this (1-indexed) node.
                "--resource-group",
                &conf
                    .compute_resource_groups
                    .get(&i)
                    .cloned()
                    .unwrap_or(DEFAULT_RESOURCE_GROUP.to_string()),
                "--role",
                &conf
                    .compute_node_roles
                    .get(&i)
                    .cloned()
                    .unwrap_or("both".to_string()),
            ]);
            handle
                .create_node()
                .name(format!("compute-{i}"))
                .ip([192, 168, 3, i as u8].into())
                .cores(conf.compute_node_cores)
                .init(move || {
                    risingwave_compute::start(
                        opts.clone(),
                        CancellationToken::new(), // dummy
                    )
                })
                .build();
        }

        // compactor node
        for i in 1..=conf.compactor_nodes {
            let opts = risingwave_compactor::CompactorOpts::parse_from([
                "compactor-node",
                "--config-path",
                conf.config_path.as_str(),
                "--listen-addr",
                "0.0.0.0:6660",
                "--advertise-addr",
                &format!("192.168.4.{i}:6660"),
            ]);
            handle
                .create_node()
                .name(format!("compactor-{i}"))
                .ip([192, 168, 4, i as u8].into())
                .init(move || {
                    risingwave_compactor::start(
                        opts.clone(),
                        CancellationToken::new(), // dummy
                    )
                })
                .build();
        }

        // wait for the service to be ready
        tokio::time::sleep(Duration::from_secs(15)).await;

        // client
        let client = handle
            .create_node()
            .name("client")
            .ip([192, 168, 100, 1].into())
            .build();

        // risectl
        let ctl = handle
            .create_node()
            .name("ctl")
            .ip([192, 168, 101, 1].into())
            .build();

        Ok(Self {
            config: conf,
            handle,
            client,
            ctl,
            sqlite_file_handle,
        })
    }
617
618    #[cfg_or_panic(madsim)]
619    fn per_session_queries(&self) -> Arc<Vec<String>> {
620        self.config.per_session_queries.clone()
621    }
622
    /// Start a SQL session on the client node.
    ///
    /// Spawns a task on the client node that connects to the load-balanced
    /// `frontend` address, applies the configured per-session queries, and
    /// then serves queries received over the returned `Session`'s channel.
    #[cfg_or_panic(madsim)]
    pub fn start_session(&mut self) -> Session {
        // Zero-capacity channel: each query is handed off synchronously.
        let (query_tx, mut query_rx) = mpsc::channel::<SessionRequest>(0);
        let per_session_queries = self.per_session_queries();

        self.client.spawn(async move {
            let mut client = RisingWave::connect("frontend".into(), "dev".into()).await?;

            // Apply session-level settings before serving user queries.
            for sql in per_session_queries.as_ref() {
                client.run(sql).await?;
            }
            drop(per_session_queries);

            while let Some((sql, tx)) = query_rx.next().await {
                let result = client
                    .run(&sql)
                    .await
                    .map(|output| match output {
                        // Flatten rows: columns joined by spaces, rows by
                        // newlines; non-row outputs become the empty string.
                        sqllogictest::DBOutput::Rows { rows, .. } => rows
                            .into_iter()
                            .map(|row| {
                                row.into_iter()
                                    .map(|v| v.to_string())
                                    .collect::<Vec<_>>()
                                    .join(" ")
                            })
                            .collect::<Vec<_>>()
                            .join("\n"),
                        _ => "".to_string(),
                    })
                    .map_err(Into::into);

                // The receiver may have been dropped; ignore send errors.
                let _ = tx.send(result);
            }

            Ok::<_, anyhow::Error>(())
        });

        Session { query_tx }
    }
664
665    /// Run a SQL query on a **new** session of the client node.
666    ///
667    /// This is a convenience method that creates a new session and runs the query on it. If you
668    /// want to run multiple queries on the same session, use `start_session` and `Session::run`.
669    pub async fn run(&mut self, sql: impl Into<String>) -> Result<String> {
670        self.start_session().run(sql).await
671    }
672
673    /// Run a future on the client node.
674    #[cfg_or_panic(madsim)]
675    pub async fn run_on_client<F>(&self, future: F) -> F::Output
676    where
677        F: Future + Send + 'static,
678        F::Output: Send + 'static,
679    {
680        self.client.spawn(future).await.unwrap()
681    }
682
683    pub async fn get_random_worker_nodes(&self, n: usize) -> Result<Vec<WorkerNode>> {
684        let worker_nodes = self.get_cluster_info().await?.get_worker_nodes().clone();
685        if worker_nodes.len() < n {
686            return Err(anyhow!("cannot remove more nodes than present"));
687        }
688        let rand_nodes = worker_nodes
689            .iter()
690            .choose_multiple(&mut rand::rng(), n)
691            .clone();
692        Ok(rand_nodes.iter().cloned().cloned().collect_vec())
693    }
694
    /// Run a SQL query from the client and wait until the condition is met.
    ///
    /// Re-runs `sql` every `interval` until the predicate `p` returns true
    /// for the result (which is then returned), or fails with an error once
    /// `timeout` has elapsed. Query errors abort the wait immediately.
    pub async fn wait_until(
        &mut self,
        sql: impl Into<String> + Clone,
        mut p: impl FnMut(&str) -> bool,
        interval: Duration,
        timeout: Duration,
    ) -> Result<String> {
        let fut = async move {
            let mut interval = tokio::time::interval(interval);
            loop {
                // The first tick fires immediately, so the query runs at
                // least once before any waiting.
                interval.tick().await;
                let result = self.run(sql.clone()).await?;
                if p(&result) {
                    return Ok::<_, anyhow::Error>(result);
                }
            }
        };

        match tokio::time::timeout(timeout, fut).await {
            Ok(r) => Ok(r?),
            Err(_) => bail!("wait_until timeout"),
        }
    }
719
720    /// Run a SQL query from the client and wait until the return result is not empty.
721    pub async fn wait_until_non_empty(
722        &mut self,
723        sql: &str,
724        interval: Duration,
725        timeout: Duration,
726    ) -> Result<String> {
727        self.wait_until(sql, |r| !r.trim().is_empty(), interval, timeout)
728            .await
729    }
730
731    /// Generate a list of random worker nodes to kill by `opts`, then call `kill_nodes` to kill and
732    /// restart them.
733    pub async fn kill_node(&self, opts: &KillOpts) {
734        let mut nodes = vec![];
735        if opts.kill_meta {
736            let rand = rand::rng().random_range(0..3);
737            for i in 1..=self.config.meta_nodes {
738                match rand {
739                    0 => break,                                     // no killed
740                    1 => {}                                         // all killed
741                    _ if !rand::rng().random_bool(0.5) => continue, // random killed
742                    _ => {}
743                }
744                nodes.push(format!("meta-{}", i));
745            }
746            // don't kill all meta services
747            if nodes.len() == self.config.meta_nodes {
748                nodes.truncate(1);
749            }
750        }
751        if opts.kill_frontend {
752            let rand = rand::rng().random_range(0..3);
753            for i in 1..=self.config.frontend_nodes {
754                match rand {
755                    0 => break,                                     // no killed
756                    1 => {}                                         // all killed
757                    _ if !rand::rng().random_bool(0.5) => continue, // random killed
758                    _ => {}
759                }
760                nodes.push(format!("frontend-{}", i));
761            }
762        }
763        if opts.kill_compute {
764            let rand = rand::rng().random_range(0..3);
765            for i in 1..=self.config.compute_nodes {
766                match rand {
767                    0 => break,                                     // no killed
768                    1 => {}                                         // all killed
769                    _ if !rand::rng().random_bool(0.5) => continue, // random killed
770                    _ => {}
771                }
772                nodes.push(format!("compute-{}", i));
773            }
774        }
775        if opts.kill_compactor {
776            let rand = rand::rng().random_range(0..3);
777            for i in 1..=self.config.compactor_nodes {
778                match rand {
779                    0 => break,                                     // no killed
780                    1 => {}                                         // all killed
781                    _ if !rand::rng().random_bool(0.5) => continue, // random killed
782                    _ => {}
783                }
784                nodes.push(format!("compactor-{}", i));
785            }
786        }
787
788        self.kill_nodes(nodes, opts.restart_delay_secs).await
789    }
790
    /// Kill the given nodes by their names, then restart each after a random
    /// delay of up to 1s — extended by `restart_delay_secs` with a
    /// probability of 0.1.
    #[cfg_or_panic(madsim)]
    pub async fn kill_nodes(
        &self,
        nodes: impl IntoIterator<Item = impl AsRef<str>>,
        restart_delay_secs: u32,
    ) {
        join_all(nodes.into_iter().map(|name| async move {
            let name = name.as_ref();
            // Stagger the kills by a random delay of up to 1s.
            let t = rand::rng().random_range(Duration::from_secs(0)..Duration::from_secs(1));
            tokio::time::sleep(t).await;
            tracing::info!("kill {name}");
            Handle::current().kill(name);

            let mut t = rand::rng().random_range(Duration::from_secs(0)..Duration::from_secs(1));
            // has a small chance to restart after a long time
            // so that the node is expired and removed from the cluster
            if rand::rng().random_bool(0.1) {
                // max_heartbeat_interval_secs = 15
                t += Duration::from_secs(restart_delay_secs as u64);
            }
            tokio::time::sleep(t).await;
            tracing::info!("restart {name}");
            Handle::current().restart(name);
        }))
        .await;
    }
819
820    #[cfg_or_panic(madsim)]
821    pub async fn kill_nodes_and_restart(
822        &self,
823        nodes: impl IntoIterator<Item = impl AsRef<str>>,
824        restart_delay_secs: u32,
825    ) {
826        join_all(nodes.into_iter().map(|name| async move {
827            let name = name.as_ref();
828            tracing::info!("kill {name}");
829            Handle::current().kill(name);
830            tokio::time::sleep(Duration::from_secs(restart_delay_secs as u64)).await;
831            tracing::info!("restart {name}");
832            Handle::current().restart(name);
833        }))
834        .await;
835    }
836
837    #[cfg_or_panic(madsim)]
838    pub async fn simple_kill_nodes(&self, nodes: impl IntoIterator<Item = impl AsRef<str>>) {
839        join_all(nodes.into_iter().map(|name| async move {
840            let name = name.as_ref();
841            tracing::info!("kill {name}");
842            Handle::current().kill(name);
843        }))
844        .await;
845    }
846
847    #[cfg_or_panic(madsim)]
848    pub async fn simple_restart_nodes(&self, nodes: impl IntoIterator<Item = impl AsRef<str>>) {
849        join_all(nodes.into_iter().map(|name| async move {
850            let name = name.as_ref();
851            tracing::info!("restart {name}");
852            Handle::current().restart(name);
853        }))
854        .await;
855    }
856
857    /// Create a node for kafka producer and prepare data.
858    #[cfg_or_panic(madsim)]
859    pub async fn create_kafka_producer(&self, datadir: &str) {
860        self.handle
861            .create_node()
862            .name("kafka-producer")
863            .ip("192.168.11.2".parse().unwrap())
864            .build()
865            .spawn(crate::kafka::producer(
866                "192.168.11.1:29092",
867                datadir.to_string(),
868            ))
869            .await
870            .unwrap();
871    }
872
873    /// Create a kafka topic.
874    #[cfg_or_panic(madsim)]
875    pub fn create_kafka_topics(&self, topics: HashMap<String, i32>) {
876        self.handle
877            .create_node()
878            .name("kafka-topic-create")
879            .ip("192.168.11.3".parse().unwrap())
880            .build()
881            .spawn(crate::kafka::create_topics("192.168.11.1:29092", topics));
882    }
883
884    pub fn config(&self) -> Configuration {
885        self.config.clone()
886    }
887
    /// Returns a reference to the underlying simulation runtime handle.
    pub fn handle(&self) -> &Handle {
        &self.handle
    }
891
892    /// Graceful shutdown all RisingWave nodes.
893    #[cfg_or_panic(madsim)]
894    pub async fn graceful_shutdown(&self) {
895        let mut nodes = vec![];
896        let mut metas = vec![];
897        for i in 1..=self.config.meta_nodes {
898            metas.push(format!("meta-{i}"));
899        }
900        for i in 1..=self.config.frontend_nodes {
901            nodes.push(format!("frontend-{i}"));
902        }
903        for i in 1..=self.config.compute_nodes {
904            nodes.push(format!("compute-{i}"));
905        }
906        for i in 1..=self.config.compactor_nodes {
907            nodes.push(format!("compactor-{i}"));
908        }
909
910        tracing::info!("graceful shutdown");
911        let waiting_time = Duration::from_secs(10);
912        // shutdown frontends, computes, compactors
913        for node in &nodes {
914            self.handle.send_ctrl_c(node);
915        }
916        tokio::time::sleep(waiting_time).await;
917        // shutdown metas
918        for meta in &metas {
919            self.handle.send_ctrl_c(meta);
920        }
921        tokio::time::sleep(waiting_time).await;
922
923        // check all nodes are exited
924        for node in nodes.iter().chain(metas.iter()) {
925            if !self.handle.is_exit(node) {
926                panic!("failed to graceful shutdown {node} in {waiting_time:?}");
927            }
928        }
929    }
930
931    pub async fn wait_for_recovery(&mut self) -> Result<()> {
932        let timeout = Duration::from_secs(200);
933        let mut session = self.start_session();
934        tokio::time::timeout(timeout, async {
935            loop {
936                if let Ok(result) = session.run("select rw_recovery_status()").await
937                    && result == "RUNNING"
938                {
939                    break;
940                }
941                tokio::time::sleep(Duration::from_nanos(10)).await;
942            }
943        })
944        .await?;
945        Ok(())
946    }
947
948    /// This function only works if all actors in your cluster are following adaptive scaling.
949    pub async fn wait_for_scale(&mut self, parallelism: usize) -> Result<()> {
950        let timeout = Duration::from_secs(200);
951        let mut session = self.start_session();
952        tokio::time::timeout(timeout, async {
953            loop {
954                let parallelism_sql = format!(
955                    "select count(parallelism) filter (where parallelism != {parallelism})\
956                from (select count(*) parallelism from rw_actors group by fragment_id);"
957                );
958                if let Ok(result) = session.run(&parallelism_sql).await
959                    && result == "0"
960                {
961                    break;
962                }
963                tokio::time::sleep(Duration::from_nanos(10)).await;
964            }
965        })
966        .await?;
967        Ok(())
968    }
969}
970
/// A single request from a [`Session`] handle to the client node: the SQL text
/// to execute, paired with a one-shot channel on which the result (or error)
/// is sent back to the caller.
type SessionRequest = (
    String,                          // query sql
    oneshot::Sender<Result<String>>, // channel to send result back
);
975
/// A SQL session on the simulated client node.
///
/// Cloning a `Session` clones the sender side of the request channel, so all
/// clones submit queries to the same servicing task.
#[derive(Debug, Clone)]
pub struct Session {
    // Sends `(sql, result_sender)` pairs to the client node for execution.
    query_tx: mpsc::Sender<SessionRequest>,
}
981
982impl Session {
983    /// Run the given SQLs on the session.
984    pub async fn run_all(&mut self, sqls: Vec<impl Into<String>>) -> Result<Vec<String>> {
985        let mut results = Vec::with_capacity(sqls.len());
986        for sql in sqls {
987            let result = self.run(sql).await?;
988            results.push(result);
989        }
990        Ok(results)
991    }
992
993    /// Run the given SQL query on the session.
994    pub async fn run(&mut self, sql: impl Into<String>) -> Result<String> {
995        let (tx, rx) = oneshot::channel();
996        self.query_tx.send((sql.into(), tx)).await?;
997        rx.await?
998    }
999
1000    /// Run `FLUSH` on the session.
1001    pub async fn flush(&mut self) -> Result<()> {
1002        self.run("FLUSH").await?;
1003        Ok(())
1004    }
1005
1006    pub async fn is_arrangement_backfill_enabled(&mut self) -> Result<bool> {
1007        let result = self.run("show streaming_use_arrangement_backfill").await?;
1008        Ok(result == "true")
1009    }
1010}
1011
/// Options for killing nodes.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct KillOpts {
    /// Rate at which candidate nodes are killed — presumably a probability in
    /// `0.0..=1.0` (1.0 kills every candidate); confirm at call sites.
    pub kill_rate: f32,
    /// Whether meta nodes are candidates for killing.
    pub kill_meta: bool,
    /// Whether frontend nodes are candidates for killing.
    pub kill_frontend: bool,
    /// Whether compute nodes are candidates for killing.
    pub kill_compute: bool,
    /// Whether compactor nodes are candidates for killing.
    pub kill_compactor: bool,
    /// Delay (in seconds) before a killed node may be restarted.
    pub restart_delay_secs: u32,
}
1022
impl KillOpts {
    /// Kill all kinds of nodes (except meta, for now) with a 20s restart delay.
    pub const ALL: Self = KillOpts {
        kill_rate: 1.0,
        kill_meta: false, // FIXME: make it true when multiple meta nodes are supported
        kill_frontend: true,
        kill_compute: true,
        kill_compactor: true,
        restart_delay_secs: 20,
    };
    /// Same as [`Self::ALL`] but with a much shorter restart delay (2s).
    pub const ALL_FAST: Self = KillOpts {
        kill_rate: 1.0,
        kill_meta: false, // FIXME: make it true when multiple meta nodes are supported
        kill_frontend: true,
        kill_compute: true,
        kill_compactor: true,
        restart_delay_secs: 2,
    };
}