1#![cfg_attr(not(madsim), allow(unused_imports))]
16
17use std::cmp::max;
18use std::collections::HashMap;
19use std::future::Future;
20use std::io::Write;
21use std::path::PathBuf;
22use std::sync::Arc;
23use std::time::Duration;
24
25use anyhow::{Result, anyhow, bail};
26use cfg_or_panic::cfg_or_panic;
27use clap::Parser;
28use futures::channel::{mpsc, oneshot};
29use futures::future::join_all;
30use futures::{SinkExt, StreamExt};
31use itertools::Itertools;
32#[cfg(madsim)]
33use madsim::runtime::{Handle, NodeHandle};
34use rand::Rng;
35use rand::seq::IteratorRandom;
36use risingwave_common::util::tokio_util::sync::CancellationToken;
37use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
38#[cfg(madsim)]
39use risingwave_object_store::object::sim::SimServer as ObjectStoreSimServer;
40use risingwave_pb::common::WorkerNode;
41use sqllogictest::AsyncDB;
42use tempfile::NamedTempFile;
43#[cfg(not(madsim))]
44use tokio::runtime::Handle;
45use uuid::Uuid;
46
47use crate::client::RisingWave;
48
/// Path of the RisingWave config file used by all nodes in the cluster.
#[derive(Clone, Debug)]
pub enum ConfigPath {
    /// A path to an existing, caller-managed config file.
    Regular(String),

    /// A temporary config file generated at startup. Wrapped in `Arc` so that
    /// cloned `Configuration`s share it; the file is deleted when the last
    /// handle is dropped.
    Temp(Arc<tempfile::TempPath>),
}
57
58impl ConfigPath {
59 pub fn as_str(&self) -> &str {
60 match self {
61 ConfigPath::Regular(s) => s,
62 ConfigPath::Temp(p) => p.as_os_str().to_str().unwrap(),
63 }
64 }
65}
66
/// Options for the simulated RisingWave cluster.
#[derive(Debug, Clone)]
pub struct Configuration {
    /// Path of the config file for all nodes (regular or generated temp file).
    pub config_path: ConfigPath,

    /// Number of frontend nodes to start.
    pub frontend_nodes: usize,

    /// Number of compute nodes to start.
    pub compute_nodes: usize,

    /// Number of meta nodes to start (currently must be 1, see `Cluster::start`).
    pub meta_nodes: usize,

    /// Number of compactor nodes to start.
    pub compactor_nodes: usize,

    /// Number of CPU cores (= parallelism) for each compute node.
    pub compute_node_cores: usize,

    /// SQL statements executed at the start of every new session.
    pub per_session_queries: Arc<Vec<String>>,

    /// Per-compute-node resource group overrides, keyed by 1-based node index.
    pub compute_resource_groups: HashMap<usize, String>,

    /// Per-compute-node `--role` overrides, keyed by 1-based node index;
    /// nodes without an entry default to "both".
    pub compute_node_roles: HashMap<usize, String>,
}
102
103impl Default for Configuration {
104 fn default() -> Self {
105 let config_path = {
106 let mut file =
107 tempfile::NamedTempFile::new().expect("failed to create temp config file");
108
109 let config_data = r#"
110[server]
111telemetry_enabled = false
112metrics_level = "Disabled"
113"#
114 .to_owned();
115 file.write_all(config_data.as_bytes())
116 .expect("failed to write config file");
117 file.into_temp_path()
118 };
119
120 Configuration {
121 config_path: ConfigPath::Temp(config_path.into()),
122 frontend_nodes: 1,
123 compute_nodes: 1,
124 meta_nodes: 1,
125 compactor_nodes: 1,
126 compute_node_cores: 1,
127 per_session_queries: vec![].into(),
128 compute_resource_groups: Default::default(),
129 compute_node_roles: Default::default(),
130 }
131 }
132}
133
134impl Configuration {
135 pub fn for_scale() -> Self {
137 let config_path = {
140 let mut file =
141 tempfile::NamedTempFile::new().expect("failed to create temp config file");
142 file.write_all(include_bytes!("risingwave-scale.toml"))
143 .expect("failed to write config file");
144 file.into_temp_path()
145 };
146
147 Configuration {
148 config_path: ConfigPath::Temp(config_path.into()),
149 frontend_nodes: 2,
150 compute_nodes: 3,
151 meta_nodes: 1,
152 compactor_nodes: 2,
153 compute_node_cores: 2,
154 per_session_queries: vec![].into(),
155 ..Default::default()
156 }
157 }
158
159 pub fn for_scale_no_shuffle() -> Self {
162 let mut conf = Self::for_scale();
163 conf.per_session_queries = vec![
164 "SET STREAMING_USE_ARRANGEMENT_BACKFILL = false;".into(),
165 "SET STREAMING_USE_SNAPSHOT_BACKFILL = false;".into(),
166 ]
167 .into();
168 conf
169 }
170
171 pub fn for_scale_shared_source() -> Self {
172 let mut conf = Self::for_scale();
173 conf.per_session_queries = vec!["SET STREAMING_USE_SHARED_SOURCE = true;".into()].into();
174 conf
175 }
176
177 pub fn for_auto_parallelism(
178 max_heartbeat_interval_secs: u64,
179 enable_auto_parallelism: bool,
180 ) -> Self {
181 let disable_automatic_parallelism_control = !enable_auto_parallelism;
182
183 let config_path = {
184 let mut file =
185 tempfile::NamedTempFile::new().expect("failed to create temp config file");
186
187 let config_data = format!(
188 r#"[meta]
189max_heartbeat_interval_secs = {max_heartbeat_interval_secs}
190disable_automatic_parallelism_control = {disable_automatic_parallelism_control}
191parallelism_control_trigger_first_delay_sec = 0
192parallelism_control_batch_size = 10
193parallelism_control_trigger_period_sec = 10
194
195[system]
196barrier_interval_ms = 250
197checkpoint_frequency = 4
198
199[server]
200telemetry_enabled = false
201metrics_level = "Disabled"
202"#
203 );
204 file.write_all(config_data.as_bytes())
205 .expect("failed to write config file");
206 file.into_temp_path()
207 };
208
209 Configuration {
210 config_path: ConfigPath::Temp(config_path.into()),
211 frontend_nodes: 1,
212 compute_nodes: 3,
213 meta_nodes: 1,
214 compactor_nodes: 1,
215 compute_node_cores: 2,
216 per_session_queries: vec![
217 "create view if not exists table_parallelism as select t.name, tf.parallelism from rw_tables t, rw_table_fragments tf where t.id = tf.table_id;".into(),
218 "create view if not exists mview_parallelism as select m.name, tf.parallelism from rw_materialized_views m, rw_table_fragments tf where m.id = tf.table_id;".into(),
219 ]
220 .into(),
221 ..Default::default()
222 }
223 }
224
225 pub fn for_default_parallelism(default_parallelism: usize) -> Self {
226 let config_path = {
227 let mut file =
228 tempfile::NamedTempFile::new().expect("failed to create temp config file");
229
230 let config_data = format!(
231 r#"
232[server]
233telemetry_enabled = false
234metrics_level = "Disabled"
235[meta]
236default_parallelism = {default_parallelism}
237"#
238 );
239 file.write_all(config_data.as_bytes())
240 .expect("failed to write config file");
241 file.into_temp_path()
242 };
243
244 Configuration {
245 config_path: ConfigPath::Temp(config_path.into()),
246 frontend_nodes: 1,
247 compute_nodes: 1,
248 meta_nodes: 1,
249 compactor_nodes: 1,
250 compute_node_cores: default_parallelism * 2,
251 per_session_queries: vec![].into(),
252 compute_resource_groups: Default::default(),
253 compute_node_roles: Default::default(),
254 }
255 }
256
257 pub fn for_backfill() -> Self {
259 let config_path = {
262 let mut file =
263 tempfile::NamedTempFile::new().expect("failed to create temp config file");
264 file.write_all(include_bytes!("backfill.toml"))
265 .expect("failed to write config file");
266 file.into_temp_path()
267 };
268
269 Configuration {
270 config_path: ConfigPath::Temp(config_path.into()),
271 frontend_nodes: 1,
272 compute_nodes: 1,
273 meta_nodes: 1,
274 compactor_nodes: 1,
275 compute_node_cores: 4,
276 ..Default::default()
277 }
278 }
279
280 pub fn for_arrangement_backfill() -> Self {
281 let config_path = {
284 let mut file =
285 tempfile::NamedTempFile::new().expect("failed to create temp config file");
286 file.write_all(include_bytes!("arrangement_backfill.toml"))
287 .expect("failed to write config file");
288 file.into_temp_path()
289 };
290
291 Configuration {
292 config_path: ConfigPath::Temp(config_path.into()),
293 frontend_nodes: 1,
294 compute_nodes: 3,
295 meta_nodes: 1,
296 compactor_nodes: 1,
297 compute_node_cores: 1,
298 per_session_queries: vec![
299 "SET STREAMING_USE_ARRANGEMENT_BACKFILL = true;".into(),
300 "SET STREAMING_USE_SNAPSHOT_BACKFILL = false;".into(),
301 ]
302 .into(),
303 ..Default::default()
304 }
305 }
306
307 pub fn for_background_ddl() -> Self {
308 let config_path = {
311 let mut file =
312 tempfile::NamedTempFile::new().expect("failed to create temp config file");
313 file.write_all(include_bytes!("background_ddl.toml"))
314 .expect("failed to write config file");
315 file.into_temp_path()
316 };
317
318 Configuration {
319 config_path: ConfigPath::Temp(config_path.into()),
320 frontend_nodes: 1,
326 compute_nodes: 3,
327 meta_nodes: 1,
328 compactor_nodes: 2,
329 compute_node_cores: 2,
330 ..Default::default()
331 }
332 }
333
334 pub fn enable_arrangement_backfill() -> Self {
335 let config_path = {
336 let mut file =
337 tempfile::NamedTempFile::new().expect("failed to create temp config file");
338 file.write_all(include_bytes!("disable_arrangement_backfill.toml"))
339 .expect("failed to write config file");
340 file.into_temp_path()
341 };
342 Configuration {
343 config_path: ConfigPath::Temp(config_path.into()),
344 frontend_nodes: 1,
345 compute_nodes: 1,
346 meta_nodes: 1,
347 compactor_nodes: 1,
348 compute_node_cores: 1,
349 per_session_queries: vec![].into(),
350 ..Default::default()
351 }
352 }
353
354 pub fn total_streaming_cores(&self) -> u32 {
356 (self.compute_nodes * self.compute_node_cores) as u32
357 }
358}
359
/// A handle to a simulated RisingWave cluster.
pub struct Cluster {
    /// The configuration the cluster was started with.
    config: Configuration,
    /// Runtime handle (madsim's under `cfg(madsim)`, tokio's otherwise — see
    /// the conditional `Handle` imports at the top of the file).
    handle: Handle,
    /// Dedicated node used to run client SQL sessions.
    #[cfg(madsim)]
    pub(crate) client: NodeHandle,
    /// Dedicated node used to run control (ctl) commands.
    #[cfg(madsim)]
    pub(crate) ctl: NodeHandle,
    /// Temp file backing the sqlite SQL meta store; kept here so the file
    /// outlives the cluster.
    #[cfg(madsim)]
    pub(crate) sqlite_file_handle: NamedTempFile,
}
385
impl Cluster {
    /// Start a RisingWave cluster for testing.
    ///
    /// Brings up the simulated network (DNS records + an IPVS load balancer in
    /// front of the frontends), a Kafka broker, an object-store server, and
    /// the configured numbers of meta, frontend, compute and compactor nodes,
    /// plus dedicated `client`/`ctl` nodes. Panics (via `cfg_or_panic`) when
    /// not compiled under madsim.
    #[cfg_or_panic(madsim)]
    pub async fn start(conf: Configuration) -> Result<Self> {
        use madsim::net::ipvs::*;

        let handle = madsim::runtime::Handle::current();
        // Print the seed so a failing deterministic run can be reproduced.
        println!("seed = {}", handle.seed());
        println!("{:#?}", conf);

        // Only a single meta node is supported by this setup.
        assert_eq!(conf.meta_nodes, 1);

        // DNS records for meta nodes: meta-{i} -> 192.168.1.{i}.
        let net = madsim::net::NetSim::current();
        for i in 1..=conf.meta_nodes {
            net.add_dns_record(
                &format!("meta-{i}"),
                format!("192.168.1.{i}").parse().unwrap(),
            );
        }

        // "frontend" resolves to a virtual IP that IPVS round-robins across
        // all frontend nodes (192.168.2.{i}:4566).
        net.add_dns_record("frontend", "192.168.2.0".parse().unwrap());
        net.add_dns_record("message_queue", "192.168.11.1".parse().unwrap());
        net.global_ipvs().add_service(
            ServiceAddr::Tcp("192.168.2.0:4566".into()),
            Scheduler::RoundRobin,
        );
        for i in 1..=conf.frontend_nodes {
            net.global_ipvs().add_server(
                ServiceAddr::Tcp("192.168.2.0:4566".into()),
                &format!("192.168.2.{i}:4566"),
            )
        }

        // Simulated Kafka broker ("message_queue") at 192.168.11.1:29092.
        handle
            .create_node()
            .name("kafka-broker")
            .ip("192.168.11.1".parse().unwrap())
            .init(move || async move {
                rdkafka::SimBroker::default()
                    .serve("0.0.0.0:29092".parse().unwrap())
                    .await
            })
            .build();

        // Simulated object store backing the hummock state store.
        handle
            .create_node()
            .name("object_store_sim")
            .ip("192.168.12.1".parse().unwrap())
            .init(move || async move {
                ObjectStoreSimServer::builder()
                    .serve("0.0.0.0:9301".parse().unwrap())
                    .await
            })
            .build();

        // Give the infrastructure nodes above a moment to come up.
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;

        // Advertise all meta addresses to the other nodes via env var.
        let mut meta_addrs = vec![];
        for i in 1..=conf.meta_nodes {
            meta_addrs.push(format!("http://meta-{i}:5690"));
        }
        // SAFETY(review): mutates the process environment; assumed sound
        // because this runs during single-threaded cluster bootstrap, before
        // any worker node reads the variable — TODO confirm.
        unsafe { std::env::set_var("RW_META_ADDR", meta_addrs.join(",")) };

        // SQL meta store backend: a sqlite database in a temp file.
        let sqlite_file_handle: NamedTempFile = NamedTempFile::new().unwrap();
        let file_path = sqlite_file_handle.path().display().to_string();
        tracing::info!(?file_path, "sqlite_file_path");
        let sql_endpoint = format!("sqlite://{}?mode=rwc", file_path);
        let backend_args = vec!["--backend", "sql", "--sql-endpoint", &sql_endpoint];

        // Meta nodes at 192.168.1.{i}.
        for i in 1..=conf.meta_nodes {
            let args = [
                "meta-node",
                "--config-path",
                conf.config_path.as_str(),
                "--listen-addr",
                "0.0.0.0:5690",
                "--advertise-addr",
                &format!("meta-{i}:5690"),
                "--state-store",
                "hummock+sim://hummockadmin:hummockadmin@192.168.12.1:9301/hummock001",
                "--data-directory",
                "hummock_001",
                "--temp-secret-file-dir",
                &format!("./secrets/meta-{i}"),
            ];
            let args = args.into_iter().chain(backend_args.clone().into_iter());
            let opts = risingwave_meta_node::MetaNodeOpts::parse_from(args);
            handle
                .create_node()
                .name(format!("meta-{i}"))
                .ip([192, 168, 1, i as u8].into())
                .init(move || {
                    risingwave_meta_node::start(
                        opts.clone(),
                        CancellationToken::new(),
                    )
                })
                .build();
        }

        // Wait for the meta service to be ready before starting the workers.
        tokio::time::sleep(std::time::Duration::from_secs(15)).await;

        // Frontend nodes at 192.168.2.{i}.
        for i in 1..=conf.frontend_nodes {
            let opts = risingwave_frontend::FrontendOpts::parse_from([
                "frontend-node",
                "--config-path",
                conf.config_path.as_str(),
                "--listen-addr",
                "0.0.0.0:4566",
                "--health-check-listener-addr",
                "0.0.0.0:6786",
                "--advertise-addr",
                &format!("192.168.2.{i}:4566"),
                "--temp-secret-file-dir",
                &format!("./secrets/frontend-{i}"),
            ]);
            handle
                .create_node()
                .name(format!("frontend-{i}"))
                .ip([192, 168, 2, i as u8].into())
                .init(move || {
                    risingwave_frontend::start(
                        opts.clone(),
                        CancellationToken::new(),
                    )
                })
                .build();
        }

        // Compute nodes at 192.168.3.{i}, honoring per-node resource-group and
        // role overrides from the configuration ("both" by default).
        for i in 1..=conf.compute_nodes {
            let opts = risingwave_compute::ComputeNodeOpts::parse_from([
                "compute-node",
                "--config-path",
                conf.config_path.as_str(),
                "--listen-addr",
                "0.0.0.0:5688",
                "--advertise-addr",
                &format!("192.168.3.{i}:5688"),
                "--total-memory-bytes",
                "6979321856",
                "--parallelism",
                &conf.compute_node_cores.to_string(),
                "--temp-secret-file-dir",
                &format!("./secrets/compute-{i}"),
                "--resource-group",
                &conf
                    .compute_resource_groups
                    .get(&i)
                    .cloned()
                    .unwrap_or(DEFAULT_RESOURCE_GROUP.to_string()),
                "--role",
                &conf
                    .compute_node_roles
                    .get(&i)
                    .cloned()
                    .unwrap_or("both".to_string()),
            ]);
            handle
                .create_node()
                .name(format!("compute-{i}"))
                .ip([192, 168, 3, i as u8].into())
                .cores(conf.compute_node_cores)
                .init(move || {
                    risingwave_compute::start(
                        opts.clone(),
                        CancellationToken::new(),
                    )
                })
                .build();
        }

        // Compactor nodes at 192.168.4.{i}.
        for i in 1..=conf.compactor_nodes {
            let opts = risingwave_compactor::CompactorOpts::parse_from([
                "compactor-node",
                "--config-path",
                conf.config_path.as_str(),
                "--listen-addr",
                "0.0.0.0:6660",
                "--advertise-addr",
                &format!("192.168.4.{i}:6660"),
            ]);
            handle
                .create_node()
                .name(format!("compactor-{i}"))
                .ip([192, 168, 4, i as u8].into())
                .init(move || {
                    risingwave_compactor::start(
                        opts.clone(),
                        CancellationToken::new(),
                    )
                })
                .build();
        }

        // Wait for the whole cluster to come online.
        tokio::time::sleep(Duration::from_secs(15)).await;

        // Dedicated node for running client SQL sessions.
        let client = handle
            .create_node()
            .name("client")
            .ip([192, 168, 100, 1].into())
            .build();

        // Dedicated node for running control (ctl) commands.
        let ctl = handle
            .create_node()
            .name("ctl")
            .ip([192, 168, 101, 1].into())
            .build();

        Ok(Self {
            config: conf,
            handle,
            client,
            ctl,
            sqlite_file_handle,
        })
    }

    /// Statements executed at the start of every new session.
    #[cfg_or_panic(madsim)]
    fn per_session_queries(&self) -> Arc<Vec<String>> {
        self.config.per_session_queries.clone()
    }

    /// Start a SQL session on the `client` node, backed by a spawned task that
    /// owns the connection and serves queries sent over a channel.
    #[cfg_or_panic(madsim)]
    pub fn start_session(&mut self) -> Session {
        // Capacity 0: each query is handed over synchronously, one at a time.
        let (query_tx, mut query_rx) = mpsc::channel::<SessionRequest>(0);
        let per_session_queries = self.per_session_queries();

        self.client.spawn(async move {
            // Connect through the "frontend" virtual IP (load-balanced).
            let mut client = RisingWave::connect("frontend".into(), "dev".into()).await?;

            // Apply the configured per-session setup statements first.
            for sql in per_session_queries.as_ref() {
                client.run(sql).await?;
            }
            drop(per_session_queries);

            // Serve queries until the `Session` (sender side) is dropped.
            while let Some((sql, tx)) = query_rx.next().await {
                let result = client
                    .run(&sql)
                    .await
                    .map(|output| match output {
                        // Render row output: columns joined by spaces, rows by
                        // newlines. Any other output kind becomes "".
                        sqllogictest::DBOutput::Rows { rows, .. } => rows
                            .into_iter()
                            .map(|row| {
                                row.into_iter()
                                    .map(|v| v.to_string())
                                    .collect::<Vec<_>>()
                                    .join(" ")
                            })
                            .collect::<Vec<_>>()
                            .join("\n"),
                        _ => "".to_string(),
                    })
                    .map_err(Into::into);

                // Receiver may have been dropped; ignore send failures.
                let _ = tx.send(result);
            }

            Ok::<_, anyhow::Error>(())
        });

        Session { query_tx }
    }

    /// Run a SQL query on a freshly started session and return its output.
    pub async fn run(&mut self, sql: impl Into<String>) -> Result<String> {
        self.start_session().run(sql).await
    }

    /// Run a future on the `client` node and return its output.
    #[cfg_or_panic(madsim)]
    pub async fn run_on_client<F>(&self, future: F) -> F::Output
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        self.client.spawn(future).await.unwrap()
    }

    /// Pick `n` distinct worker nodes at random from the cluster info.
    ///
    /// Errors if fewer than `n` workers are present.
    pub async fn get_random_worker_nodes(&self, n: usize) -> Result<Vec<WorkerNode>> {
        let worker_nodes = self.get_cluster_info().await?.get_worker_nodes().clone();
        if worker_nodes.len() < n {
            return Err(anyhow!("cannot remove more nodes than present"));
        }
        // NOTE(review): the `.clone()` on the Vec of references looks
        // redundant; left as-is.
        let rand_nodes = worker_nodes
            .iter()
            .choose_multiple(&mut rand::rng(), n)
            .clone();
        Ok(rand_nodes.iter().cloned().cloned().collect_vec())
    }

    /// Run `sql` every `interval` until the predicate `p` accepts the output,
    /// returning that output; bails with an error after `timeout`.
    pub async fn wait_until(
        &mut self,
        sql: impl Into<String> + Clone,
        mut p: impl FnMut(&str) -> bool,
        interval: Duration,
        timeout: Duration,
    ) -> Result<String> {
        let fut = async move {
            let mut interval = tokio::time::interval(interval);
            loop {
                interval.tick().await;
                let result = self.run(sql.clone()).await?;
                if p(&result) {
                    return Ok::<_, anyhow::Error>(result);
                }
            }
        };

        match tokio::time::timeout(timeout, fut).await {
            Ok(r) => Ok(r?),
            Err(_) => bail!("wait_until timeout"),
        }
    }

    /// Like [`Self::wait_until`], but succeeds as soon as the query returns a
    /// non-empty (after trimming) result.
    pub async fn wait_until_non_empty(
        &mut self,
        sql: &str,
        interval: Duration,
        timeout: Duration,
    ) -> Result<String> {
        self.wait_until(sql, |r| !r.trim().is_empty(), interval, timeout)
            .await
    }

    /// Kill and restart a random selection of nodes according to `opts`.
    ///
    /// For each enabled node kind, a die roll in 0..3 picks the pattern:
    /// 0 = kill none, 1 = kill all, 2 = kill each node with 50% probability.
    pub async fn kill_node(&self, opts: &KillOpts) {
        let mut nodes = vec![];
        if opts.kill_meta {
            let rand = rand::rng().random_range(0..3);
            for i in 1..=self.config.meta_nodes {
                match rand {
                    0 => break,
                    1 => {}
                    _ if !rand::rng().random_bool(0.5) => continue,
                    _ => {}
                }
                nodes.push(format!("meta-{}", i));
            }
            // Never take down every meta node at once: cap the selection at one.
            if nodes.len() == self.config.meta_nodes {
                nodes.truncate(1);
            }
        }
        if opts.kill_frontend {
            let rand = rand::rng().random_range(0..3);
            for i in 1..=self.config.frontend_nodes {
                match rand {
                    0 => break,
                    1 => {}
                    _ if !rand::rng().random_bool(0.5) => continue,
                    _ => {}
                }
                nodes.push(format!("frontend-{}", i));
            }
        }
        if opts.kill_compute {
            let rand = rand::rng().random_range(0..3);
            for i in 1..=self.config.compute_nodes {
                match rand {
                    0 => break,
                    1 => {}
                    _ if !rand::rng().random_bool(0.5) => continue,
                    _ => {}
                }
                nodes.push(format!("compute-{}", i));
            }
        }
        if opts.kill_compactor {
            let rand = rand::rng().random_range(0..3);
            for i in 1..=self.config.compactor_nodes {
                match rand {
                    0 => break,
                    1 => {}
                    _ if !rand::rng().random_bool(0.5) => continue,
                    _ => {}
                }
                nodes.push(format!("compactor-{}", i));
            }
        }

        self.kill_nodes(nodes, opts.restart_delay_secs).await
    }

    /// Kill the given nodes (after a random jitter of up to 1s each) and
    /// restart them; with 10% probability a node's downtime is extended by
    /// `restart_delay_secs` to simulate a longer outage.
    #[cfg_or_panic(madsim)]
    pub async fn kill_nodes(
        &self,
        nodes: impl IntoIterator<Item = impl AsRef<str>>,
        restart_delay_secs: u32,
    ) {
        join_all(nodes.into_iter().map(|name| async move {
            let name = name.as_ref();
            let t = rand::rng().random_range(Duration::from_secs(0)..Duration::from_secs(1));
            tokio::time::sleep(t).await;
            tracing::info!("kill {name}");
            Handle::current().kill(name);

            let mut t = rand::rng().random_range(Duration::from_secs(0)..Duration::from_secs(1));
            if rand::rng().random_bool(0.1) {
                t += Duration::from_secs(restart_delay_secs as u64);
            }
            tokio::time::sleep(t).await;
            tracing::info!("restart {name}");
            Handle::current().restart(name);
        }))
        .await;
    }

    /// Kill the given nodes and restart them after exactly
    /// `restart_delay_secs` (no random jitter).
    #[cfg_or_panic(madsim)]
    pub async fn kill_nodes_and_restart(
        &self,
        nodes: impl IntoIterator<Item = impl AsRef<str>>,
        restart_delay_secs: u32,
    ) {
        join_all(nodes.into_iter().map(|name| async move {
            let name = name.as_ref();
            tracing::info!("kill {name}");
            Handle::current().kill(name);
            tokio::time::sleep(Duration::from_secs(restart_delay_secs as u64)).await;
            tracing::info!("restart {name}");
            Handle::current().restart(name);
        }))
        .await;
    }

    /// Kill the given nodes without restarting them.
    #[cfg_or_panic(madsim)]
    pub async fn simple_kill_nodes(&self, nodes: impl IntoIterator<Item = impl AsRef<str>>) {
        join_all(nodes.into_iter().map(|name| async move {
            let name = name.as_ref();
            tracing::info!("kill {name}");
            Handle::current().kill(name);
        }))
        .await;
    }

    /// Restart the given (previously killed) nodes.
    #[cfg_or_panic(madsim)]
    pub async fn simple_restart_nodes(&self, nodes: impl IntoIterator<Item = impl AsRef<str>>) {
        join_all(nodes.into_iter().map(|name| async move {
            let name = name.as_ref();
            tracing::info!("restart {name}");
            Handle::current().restart(name);
        }))
        .await;
    }

    /// Spawn a node that produces Kafka events from the files in `datadir`
    /// to the simulated broker, waiting for it to finish.
    #[cfg_or_panic(madsim)]
    pub async fn create_kafka_producer(&self, datadir: &str) {
        self.handle
            .create_node()
            .name("kafka-producer")
            .ip("192.168.11.2".parse().unwrap())
            .build()
            .spawn(crate::kafka::producer(
                "192.168.11.1:29092",
                datadir.to_string(),
            ))
            .await
            .unwrap();
    }

    /// Spawn a node that creates the given Kafka topics (name -> partition
    /// count) on the simulated broker. Fire-and-forget: not awaited.
    #[cfg_or_panic(madsim)]
    pub fn create_kafka_topics(&self, topics: HashMap<String, i32>) {
        self.handle
            .create_node()
            .name("kafka-topic-create")
            .ip("192.168.11.3".parse().unwrap())
            .build()
            .spawn(crate::kafka::create_topics("192.168.11.1:29092", topics));
    }

    /// Returns a clone of the configuration this cluster was started with.
    pub fn config(&self) -> Configuration {
        self.config.clone()
    }

    /// Returns the underlying runtime handle.
    pub fn handle(&self) -> &Handle {
        &self.handle
    }

    /// Gracefully shut down all nodes by sending ctrl-c: worker nodes
    /// (frontend/compute/compactor) first, then meta nodes, giving each group
    /// 10 seconds to exit. Panics if any node is still alive afterwards.
    #[cfg_or_panic(madsim)]
    pub async fn graceful_shutdown(&self) {
        let mut nodes = vec![];
        let mut metas = vec![];
        for i in 1..=self.config.meta_nodes {
            metas.push(format!("meta-{i}"));
        }
        for i in 1..=self.config.frontend_nodes {
            nodes.push(format!("frontend-{i}"));
        }
        for i in 1..=self.config.compute_nodes {
            nodes.push(format!("compute-{i}"));
        }
        for i in 1..=self.config.compactor_nodes {
            nodes.push(format!("compactor-{i}"));
        }

        tracing::info!("graceful shutdown");
        let waiting_time = Duration::from_secs(10);
        // Shut down the worker nodes before the meta nodes.
        for node in &nodes {
            self.handle.send_ctrl_c(node);
        }
        tokio::time::sleep(waiting_time).await;
        for meta in &metas {
            self.handle.send_ctrl_c(meta);
        }
        tokio::time::sleep(waiting_time).await;

        // Verify everything actually exited within the grace period.
        for node in nodes.iter().chain(metas.iter()) {
            if !self.handle.is_exit(node) {
                panic!("failed to graceful shutdown {node} in {waiting_time:?}");
            }
        }
    }

    /// Poll `rw_recovery_status()` until the cluster reports "RUNNING",
    /// with a 200-second overall timeout.
    pub async fn wait_for_recovery(&mut self) -> Result<()> {
        let timeout = Duration::from_secs(200);
        let mut session = self.start_session();
        tokio::time::timeout(timeout, async {
            loop {
                if let Ok(result) = session.run("select rw_recovery_status()").await
                    && result == "RUNNING"
                {
                    break;
                }
                // Tiny sleep to yield to the simulator between polls.
                tokio::time::sleep(Duration::from_nanos(10)).await;
            }
        })
        .await?;
        Ok(())
    }

    /// Poll until every fragment's actor count equals `parallelism`
    /// (i.e. the count of fragments with a different parallelism is 0),
    /// with a 200-second overall timeout.
    pub async fn wait_for_scale(&mut self, parallelism: usize) -> Result<()> {
        let timeout = Duration::from_secs(200);
        let mut session = self.start_session();
        tokio::time::timeout(timeout, async {
            loop {
                let parallelism_sql = format!(
                    "select count(parallelism) filter (where parallelism != {parallelism})\
                    from (select count(*) parallelism from rw_actors group by fragment_id);"
                );
                if let Ok(result) = session.run(&parallelism_sql).await
                    && result == "0"
                {
                    break;
                }
                // Tiny sleep to yield to the simulator between polls.
                tokio::time::sleep(Duration::from_nanos(10)).await;
            }
        })
        .await?;
        Ok(())
    }
}
970
971type SessionRequest = (
972 String, oneshot::Sender<Result<String>>, );
975
/// A SQL session opened via `Cluster::start_session`.
#[derive(Debug, Clone)]
pub struct Session {
    /// Channel to the background task that owns the database connection.
    query_tx: mpsc::Sender<SessionRequest>,
}
981
982impl Session {
983 pub async fn run_all(&mut self, sqls: Vec<impl Into<String>>) -> Result<Vec<String>> {
985 let mut results = Vec::with_capacity(sqls.len());
986 for sql in sqls {
987 let result = self.run(sql).await?;
988 results.push(result);
989 }
990 Ok(results)
991 }
992
993 pub async fn run(&mut self, sql: impl Into<String>) -> Result<String> {
995 let (tx, rx) = oneshot::channel();
996 self.query_tx.send((sql.into(), tx)).await?;
997 rx.await?
998 }
999
1000 pub async fn flush(&mut self) -> Result<()> {
1002 self.run("FLUSH").await?;
1003 Ok(())
1004 }
1005
1006 pub async fn is_arrangement_backfill_enabled(&mut self) -> Result<bool> {
1007 let result = self.run("show streaming_use_arrangement_backfill").await?;
1008 Ok(result == "true")
1009 }
1010}
1011
/// Options for killing nodes in chaos tests (see `Cluster::kill_node`).
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct KillOpts {
    // Probability knob; not read by `Cluster::kill_node` itself — presumably
    // consumed by callers elsewhere in the crate. TODO confirm.
    pub kill_rate: f32,
    // Whether meta nodes are eligible to be killed.
    pub kill_meta: bool,
    // Whether frontend nodes are eligible to be killed.
    pub kill_frontend: bool,
    // Whether compute nodes are eligible to be killed.
    pub kill_compute: bool,
    // Whether compactor nodes are eligible to be killed.
    pub kill_compactor: bool,
    // Extra downtime (seconds) occasionally added before restart
    // (see `Cluster::kill_nodes`).
    pub restart_delay_secs: u32,
}
1022
impl KillOpts {
    /// Kill every node kind except meta, with up to 20s extra restart delay.
    pub const ALL: Self = KillOpts {
        kill_rate: 1.0,
        // NOTE(review): meta is deliberately excluded — presumably because the
        // cluster only supports a single meta node (see the assertion in
        // `Cluster::start`); confirm before enabling.
        kill_meta: false,
        kill_frontend: true,
        kill_compute: true,
        kill_compactor: true,
        restart_delay_secs: 20,
    };
    /// Same as [`Self::ALL`] but with a much shorter (2s) restart delay.
    pub const ALL_FAST: Self = KillOpts {
        kill_rate: 1.0,
        kill_meta: false,
        kill_frontend: true,
        kill_compute: true,
        kill_compactor: true,
        restart_delay_secs: 2,
    };
}