1#![cfg_attr(not(madsim), allow(unused_imports))]
16
17use std::cmp::max;
18use std::collections::HashMap;
19use std::future::Future;
20use std::io::Write;
21use std::path::PathBuf;
22use std::sync::Arc;
23use std::time::Duration;
24
25use anyhow::{Result, anyhow, bail};
26use cfg_or_panic::cfg_or_panic;
27use clap::Parser;
28use futures::channel::{mpsc, oneshot};
29use futures::future::join_all;
30use futures::{SinkExt, StreamExt};
31use itertools::Itertools;
32#[cfg(madsim)]
33use madsim::runtime::{Handle, NodeHandle};
34use rand::Rng;
35use rand::seq::IteratorRandom;
36use risingwave_common::util::tokio_util::sync::CancellationToken;
37use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
38#[cfg(madsim)]
39use risingwave_object_store::object::sim::SimServer as ObjectStoreSimServer;
40use risingwave_pb::common::WorkerNode;
41use sqllogictest::AsyncDB;
42use tempfile::NamedTempFile;
43#[cfg(not(madsim))]
44use tokio::runtime::Handle;
45use uuid::Uuid;
46
47use crate::client::RisingWave;
48
49#[derive(Clone, Debug)]
51pub enum ConfigPath {
52 Regular(String),
54 Temp(Arc<tempfile::TempPath>),
56}
57
58impl ConfigPath {
59 pub fn as_str(&self) -> &str {
60 match self {
61 ConfigPath::Regular(s) => s,
62 ConfigPath::Temp(p) => p.as_os_str().to_str().unwrap(),
63 }
64 }
65}
66
/// Layout and node configuration of a simulated cluster started by [`Cluster::start`].
#[derive(Debug, Clone)]
pub struct Configuration {
    /// Config file passed as `--config-path` to every meta, frontend, compute and compactor node.
    pub config_path: ConfigPath,

    /// Number of frontend nodes (`frontend-1 ..= frontend-N`, at 192.168.2.{i}).
    pub frontend_nodes: usize,

    /// Number of compute nodes (`compute-1 ..= compute-N`, at 192.168.3.{i}).
    pub compute_nodes: usize,

    /// Number of meta nodes. [`Cluster::start`] currently asserts that this is 1.
    pub meta_nodes: usize,

    /// Number of compactor nodes (`compactor-1 ..= compactor-N`, at 192.168.4.{i}).
    pub compactor_nodes: usize,

    /// Simulated CPU cores of each compute node; also passed as its `--parallelism`.
    pub compute_node_cores: usize,

    /// SQL statements run at the start of every session opened by `Cluster::start_session`.
    pub per_session_queries: Arc<Vec<String>>,

    /// Resource group of each compute node, keyed by the node's 1-based index. Nodes without
    /// an entry get `DEFAULT_RESOURCE_GROUP`.
    pub compute_resource_groups: HashMap<usize, String>,
}
98
99impl Default for Configuration {
100 fn default() -> Self {
101 let config_path = {
102 let mut file =
103 tempfile::NamedTempFile::new().expect("failed to create temp config file");
104
105 let config_data = r#"
106[server]
107telemetry_enabled = false
108metrics_level = "Disabled"
109"#
110 .to_owned();
111 file.write_all(config_data.as_bytes())
112 .expect("failed to write config file");
113 file.into_temp_path()
114 };
115
116 Configuration {
117 config_path: ConfigPath::Temp(config_path.into()),
118 frontend_nodes: 1,
119 compute_nodes: 1,
120 meta_nodes: 1,
121 compactor_nodes: 1,
122 compute_node_cores: 1,
123 per_session_queries: vec![].into(),
124 compute_resource_groups: Default::default(),
125 }
126 }
127}
128
129impl Configuration {
130 pub fn for_scale() -> Self {
132 let config_path = {
135 let mut file =
136 tempfile::NamedTempFile::new().expect("failed to create temp config file");
137 file.write_all(include_bytes!("risingwave-scale.toml"))
138 .expect("failed to write config file");
139 file.into_temp_path()
140 };
141
142 Configuration {
143 config_path: ConfigPath::Temp(config_path.into()),
144 frontend_nodes: 2,
145 compute_nodes: 3,
146 meta_nodes: 1,
147 compactor_nodes: 2,
148 compute_node_cores: 2,
149 ..Default::default()
150 }
151 }
152
153 pub fn for_scale_no_shuffle() -> Self {
156 let mut conf = Self::for_scale();
157 conf.per_session_queries =
158 vec!["SET STREAMING_USE_ARRANGEMENT_BACKFILL = false;".into()].into();
159 conf
160 }
161
162 pub fn for_scale_shared_source() -> Self {
163 let mut conf = Self::for_scale();
164 conf.per_session_queries = vec!["SET STREAMING_USE_SHARED_SOURCE = true;".into()].into();
165 conf
166 }
167
168 pub fn for_auto_parallelism(
169 max_heartbeat_interval_secs: u64,
170 enable_auto_parallelism: bool,
171 ) -> Self {
172 let disable_automatic_parallelism_control = !enable_auto_parallelism;
173
174 let config_path = {
175 let mut file =
176 tempfile::NamedTempFile::new().expect("failed to create temp config file");
177
178 let config_data = format!(
179 r#"[meta]
180max_heartbeat_interval_secs = {max_heartbeat_interval_secs}
181disable_automatic_parallelism_control = {disable_automatic_parallelism_control}
182parallelism_control_trigger_first_delay_sec = 0
183parallelism_control_batch_size = 10
184parallelism_control_trigger_period_sec = 10
185
186[system]
187barrier_interval_ms = 250
188checkpoint_frequency = 4
189
190[server]
191telemetry_enabled = false
192metrics_level = "Disabled"
193"#
194 );
195 file.write_all(config_data.as_bytes())
196 .expect("failed to write config file");
197 file.into_temp_path()
198 };
199
200 Configuration {
201 config_path: ConfigPath::Temp(config_path.into()),
202 frontend_nodes: 1,
203 compute_nodes: 3,
204 meta_nodes: 1,
205 compactor_nodes: 1,
206 compute_node_cores: 2,
207 per_session_queries: vec![
208 "create view if not exists table_parallelism as select t.name, tf.parallelism from rw_tables t, rw_table_fragments tf where t.id = tf.table_id;".into(),
209 "create view if not exists mview_parallelism as select m.name, tf.parallelism from rw_materialized_views m, rw_table_fragments tf where m.id = tf.table_id;".into(),
210 ]
211 .into(),
212 ..Default::default()
213 }
214 }
215
216 pub fn for_default_parallelism(default_parallelism: usize) -> Self {
217 let config_path = {
218 let mut file =
219 tempfile::NamedTempFile::new().expect("failed to create temp config file");
220
221 let config_data = format!(
222 r#"
223[server]
224telemetry_enabled = false
225metrics_level = "Disabled"
226[meta]
227default_parallelism = {default_parallelism}
228"#
229 )
230 .to_owned();
231 file.write_all(config_data.as_bytes())
232 .expect("failed to write config file");
233 file.into_temp_path()
234 };
235
236 Configuration {
237 config_path: ConfigPath::Temp(config_path.into()),
238 frontend_nodes: 1,
239 compute_nodes: 1,
240 meta_nodes: 1,
241 compactor_nodes: 1,
242 compute_node_cores: default_parallelism * 2,
243 per_session_queries: vec![].into(),
244 compute_resource_groups: Default::default(),
245 }
246 }
247
248 pub fn for_backfill() -> Self {
250 let config_path = {
253 let mut file =
254 tempfile::NamedTempFile::new().expect("failed to create temp config file");
255 file.write_all(include_bytes!("backfill.toml"))
256 .expect("failed to write config file");
257 file.into_temp_path()
258 };
259
260 Configuration {
261 config_path: ConfigPath::Temp(config_path.into()),
262 frontend_nodes: 1,
263 compute_nodes: 1,
264 meta_nodes: 1,
265 compactor_nodes: 1,
266 compute_node_cores: 4,
267 ..Default::default()
268 }
269 }
270
271 pub fn for_arrangement_backfill() -> Self {
272 let config_path = {
275 let mut file =
276 tempfile::NamedTempFile::new().expect("failed to create temp config file");
277 file.write_all(include_bytes!("arrangement_backfill.toml"))
278 .expect("failed to write config file");
279 file.into_temp_path()
280 };
281
282 Configuration {
283 config_path: ConfigPath::Temp(config_path.into()),
284 frontend_nodes: 1,
285 compute_nodes: 3,
286 meta_nodes: 1,
287 compactor_nodes: 1,
288 compute_node_cores: 1,
289 per_session_queries: vec!["SET STREAMING_USE_ARRANGEMENT_BACKFILL = true;".into()]
290 .into(),
291 ..Default::default()
292 }
293 }
294
295 pub fn for_background_ddl() -> Self {
296 let config_path = {
299 let mut file =
300 tempfile::NamedTempFile::new().expect("failed to create temp config file");
301 file.write_all(include_bytes!("background_ddl.toml"))
302 .expect("failed to write config file");
303 file.into_temp_path()
304 };
305
306 Configuration {
307 config_path: ConfigPath::Temp(config_path.into()),
308 frontend_nodes: 1,
314 compute_nodes: 3,
315 meta_nodes: 1,
316 compactor_nodes: 2,
317 compute_node_cores: 2,
318 ..Default::default()
319 }
320 }
321
322 pub fn enable_arrangement_backfill() -> Self {
323 let config_path = {
324 let mut file =
325 tempfile::NamedTempFile::new().expect("failed to create temp config file");
326 file.write_all(include_bytes!("disable_arrangement_backfill.toml"))
327 .expect("failed to write config file");
328 file.into_temp_path()
329 };
330 Configuration {
331 config_path: ConfigPath::Temp(config_path.into()),
332 frontend_nodes: 1,
333 compute_nodes: 1,
334 meta_nodes: 1,
335 compactor_nodes: 1,
336 compute_node_cores: 1,
337 per_session_queries: vec![].into(),
338 ..Default::default()
339 }
340 }
341}
342
/// A RisingWave cluster running inside the madsim simulator, built by [`Cluster::start`].
pub struct Cluster {
    // The configuration the cluster was started with.
    config: Configuration,
    // Runtime handle: `madsim::runtime::Handle` under madsim, `tokio::runtime::Handle` otherwise.
    handle: Handle,
    /// Simulated node (192.168.100.1) on which SQL sessions and client futures run.
    #[cfg(madsim)]
    pub(crate) client: NodeHandle,
    /// Bare simulated node (192.168.101.1). NOTE(review): presumably used by other modules to
    /// run ctl commands. Confirm.
    #[cfg(madsim)]
    pub(crate) ctl: NodeHandle,
    /// Temp file backing the meta node's SQLite store. It is kept here so the file is not
    /// deleted while the cluster is alive.
    #[cfg(madsim)]
    pub(crate) sqlite_file_handle: NamedTempFile,
}
368
369impl Cluster {
370 #[cfg_or_panic(madsim)]
374 pub async fn start(conf: Configuration) -> Result<Self> {
375 use madsim::net::ipvs::*;
376
377 let handle = madsim::runtime::Handle::current();
378 println!("seed = {}", handle.seed());
379 println!("{:#?}", conf);
380
381 assert_eq!(conf.meta_nodes, 1);
383
384 let net = madsim::net::NetSim::current();
386 for i in 1..=conf.meta_nodes {
387 net.add_dns_record(
388 &format!("meta-{i}"),
389 format!("192.168.1.{i}").parse().unwrap(),
390 );
391 }
392
393 net.add_dns_record("frontend", "192.168.2.0".parse().unwrap());
394 net.add_dns_record("message_queue", "192.168.11.1".parse().unwrap());
395 net.global_ipvs().add_service(
396 ServiceAddr::Tcp("192.168.2.0:4566".into()),
397 Scheduler::RoundRobin,
398 );
399 for i in 1..=conf.frontend_nodes {
400 net.global_ipvs().add_server(
401 ServiceAddr::Tcp("192.168.2.0:4566".into()),
402 &format!("192.168.2.{i}:4566"),
403 )
404 }
405
406 handle
408 .create_node()
409 .name("kafka-broker")
410 .ip("192.168.11.1".parse().unwrap())
411 .init(move || async move {
412 rdkafka::SimBroker::default()
413 .serve("0.0.0.0:29092".parse().unwrap())
414 .await
415 })
416 .build();
417
418 handle
420 .create_node()
421 .name("object_store_sim")
422 .ip("192.168.12.1".parse().unwrap())
423 .init(move || async move {
424 ObjectStoreSimServer::builder()
425 .serve("0.0.0.0:9301".parse().unwrap())
426 .await
427 })
428 .build();
429
430 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
432
433 let mut meta_addrs = vec![];
434 for i in 1..=conf.meta_nodes {
435 meta_addrs.push(format!("http://meta-{i}:5690"));
436 }
437 unsafe { std::env::set_var("RW_META_ADDR", meta_addrs.join(",")) };
438
439 let sqlite_file_handle: NamedTempFile = NamedTempFile::new().unwrap();
440 let file_path = sqlite_file_handle.path().display().to_string();
441 tracing::info!(?file_path, "sqlite_file_path");
442 let sql_endpoint = format!("sqlite://{}?mode=rwc", file_path);
443 let backend_args = vec!["--backend", "sql", "--sql-endpoint", &sql_endpoint];
444
445 for i in 1..=conf.meta_nodes {
447 let args = [
448 "meta-node",
449 "--config-path",
450 conf.config_path.as_str(),
451 "--listen-addr",
452 "0.0.0.0:5690",
453 "--advertise-addr",
454 &format!("meta-{i}:5690"),
455 "--state-store",
456 "hummock+sim://hummockadmin:hummockadmin@192.168.12.1:9301/hummock001",
457 "--data-directory",
458 "hummock_001",
459 "--temp-secret-file-dir",
460 &format!("./secrets/meta-{i}"),
461 ];
462 let args = args.into_iter().chain(backend_args.clone().into_iter());
463 let opts = risingwave_meta_node::MetaNodeOpts::parse_from(args);
464 handle
465 .create_node()
466 .name(format!("meta-{i}"))
467 .ip([192, 168, 1, i as u8].into())
468 .init(move || {
469 risingwave_meta_node::start(
470 opts.clone(),
471 CancellationToken::new(), )
473 })
474 .build();
475 }
476
477 tokio::time::sleep(std::time::Duration::from_secs(15)).await;
479
480 for i in 1..=conf.frontend_nodes {
482 let opts = risingwave_frontend::FrontendOpts::parse_from([
483 "frontend-node",
484 "--config-path",
485 conf.config_path.as_str(),
486 "--listen-addr",
487 "0.0.0.0:4566",
488 "--health-check-listener-addr",
489 "0.0.0.0:6786",
490 "--advertise-addr",
491 &format!("192.168.2.{i}:4566"),
492 "--temp-secret-file-dir",
493 &format!("./secrets/frontend-{i}"),
494 ]);
495 handle
496 .create_node()
497 .name(format!("frontend-{i}"))
498 .ip([192, 168, 2, i as u8].into())
499 .init(move || {
500 risingwave_frontend::start(
501 opts.clone(),
502 CancellationToken::new(), )
504 })
505 .build();
506 }
507
508 for i in 1..=conf.compute_nodes {
510 let opts = risingwave_compute::ComputeNodeOpts::parse_from([
511 "compute-node",
512 "--config-path",
513 conf.config_path.as_str(),
514 "--listen-addr",
515 "0.0.0.0:5688",
516 "--advertise-addr",
517 &format!("192.168.3.{i}:5688"),
518 "--total-memory-bytes",
519 "6979321856",
520 "--parallelism",
521 &conf.compute_node_cores.to_string(),
522 "--temp-secret-file-dir",
523 &format!("./secrets/compute-{i}"),
524 "--resource-group",
525 &conf
526 .compute_resource_groups
527 .get(&i)
528 .cloned()
529 .unwrap_or(DEFAULT_RESOURCE_GROUP.to_string()),
530 ]);
531 handle
532 .create_node()
533 .name(format!("compute-{i}"))
534 .ip([192, 168, 3, i as u8].into())
535 .cores(conf.compute_node_cores)
536 .init(move || {
537 risingwave_compute::start(
538 opts.clone(),
539 CancellationToken::new(), )
541 })
542 .build();
543 }
544
545 for i in 1..=conf.compactor_nodes {
547 let opts = risingwave_compactor::CompactorOpts::parse_from([
548 "compactor-node",
549 "--config-path",
550 conf.config_path.as_str(),
551 "--listen-addr",
552 "0.0.0.0:6660",
553 "--advertise-addr",
554 &format!("192.168.4.{i}:6660"),
555 ]);
556 handle
557 .create_node()
558 .name(format!("compactor-{i}"))
559 .ip([192, 168, 4, i as u8].into())
560 .init(move || {
561 risingwave_compactor::start(
562 opts.clone(),
563 CancellationToken::new(), )
565 })
566 .build();
567 }
568
569 tokio::time::sleep(Duration::from_secs(15)).await;
571
572 let client = handle
574 .create_node()
575 .name("client")
576 .ip([192, 168, 100, 1].into())
577 .build();
578
579 let ctl = handle
581 .create_node()
582 .name("ctl")
583 .ip([192, 168, 101, 1].into())
584 .build();
585
586 Ok(Self {
587 config: conf,
588 handle,
589 client,
590 ctl,
591 sqlite_file_handle,
592 })
593 }
594
595 #[cfg_or_panic(madsim)]
596 fn per_session_queries(&self) -> Arc<Vec<String>> {
597 self.config.per_session_queries.clone()
598 }
599
600 #[cfg_or_panic(madsim)]
602 pub fn start_session(&mut self) -> Session {
603 let (query_tx, mut query_rx) = mpsc::channel::<SessionRequest>(0);
604 let per_session_queries = self.per_session_queries();
605
606 self.client.spawn(async move {
607 let mut client = RisingWave::connect("frontend".into(), "dev".into()).await?;
608
609 for sql in per_session_queries.as_ref() {
610 client.run(sql).await?;
611 }
612 drop(per_session_queries);
613
614 while let Some((sql, tx)) = query_rx.next().await {
615 let result = client
616 .run(&sql)
617 .await
618 .map(|output| match output {
619 sqllogictest::DBOutput::Rows { rows, .. } => rows
620 .into_iter()
621 .map(|row| {
622 row.into_iter()
623 .map(|v| v.to_string())
624 .collect::<Vec<_>>()
625 .join(" ")
626 })
627 .collect::<Vec<_>>()
628 .join("\n"),
629 _ => "".to_string(),
630 })
631 .map_err(Into::into);
632
633 let _ = tx.send(result);
634 }
635
636 Ok::<_, anyhow::Error>(())
637 });
638
639 Session { query_tx }
640 }
641
642 pub async fn run(&mut self, sql: impl Into<String>) -> Result<String> {
647 self.start_session().run(sql).await
648 }
649
650 #[cfg_or_panic(madsim)]
652 pub async fn run_on_client<F>(&self, future: F) -> F::Output
653 where
654 F: Future + Send + 'static,
655 F::Output: Send + 'static,
656 {
657 self.client.spawn(future).await.unwrap()
658 }
659
660 pub async fn get_random_worker_nodes(&self, n: usize) -> Result<Vec<WorkerNode>> {
661 let worker_nodes = self.get_cluster_info().await?.get_worker_nodes().clone();
662 if worker_nodes.len() < n {
663 return Err(anyhow!("cannot remove more nodes than present"));
664 }
665 let rand_nodes = worker_nodes
666 .iter()
667 .choose_multiple(&mut rand::rng(), n)
668 .to_vec();
669 Ok(rand_nodes.iter().cloned().cloned().collect_vec())
670 }
671
672 pub async fn wait_until(
674 &mut self,
675 sql: impl Into<String> + Clone,
676 mut p: impl FnMut(&str) -> bool,
677 interval: Duration,
678 timeout: Duration,
679 ) -> Result<String> {
680 let fut = async move {
681 let mut interval = tokio::time::interval(interval);
682 loop {
683 interval.tick().await;
684 let result = self.run(sql.clone()).await?;
685 if p(&result) {
686 return Ok::<_, anyhow::Error>(result);
687 }
688 }
689 };
690
691 match tokio::time::timeout(timeout, fut).await {
692 Ok(r) => Ok(r?),
693 Err(_) => bail!("wait_until timeout"),
694 }
695 }
696
697 pub async fn wait_until_non_empty(
699 &mut self,
700 sql: &str,
701 interval: Duration,
702 timeout: Duration,
703 ) -> Result<String> {
704 self.wait_until(sql, |r| !r.trim().is_empty(), interval, timeout)
705 .await
706 }
707
708 pub async fn kill_node(&self, opts: &KillOpts) {
711 let mut nodes = vec![];
712 if opts.kill_meta {
713 let rand = rand::rng().random_range(0..3);
714 for i in 1..=self.config.meta_nodes {
715 match rand {
716 0 => break, 1 => {} _ if !rand::rng().random_bool(0.5) => continue, _ => {}
720 }
721 nodes.push(format!("meta-{}", i));
722 }
723 if nodes.len() == self.config.meta_nodes {
725 nodes.truncate(1);
726 }
727 }
728 if opts.kill_frontend {
729 let rand = rand::rng().random_range(0..3);
730 for i in 1..=self.config.frontend_nodes {
731 match rand {
732 0 => break, 1 => {} _ if !rand::rng().random_bool(0.5) => continue, _ => {}
736 }
737 nodes.push(format!("frontend-{}", i));
738 }
739 }
740 if opts.kill_compute {
741 let rand = rand::rng().random_range(0..3);
742 for i in 1..=self.config.compute_nodes {
743 match rand {
744 0 => break, 1 => {} _ if !rand::rng().random_bool(0.5) => continue, _ => {}
748 }
749 nodes.push(format!("compute-{}", i));
750 }
751 }
752 if opts.kill_compactor {
753 let rand = rand::rng().random_range(0..3);
754 for i in 1..=self.config.compactor_nodes {
755 match rand {
756 0 => break, 1 => {} _ if !rand::rng().random_bool(0.5) => continue, _ => {}
760 }
761 nodes.push(format!("compactor-{}", i));
762 }
763 }
764
765 self.kill_nodes(nodes, opts.restart_delay_secs).await
766 }
767
768 #[cfg_or_panic(madsim)]
771 pub async fn kill_nodes(
772 &self,
773 nodes: impl IntoIterator<Item = impl AsRef<str>>,
774 restart_delay_secs: u32,
775 ) {
776 join_all(nodes.into_iter().map(|name| async move {
777 let name = name.as_ref();
778 let t = rand::rng().random_range(Duration::from_secs(0)..Duration::from_secs(1));
779 tokio::time::sleep(t).await;
780 tracing::info!("kill {name}");
781 Handle::current().kill(name);
782
783 let mut t = rand::rng().random_range(Duration::from_secs(0)..Duration::from_secs(1));
784 if rand::rng().random_bool(0.1) {
787 t += Duration::from_secs(restart_delay_secs as u64);
789 }
790 tokio::time::sleep(t).await;
791 tracing::info!("restart {name}");
792 Handle::current().restart(name);
793 }))
794 .await;
795 }
796
797 #[cfg_or_panic(madsim)]
798 pub async fn kill_nodes_and_restart(
799 &self,
800 nodes: impl IntoIterator<Item = impl AsRef<str>>,
801 restart_delay_secs: u32,
802 ) {
803 join_all(nodes.into_iter().map(|name| async move {
804 let name = name.as_ref();
805 tracing::info!("kill {name}");
806 Handle::current().kill(name);
807 tokio::time::sleep(Duration::from_secs(restart_delay_secs as u64)).await;
808 tracing::info!("restart {name}");
809 Handle::current().restart(name);
810 }))
811 .await;
812 }
813
814 #[cfg_or_panic(madsim)]
815 pub async fn simple_kill_nodes(&self, nodes: impl IntoIterator<Item = impl AsRef<str>>) {
816 join_all(nodes.into_iter().map(|name| async move {
817 let name = name.as_ref();
818 tracing::info!("kill {name}");
819 Handle::current().kill(name);
820 }))
821 .await;
822 }
823
824 #[cfg_or_panic(madsim)]
825 pub async fn simple_restart_nodes(&self, nodes: impl IntoIterator<Item = impl AsRef<str>>) {
826 join_all(nodes.into_iter().map(|name| async move {
827 let name = name.as_ref();
828 tracing::info!("restart {name}");
829 Handle::current().restart(name);
830 }))
831 .await;
832 }
833
834 #[cfg_or_panic(madsim)]
836 pub async fn create_kafka_producer(&self, datadir: &str) {
837 self.handle
838 .create_node()
839 .name("kafka-producer")
840 .ip("192.168.11.2".parse().unwrap())
841 .build()
842 .spawn(crate::kafka::producer(
843 "192.168.11.1:29092",
844 datadir.to_string(),
845 ))
846 .await
847 .unwrap();
848 }
849
850 #[cfg_or_panic(madsim)]
852 pub fn create_kafka_topics(&self, topics: HashMap<String, i32>) {
853 self.handle
854 .create_node()
855 .name("kafka-topic-create")
856 .ip("192.168.11.3".parse().unwrap())
857 .build()
858 .spawn(crate::kafka::create_topics("192.168.11.1:29092", topics));
859 }
860
861 pub fn config(&self) -> Configuration {
862 self.config.clone()
863 }
864
865 pub fn handle(&self) -> &Handle {
866 &self.handle
867 }
868
869 #[cfg_or_panic(madsim)]
871 pub async fn graceful_shutdown(&self) {
872 let mut nodes = vec![];
873 let mut metas = vec![];
874 for i in 1..=self.config.meta_nodes {
875 metas.push(format!("meta-{i}"));
876 }
877 for i in 1..=self.config.frontend_nodes {
878 nodes.push(format!("frontend-{i}"));
879 }
880 for i in 1..=self.config.compute_nodes {
881 nodes.push(format!("compute-{i}"));
882 }
883 for i in 1..=self.config.compactor_nodes {
884 nodes.push(format!("compactor-{i}"));
885 }
886
887 tracing::info!("graceful shutdown");
888 let waiting_time = Duration::from_secs(10);
889 for node in &nodes {
891 self.handle.send_ctrl_c(node);
892 }
893 tokio::time::sleep(waiting_time).await;
894 for meta in &metas {
896 self.handle.send_ctrl_c(meta);
897 }
898 tokio::time::sleep(waiting_time).await;
899
900 for node in nodes.iter().chain(metas.iter()) {
902 if !self.handle.is_exit(node) {
903 panic!("failed to graceful shutdown {node} in {waiting_time:?}");
904 }
905 }
906 }
907
908 pub async fn wait_for_recovery(&mut self) -> Result<()> {
909 let timeout = Duration::from_secs(200);
910 let mut session = self.start_session();
911 tokio::time::timeout(timeout, async {
912 loop {
913 if let Ok(result) = session.run("select rw_recovery_status()").await
914 && result == "RUNNING"
915 {
916 break;
917 }
918 tokio::time::sleep(Duration::from_nanos(10)).await;
919 }
920 })
921 .await?;
922 Ok(())
923 }
924
925 pub async fn wait_for_scale(&mut self, parallelism: usize) -> Result<()> {
927 let timeout = Duration::from_secs(200);
928 let mut session = self.start_session();
929 tokio::time::timeout(timeout, async {
930 loop {
931 let parallelism_sql = format!(
932 "select count(parallelism) filter (where parallelism != {parallelism})\
933 from (select count(*) parallelism from rw_actors group by fragment_id);"
934 );
935 if let Ok(result) = session.run(¶llelism_sql).await
936 && result == "0"
937 {
938 break;
939 }
940 tokio::time::sleep(Duration::from_nanos(10)).await;
941 }
942 })
943 .await?;
944 Ok(())
945 }
946}
947
/// A request to a session's client task: the SQL text, plus a channel on which the task sends
/// back the rendered output or the error.
type SessionRequest = (
    String,                          // SQL statement to run
    oneshot::Sender<Result<String>>, // receives the rendered output or the error
);
952
/// Handle to a SQL session whose connection is owned by a task on the `client` node.
///
/// Clones share the same underlying connection. The serving task stops once every clone has
/// been dropped.
#[derive(Debug, Clone)]
pub struct Session {
    // Requests are forwarded to the task spawned by `Cluster::start_session`.
    query_tx: mpsc::Sender<SessionRequest>,
}
958
959impl Session {
960 pub async fn run_all(&mut self, sqls: Vec<impl Into<String>>) -> Result<Vec<String>> {
962 let mut results = Vec::with_capacity(sqls.len());
963 for sql in sqls {
964 let result = self.run(sql).await?;
965 results.push(result);
966 }
967 Ok(results)
968 }
969
970 pub async fn run(&mut self, sql: impl Into<String>) -> Result<String> {
972 let (tx, rx) = oneshot::channel();
973 self.query_tx.send((sql.into(), tx)).await?;
974 rx.await?
975 }
976
977 pub async fn flush(&mut self) -> Result<()> {
979 self.run("FLUSH").await?;
980 Ok(())
981 }
982
983 pub async fn is_arrangement_backfill_enabled(&mut self) -> Result<bool> {
984 let result = self.run("show streaming_use_arrangement_backfill").await?;
985 Ok(result == "true")
986 }
987}
988
/// Which kinds of nodes `Cluster::kill_node` may kill, and the extra restart delay it passes on.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct KillOpts {
    /// NOTE(review): not read by `Cluster::kill_node` in this file. Confirm whether anything
    /// uses it.
    pub kill_rate: f32,
    /// Whether meta nodes may be killed.
    pub kill_meta: bool,
    /// Whether frontend nodes may be killed.
    pub kill_frontend: bool,
    /// Whether compute nodes may be killed.
    pub kill_compute: bool,
    /// Whether compactor nodes may be killed.
    pub kill_compactor: bool,
    /// Extra delay, in seconds, that `Cluster::kill_nodes` sometimes adds before a restart.
    pub restart_delay_secs: u32,
}

impl KillOpts {
    /// Frontend, compute and compactor nodes may be killed (meta nodes are not).
    /// The extra restart delay is 20 seconds.
    pub const ALL: Self = Self {
        kill_rate: 1.0,
        kill_meta: false,
        kill_frontend: true,
        kill_compute: true,
        kill_compactor: true,
        restart_delay_secs: 20,
    };
    /// Same as [`KillOpts::ALL`] but with a 2-second extra restart delay.
    pub const ALL_FAST: Self = Self {
        restart_delay_secs: 2,
        ..Self::ALL
    };
}