1#![cfg_attr(not(madsim), allow(unused_imports))]
16
17use std::cmp::max;
18use std::collections::HashMap;
19use std::future::Future;
20use std::io::Write;
21use std::path::PathBuf;
22use std::sync::Arc;
23use std::time::Duration;
24
25use anyhow::{Result, anyhow, bail};
26use cfg_or_panic::cfg_or_panic;
27use clap::Parser;
28use futures::channel::{mpsc, oneshot};
29use futures::future::join_all;
30use futures::{SinkExt, StreamExt};
31use itertools::Itertools;
32#[cfg(madsim)]
33use madsim::runtime::{Handle, NodeHandle};
34use rand::Rng;
35use rand::seq::IteratorRandom;
36use risingwave_common::util::tokio_util::sync::CancellationToken;
37use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
38#[cfg(madsim)]
39use risingwave_object_store::object::sim::SimServer as ObjectStoreSimServer;
40use risingwave_pb::common::WorkerNode;
41use sqllogictest::AsyncDB;
42use tempfile::NamedTempFile;
43#[cfg(not(madsim))]
44use tokio::runtime::Handle;
45use uuid::Uuid;
46
47use crate::client::RisingWave;
48
/// Where a cluster configuration file lives.
#[derive(Clone, Debug)]
pub enum ConfigPath {
    /// A path supplied directly as a string.
    Regular(String),
    /// A temporary file path. It is wrapped in an `Arc` so that cloning the value shares the
    /// same `TempPath` handle instead of duplicating it.
    Temp(Arc<tempfile::TempPath>),
}
57
58impl ConfigPath {
59 pub fn as_str(&self) -> &str {
60 match self {
61 ConfigPath::Regular(s) => s,
62 ConfigPath::Temp(p) => p.as_os_str().to_str().unwrap(),
63 }
64 }
65}
66
/// Describes the shape of a simulated cluster: which config file to use and how many nodes of
/// each kind to create.
#[derive(Debug, Clone)]
pub struct Configuration {
    /// Path of the configuration file passed as `--config-path` to every node.
    pub config_path: ConfigPath,

    /// The number of frontend nodes.
    pub frontend_nodes: usize,

    /// The number of compute nodes.
    pub compute_nodes: usize,

    /// The number of meta nodes.
    pub meta_nodes: usize,

    /// The number of compactor nodes.
    pub compactor_nodes: usize,

    /// The number of cores of each compute node. It is also passed to the compute node as
    /// its `--parallelism`.
    pub compute_node_cores: usize,

    /// Queries run at the start of every session before any user query.
    pub per_session_queries: Arc<Vec<String>>,

    /// Resource group per compute node, keyed by the 1-based compute node index. Nodes without
    /// an entry use `DEFAULT_RESOURCE_GROUP`.
    pub compute_resource_groups: HashMap<usize, String>,
}
98
99impl Default for Configuration {
100 fn default() -> Self {
101 let config_path = {
102 let mut file =
103 tempfile::NamedTempFile::new().expect("failed to create temp config file");
104
105 let config_data = r#"
106[server]
107telemetry_enabled = false
108metrics_level = "Disabled"
109"#
110 .to_owned();
111 file.write_all(config_data.as_bytes())
112 .expect("failed to write config file");
113 file.into_temp_path()
114 };
115
116 Configuration {
117 config_path: ConfigPath::Temp(config_path.into()),
118 frontend_nodes: 1,
119 compute_nodes: 1,
120 meta_nodes: 1,
121 compactor_nodes: 1,
122 compute_node_cores: 1,
123 per_session_queries: vec![].into(),
124 compute_resource_groups: Default::default(),
125 }
126 }
127}
128
129impl Configuration {
130 pub fn for_scale() -> Self {
132 let config_path = {
135 let mut file =
136 tempfile::NamedTempFile::new().expect("failed to create temp config file");
137 file.write_all(include_bytes!("risingwave-scale.toml"))
138 .expect("failed to write config file");
139 file.into_temp_path()
140 };
141
142 Configuration {
143 config_path: ConfigPath::Temp(config_path.into()),
144 frontend_nodes: 2,
145 compute_nodes: 3,
146 meta_nodes: 1,
147 compactor_nodes: 2,
148 compute_node_cores: 2,
149 ..Default::default()
150 }
151 }
152
153 pub fn for_scale_no_shuffle() -> Self {
156 let mut conf = Self::for_scale();
157 conf.per_session_queries =
158 vec!["SET STREAMING_USE_ARRANGEMENT_BACKFILL = false;".into()].into();
159 conf
160 }
161
162 pub fn for_scale_shared_source() -> Self {
163 let mut conf = Self::for_scale();
164 conf.per_session_queries = vec!["SET STREAMING_USE_SHARED_SOURCE = true;".into()].into();
165 conf
166 }
167
168 pub fn for_auto_parallelism(
169 max_heartbeat_interval_secs: u64,
170 enable_auto_parallelism: bool,
171 ) -> Self {
172 let disable_automatic_parallelism_control = !enable_auto_parallelism;
173
174 let config_path = {
175 let mut file =
176 tempfile::NamedTempFile::new().expect("failed to create temp config file");
177
178 let config_data = format!(
179 r#"[meta]
180max_heartbeat_interval_secs = {max_heartbeat_interval_secs}
181disable_automatic_parallelism_control = {disable_automatic_parallelism_control}
182parallelism_control_trigger_first_delay_sec = 0
183parallelism_control_batch_size = 10
184parallelism_control_trigger_period_sec = 10
185
186[system]
187barrier_interval_ms = 250
188checkpoint_frequency = 4
189
190[server]
191telemetry_enabled = false
192metrics_level = "Disabled"
193"#
194 );
195 file.write_all(config_data.as_bytes())
196 .expect("failed to write config file");
197 file.into_temp_path()
198 };
199
200 Configuration {
201 config_path: ConfigPath::Temp(config_path.into()),
202 frontend_nodes: 1,
203 compute_nodes: 3,
204 meta_nodes: 1,
205 compactor_nodes: 1,
206 compute_node_cores: 2,
207 per_session_queries: vec![
208 "create view if not exists table_parallelism as select t.name, tf.parallelism from rw_tables t, rw_table_fragments tf where t.id = tf.table_id;".into(),
209 "create view if not exists mview_parallelism as select m.name, tf.parallelism from rw_materialized_views m, rw_table_fragments tf where m.id = tf.table_id;".into(),
210 ]
211 .into(),
212 ..Default::default()
213 }
214 }
215
216 pub fn for_default_parallelism(default_parallelism: usize) -> Self {
217 let config_path = {
218 let mut file =
219 tempfile::NamedTempFile::new().expect("failed to create temp config file");
220
221 let config_data = format!(
222 r#"
223[server]
224telemetry_enabled = false
225metrics_level = "Disabled"
226[meta]
227default_parallelism = {default_parallelism}
228"#
229 )
230 .to_owned();
231 file.write_all(config_data.as_bytes())
232 .expect("failed to write config file");
233 file.into_temp_path()
234 };
235
236 Configuration {
237 config_path: ConfigPath::Temp(config_path.into()),
238 frontend_nodes: 1,
239 compute_nodes: 1,
240 meta_nodes: 1,
241 compactor_nodes: 1,
242 compute_node_cores: default_parallelism * 2,
243 per_session_queries: vec![].into(),
244 compute_resource_groups: Default::default(),
245 }
246 }
247
248 pub fn for_backfill() -> Self {
250 let config_path = {
253 let mut file =
254 tempfile::NamedTempFile::new().expect("failed to create temp config file");
255 file.write_all(include_bytes!("backfill.toml"))
256 .expect("failed to write config file");
257 file.into_temp_path()
258 };
259
260 Configuration {
261 config_path: ConfigPath::Temp(config_path.into()),
262 frontend_nodes: 1,
263 compute_nodes: 1,
264 meta_nodes: 1,
265 compactor_nodes: 1,
266 compute_node_cores: 4,
267 ..Default::default()
268 }
269 }
270
271 pub fn for_arrangement_backfill() -> Self {
272 let config_path = {
275 let mut file =
276 tempfile::NamedTempFile::new().expect("failed to create temp config file");
277 file.write_all(include_bytes!("arrangement_backfill.toml"))
278 .expect("failed to write config file");
279 file.into_temp_path()
280 };
281
282 Configuration {
283 config_path: ConfigPath::Temp(config_path.into()),
284 frontend_nodes: 1,
285 compute_nodes: 3,
286 meta_nodes: 1,
287 compactor_nodes: 1,
288 compute_node_cores: 1,
289 per_session_queries: vec!["SET STREAMING_USE_ARRANGEMENT_BACKFILL = true;".into()]
290 .into(),
291 ..Default::default()
292 }
293 }
294
295 pub fn for_background_ddl() -> Self {
296 let config_path = {
299 let mut file =
300 tempfile::NamedTempFile::new().expect("failed to create temp config file");
301 file.write_all(include_bytes!("background_ddl.toml"))
302 .expect("failed to write config file");
303 file.into_temp_path()
304 };
305
306 Configuration {
307 config_path: ConfigPath::Temp(config_path.into()),
308 frontend_nodes: 1,
314 compute_nodes: 3,
315 meta_nodes: 1,
316 compactor_nodes: 2,
317 compute_node_cores: 2,
318 ..Default::default()
319 }
320 }
321
322 pub fn enable_arrangement_backfill() -> Self {
323 let config_path = {
324 let mut file =
325 tempfile::NamedTempFile::new().expect("failed to create temp config file");
326 file.write_all(include_bytes!("disable_arrangement_backfill.toml"))
327 .expect("failed to write config file");
328 file.into_temp_path()
329 };
330 Configuration {
331 config_path: ConfigPath::Temp(config_path.into()),
332 frontend_nodes: 1,
333 compute_nodes: 1,
334 meta_nodes: 1,
335 compactor_nodes: 1,
336 compute_node_cores: 1,
337 per_session_queries: vec![].into(),
338 ..Default::default()
339 }
340 }
341
342 pub fn total_streaming_cores(&self) -> u32 {
343 (self.compute_nodes * self.compute_node_cores) as u32
344 }
345}
346
/// A simulated RisingWave cluster.
///
/// `Cluster::start` creates the following nodes:
///
/// | Name             | IP            |
/// | ---------------- | ------------- |
/// | `meta-x`         | `192.168.1.x` |
/// | `frontend-x`     | `192.168.2.x` |
/// | `compute-x`      | `192.168.3.x` |
/// | `compactor-x`    | `192.168.4.x` |
/// | `kafka-broker`   | `192.168.11.1` |
/// | `object_store_sim` | `192.168.12.1` |
/// | `client`         | `192.168.100.1` |
/// | `ctl`            | `192.168.101.1` |
pub struct Cluster {
    /// The configuration the cluster was started with.
    config: Configuration,
    /// Handle of the runtime the cluster runs on (madsim's handle under `madsim`, tokio's
    /// otherwise).
    handle: Handle,
    /// The `client` node; sessions and `run_on_client` futures are spawned on it.
    #[cfg(madsim)]
    pub(crate) client: NodeHandle,
    /// The `ctl` node.
    #[cfg(madsim)]
    pub(crate) ctl: NodeHandle,
    /// Temporary file backing the SQLite endpoint given to the meta nodes. It is stored here
    /// so the handle lives as long as the cluster.
    #[cfg(madsim)]
    pub(crate) sqlite_file_handle: NamedTempFile,
}
372
373impl Cluster {
    /// Starts a simulated RisingWave cluster described by `conf`.
    ///
    /// Nodes are created on the current madsim runtime in this order: Kafka broker, object
    /// store simulator, meta nodes, frontend nodes, compute nodes, compactor nodes, and finally
    /// the `client` and `ctl` nodes. Fixed sleeps separate the stages.
    ///
    /// # Panics
    ///
    /// Panics if `conf.meta_nodes` is not 1, if any hard-coded address fails to parse, or if
    /// the temporary SQLite file cannot be created.
    #[cfg_or_panic(madsim)]
    pub async fn start(conf: Configuration) -> Result<Self> {
        use madsim::net::ipvs::*;

        let handle = madsim::runtime::Handle::current();
        // Print the seed and the configuration so a run can be identified from its output.
        println!("seed = {}", handle.seed());
        println!("{:#?}", conf);

        // Only a single meta node is accepted here.
        assert_eq!(conf.meta_nodes, 1);

        // Set up DNS records and the frontend load balancer.
        let net = madsim::net::NetSim::current();
        for i in 1..=conf.meta_nodes {
            net.add_dns_record(
                &format!("meta-{i}"),
                format!("192.168.1.{i}").parse().unwrap(),
            );
        }

        net.add_dns_record("frontend", "192.168.2.0".parse().unwrap());
        net.add_dns_record("message_queue", "192.168.11.1".parse().unwrap());
        // `192.168.2.0:4566` is a virtual service that round-robins over all frontend nodes.
        net.global_ipvs().add_service(
            ServiceAddr::Tcp("192.168.2.0:4566".into()),
            Scheduler::RoundRobin,
        );
        for i in 1..=conf.frontend_nodes {
            net.global_ipvs().add_server(
                ServiceAddr::Tcp("192.168.2.0:4566".into()),
                &format!("192.168.2.{i}:4566"),
            )
        }

        // Kafka broker.
        handle
            .create_node()
            .name("kafka-broker")
            .ip("192.168.11.1".parse().unwrap())
            .init(move || async move {
                rdkafka::SimBroker::default()
                    .serve("0.0.0.0:29092".parse().unwrap())
                    .await
            })
            .build();

        // Object store simulator.
        handle
            .create_node()
            .name("object_store_sim")
            .ip("192.168.12.1".parse().unwrap())
            .init(move || async move {
                ObjectStoreSimServer::builder()
                    .serve("0.0.0.0:9301".parse().unwrap())
                    .await
            })
            .build();

        // Give the services above time to come up.
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;

        let mut meta_addrs = vec![];
        for i in 1..=conf.meta_nodes {
            meta_addrs.push(format!("http://meta-{i}:5690"));
        }
        // NOTE(review): `set_var` mutates process-global state, hence the `unsafe` block;
        // presumably nothing else reads the environment concurrently at this point — confirm.
        unsafe { std::env::set_var("RW_META_ADDR", meta_addrs.join(",")) };

        // The meta store is a SQLite database backed by a temporary file.
        let sqlite_file_handle: NamedTempFile = NamedTempFile::new().unwrap();
        let file_path = sqlite_file_handle.path().display().to_string();
        tracing::info!(?file_path, "sqlite_file_path");
        let sql_endpoint = format!("sqlite://{}?mode=rwc", file_path);
        let backend_args = vec!["--backend", "sql", "--sql-endpoint", &sql_endpoint];

        // Meta nodes.
        for i in 1..=conf.meta_nodes {
            let args = [
                "meta-node",
                "--config-path",
                conf.config_path.as_str(),
                "--listen-addr",
                "0.0.0.0:5690",
                "--advertise-addr",
                &format!("meta-{i}:5690"),
                "--state-store",
                "hummock+sim://hummockadmin:hummockadmin@192.168.12.1:9301/hummock001",
                "--data-directory",
                "hummock_001",
                "--temp-secret-file-dir",
                &format!("./secrets/meta-{i}"),
            ];
            let args = args.into_iter().chain(backend_args.clone().into_iter());
            let opts = risingwave_meta_node::MetaNodeOpts::parse_from(args);
            handle
                .create_node()
                .name(format!("meta-{i}"))
                .ip([192, 168, 1, i as u8].into())
                .init(move || {
                    risingwave_meta_node::start(
                        opts.clone(),
                        // A fresh token that nothing else holds.
                        CancellationToken::new(),
                    )
                })
                .build();
        }

        // Give the meta nodes time to come up.
        tokio::time::sleep(std::time::Duration::from_secs(15)).await;

        // Frontend nodes.
        for i in 1..=conf.frontend_nodes {
            let opts = risingwave_frontend::FrontendOpts::parse_from([
                "frontend-node",
                "--config-path",
                conf.config_path.as_str(),
                "--listen-addr",
                "0.0.0.0:4566",
                "--health-check-listener-addr",
                "0.0.0.0:6786",
                "--advertise-addr",
                &format!("192.168.2.{i}:4566"),
                "--temp-secret-file-dir",
                &format!("./secrets/frontend-{i}"),
            ]);
            handle
                .create_node()
                .name(format!("frontend-{i}"))
                .ip([192, 168, 2, i as u8].into())
                .init(move || {
                    risingwave_frontend::start(
                        opts.clone(),
                        // A fresh token that nothing else holds.
                        CancellationToken::new(),
                    )
                })
                .build();
        }

        // Compute nodes.
        for i in 1..=conf.compute_nodes {
            let opts = risingwave_compute::ComputeNodeOpts::parse_from([
                "compute-node",
                "--config-path",
                conf.config_path.as_str(),
                "--listen-addr",
                "0.0.0.0:5688",
                "--advertise-addr",
                &format!("192.168.3.{i}:5688"),
                "--total-memory-bytes",
                "6979321856",
                "--parallelism",
                &conf.compute_node_cores.to_string(),
                "--temp-secret-file-dir",
                &format!("./secrets/compute-{i}"),
                "--resource-group",
                // Fall back to the default resource group when none is configured for node `i`.
                &conf
                    .compute_resource_groups
                    .get(&i)
                    .cloned()
                    .unwrap_or(DEFAULT_RESOURCE_GROUP.to_string()),
            ]);
            handle
                .create_node()
                .name(format!("compute-{i}"))
                .ip([192, 168, 3, i as u8].into())
                .cores(conf.compute_node_cores)
                .init(move || {
                    risingwave_compute::start(
                        opts.clone(),
                        // A fresh token that nothing else holds.
                        CancellationToken::new(),
                    )
                })
                .build();
        }

        // Compactor nodes.
        for i in 1..=conf.compactor_nodes {
            let opts = risingwave_compactor::CompactorOpts::parse_from([
                "compactor-node",
                "--config-path",
                conf.config_path.as_str(),
                "--listen-addr",
                "0.0.0.0:6660",
                "--advertise-addr",
                &format!("192.168.4.{i}:6660"),
            ]);
            handle
                .create_node()
                .name(format!("compactor-{i}"))
                .ip([192, 168, 4, i as u8].into())
                .init(move || {
                    risingwave_compactor::start(
                        opts.clone(),
                        // A fresh token that nothing else holds.
                        CancellationToken::new(),
                    )
                })
                .build();
        }

        // Give the remaining services time to come up.
        tokio::time::sleep(Duration::from_secs(15)).await;

        // Node on which SQL sessions are spawned.
        let client = handle
            .create_node()
            .name("client")
            .ip([192, 168, 100, 1].into())
            .build();

        // Separate node named `ctl`.
        let ctl = handle
            .create_node()
            .name("ctl")
            .ip([192, 168, 101, 1].into())
            .build();

        Ok(Self {
            config: conf,
            handle,
            client,
            ctl,
            sqlite_file_handle,
        })
    }
598
599 #[cfg_or_panic(madsim)]
600 fn per_session_queries(&self) -> Arc<Vec<String>> {
601 self.config.per_session_queries.clone()
602 }
603
604 #[cfg_or_panic(madsim)]
606 pub fn start_session(&mut self) -> Session {
607 let (query_tx, mut query_rx) = mpsc::channel::<SessionRequest>(0);
608 let per_session_queries = self.per_session_queries();
609
610 self.client.spawn(async move {
611 let mut client = RisingWave::connect("frontend".into(), "dev".into()).await?;
612
613 for sql in per_session_queries.as_ref() {
614 client.run(sql).await?;
615 }
616 drop(per_session_queries);
617
618 while let Some((sql, tx)) = query_rx.next().await {
619 let result = client
620 .run(&sql)
621 .await
622 .map(|output| match output {
623 sqllogictest::DBOutput::Rows { rows, .. } => rows
624 .into_iter()
625 .map(|row| {
626 row.into_iter()
627 .map(|v| v.to_string())
628 .collect::<Vec<_>>()
629 .join(" ")
630 })
631 .collect::<Vec<_>>()
632 .join("\n"),
633 _ => "".to_string(),
634 })
635 .map_err(Into::into);
636
637 let _ = tx.send(result);
638 }
639
640 Ok::<_, anyhow::Error>(())
641 });
642
643 Session { query_tx }
644 }
645
646 pub async fn run(&mut self, sql: impl Into<String>) -> Result<String> {
651 self.start_session().run(sql).await
652 }
653
654 #[cfg_or_panic(madsim)]
656 pub async fn run_on_client<F>(&self, future: F) -> F::Output
657 where
658 F: Future + Send + 'static,
659 F::Output: Send + 'static,
660 {
661 self.client.spawn(future).await.unwrap()
662 }
663
664 pub async fn get_random_worker_nodes(&self, n: usize) -> Result<Vec<WorkerNode>> {
665 let worker_nodes = self.get_cluster_info().await?.get_worker_nodes().clone();
666 if worker_nodes.len() < n {
667 return Err(anyhow!("cannot remove more nodes than present"));
668 }
669 let rand_nodes = worker_nodes
670 .iter()
671 .choose_multiple(&mut rand::rng(), n)
672 .to_vec();
673 Ok(rand_nodes.iter().cloned().cloned().collect_vec())
674 }
675
676 pub async fn wait_until(
678 &mut self,
679 sql: impl Into<String> + Clone,
680 mut p: impl FnMut(&str) -> bool,
681 interval: Duration,
682 timeout: Duration,
683 ) -> Result<String> {
684 let fut = async move {
685 let mut interval = tokio::time::interval(interval);
686 loop {
687 interval.tick().await;
688 let result = self.run(sql.clone()).await?;
689 if p(&result) {
690 return Ok::<_, anyhow::Error>(result);
691 }
692 }
693 };
694
695 match tokio::time::timeout(timeout, fut).await {
696 Ok(r) => Ok(r?),
697 Err(_) => bail!("wait_until timeout"),
698 }
699 }
700
701 pub async fn wait_until_non_empty(
703 &mut self,
704 sql: &str,
705 interval: Duration,
706 timeout: Duration,
707 ) -> Result<String> {
708 self.wait_until(sql, |r| !r.trim().is_empty(), interval, timeout)
709 .await
710 }
711
712 pub async fn kill_node(&self, opts: &KillOpts) {
715 let mut nodes = vec![];
716 if opts.kill_meta {
717 let rand = rand::rng().random_range(0..3);
718 for i in 1..=self.config.meta_nodes {
719 match rand {
720 0 => break, 1 => {} _ if !rand::rng().random_bool(0.5) => continue, _ => {}
724 }
725 nodes.push(format!("meta-{}", i));
726 }
727 if nodes.len() == self.config.meta_nodes {
729 nodes.truncate(1);
730 }
731 }
732 if opts.kill_frontend {
733 let rand = rand::rng().random_range(0..3);
734 for i in 1..=self.config.frontend_nodes {
735 match rand {
736 0 => break, 1 => {} _ if !rand::rng().random_bool(0.5) => continue, _ => {}
740 }
741 nodes.push(format!("frontend-{}", i));
742 }
743 }
744 if opts.kill_compute {
745 let rand = rand::rng().random_range(0..3);
746 for i in 1..=self.config.compute_nodes {
747 match rand {
748 0 => break, 1 => {} _ if !rand::rng().random_bool(0.5) => continue, _ => {}
752 }
753 nodes.push(format!("compute-{}", i));
754 }
755 }
756 if opts.kill_compactor {
757 let rand = rand::rng().random_range(0..3);
758 for i in 1..=self.config.compactor_nodes {
759 match rand {
760 0 => break, 1 => {} _ if !rand::rng().random_bool(0.5) => continue, _ => {}
764 }
765 nodes.push(format!("compactor-{}", i));
766 }
767 }
768
769 self.kill_nodes(nodes, opts.restart_delay_secs).await
770 }
771
772 #[cfg_or_panic(madsim)]
775 pub async fn kill_nodes(
776 &self,
777 nodes: impl IntoIterator<Item = impl AsRef<str>>,
778 restart_delay_secs: u32,
779 ) {
780 join_all(nodes.into_iter().map(|name| async move {
781 let name = name.as_ref();
782 let t = rand::rng().random_range(Duration::from_secs(0)..Duration::from_secs(1));
783 tokio::time::sleep(t).await;
784 tracing::info!("kill {name}");
785 Handle::current().kill(name);
786
787 let mut t = rand::rng().random_range(Duration::from_secs(0)..Duration::from_secs(1));
788 if rand::rng().random_bool(0.1) {
791 t += Duration::from_secs(restart_delay_secs as u64);
793 }
794 tokio::time::sleep(t).await;
795 tracing::info!("restart {name}");
796 Handle::current().restart(name);
797 }))
798 .await;
799 }
800
801 #[cfg_or_panic(madsim)]
802 pub async fn kill_nodes_and_restart(
803 &self,
804 nodes: impl IntoIterator<Item = impl AsRef<str>>,
805 restart_delay_secs: u32,
806 ) {
807 join_all(nodes.into_iter().map(|name| async move {
808 let name = name.as_ref();
809 tracing::info!("kill {name}");
810 Handle::current().kill(name);
811 tokio::time::sleep(Duration::from_secs(restart_delay_secs as u64)).await;
812 tracing::info!("restart {name}");
813 Handle::current().restart(name);
814 }))
815 .await;
816 }
817
818 #[cfg_or_panic(madsim)]
819 pub async fn simple_kill_nodes(&self, nodes: impl IntoIterator<Item = impl AsRef<str>>) {
820 join_all(nodes.into_iter().map(|name| async move {
821 let name = name.as_ref();
822 tracing::info!("kill {name}");
823 Handle::current().kill(name);
824 }))
825 .await;
826 }
827
828 #[cfg_or_panic(madsim)]
829 pub async fn simple_restart_nodes(&self, nodes: impl IntoIterator<Item = impl AsRef<str>>) {
830 join_all(nodes.into_iter().map(|name| async move {
831 let name = name.as_ref();
832 tracing::info!("restart {name}");
833 Handle::current().restart(name);
834 }))
835 .await;
836 }
837
838 #[cfg_or_panic(madsim)]
840 pub async fn create_kafka_producer(&self, datadir: &str) {
841 self.handle
842 .create_node()
843 .name("kafka-producer")
844 .ip("192.168.11.2".parse().unwrap())
845 .build()
846 .spawn(crate::kafka::producer(
847 "192.168.11.1:29092",
848 datadir.to_string(),
849 ))
850 .await
851 .unwrap();
852 }
853
854 #[cfg_or_panic(madsim)]
856 pub fn create_kafka_topics(&self, topics: HashMap<String, i32>) {
857 self.handle
858 .create_node()
859 .name("kafka-topic-create")
860 .ip("192.168.11.3".parse().unwrap())
861 .build()
862 .spawn(crate::kafka::create_topics("192.168.11.1:29092", topics));
863 }
864
865 pub fn config(&self) -> Configuration {
866 self.config.clone()
867 }
868
869 pub fn handle(&self) -> &Handle {
870 &self.handle
871 }
872
873 #[cfg_or_panic(madsim)]
875 pub async fn graceful_shutdown(&self) {
876 let mut nodes = vec![];
877 let mut metas = vec![];
878 for i in 1..=self.config.meta_nodes {
879 metas.push(format!("meta-{i}"));
880 }
881 for i in 1..=self.config.frontend_nodes {
882 nodes.push(format!("frontend-{i}"));
883 }
884 for i in 1..=self.config.compute_nodes {
885 nodes.push(format!("compute-{i}"));
886 }
887 for i in 1..=self.config.compactor_nodes {
888 nodes.push(format!("compactor-{i}"));
889 }
890
891 tracing::info!("graceful shutdown");
892 let waiting_time = Duration::from_secs(10);
893 for node in &nodes {
895 self.handle.send_ctrl_c(node);
896 }
897 tokio::time::sleep(waiting_time).await;
898 for meta in &metas {
900 self.handle.send_ctrl_c(meta);
901 }
902 tokio::time::sleep(waiting_time).await;
903
904 for node in nodes.iter().chain(metas.iter()) {
906 if !self.handle.is_exit(node) {
907 panic!("failed to graceful shutdown {node} in {waiting_time:?}");
908 }
909 }
910 }
911
912 pub async fn wait_for_recovery(&mut self) -> Result<()> {
913 let timeout = Duration::from_secs(200);
914 let mut session = self.start_session();
915 tokio::time::timeout(timeout, async {
916 loop {
917 if let Ok(result) = session.run("select rw_recovery_status()").await
918 && result == "RUNNING"
919 {
920 break;
921 }
922 tokio::time::sleep(Duration::from_nanos(10)).await;
923 }
924 })
925 .await?;
926 Ok(())
927 }
928
929 pub async fn wait_for_scale(&mut self, parallelism: usize) -> Result<()> {
931 let timeout = Duration::from_secs(200);
932 let mut session = self.start_session();
933 tokio::time::timeout(timeout, async {
934 loop {
935 let parallelism_sql = format!(
936 "select count(parallelism) filter (where parallelism != {parallelism})\
937 from (select count(*) parallelism from rw_actors group by fragment_id);"
938 );
939 if let Ok(result) = session.run(¶llelism_sql).await
940 && result == "0"
941 {
942 break;
943 }
944 tokio::time::sleep(Duration::from_nanos(10)).await;
945 }
946 })
947 .await?;
948 Ok(())
949 }
950}
951
/// A request sent from a [`Session`] handle to its session task.
type SessionRequest = (
    String,                          // SQL text of the query
    oneshot::Sender<Result<String>>, // channel on which the result is sent back
);
956
/// A handle to a client session.
///
/// Queries are forwarded over a channel and the result is awaited on a per-request oneshot
/// channel. Cloning the handle clones the sender.
#[derive(Debug, Clone)]
pub struct Session {
    /// Sending half of the request channel.
    query_tx: mpsc::Sender<SessionRequest>,
}
962
963impl Session {
964 pub async fn run_all(&mut self, sqls: Vec<impl Into<String>>) -> Result<Vec<String>> {
966 let mut results = Vec::with_capacity(sqls.len());
967 for sql in sqls {
968 let result = self.run(sql).await?;
969 results.push(result);
970 }
971 Ok(results)
972 }
973
974 pub async fn run(&mut self, sql: impl Into<String>) -> Result<String> {
976 let (tx, rx) = oneshot::channel();
977 self.query_tx.send((sql.into(), tx)).await?;
978 rx.await?
979 }
980
981 pub async fn flush(&mut self) -> Result<()> {
983 self.run("FLUSH").await?;
984 Ok(())
985 }
986
987 pub async fn is_arrangement_backfill_enabled(&mut self) -> Result<bool> {
988 let result = self.run("show streaming_use_arrangement_backfill").await?;
989 Ok(result == "true")
990 }
991}
992
/// Options that control which kinds of node are killed and how long they may stay down.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct KillOpts {
    /// Kill rate. NOTE(review): not read anywhere in this file — confirm its meaning at the
    /// use site.
    pub kill_rate: f32,
    /// Whether meta nodes may be killed.
    pub kill_meta: bool,
    /// Whether frontend nodes may be killed.
    pub kill_frontend: bool,
    /// Whether compute nodes may be killed.
    pub kill_compute: bool,
    /// Whether compactor nodes may be killed.
    pub kill_compactor: bool,
    /// Extra delay in seconds that may be applied before a killed node is restarted.
    pub restart_delay_secs: u32,
}
1003
1004impl KillOpts {
1005 pub const ALL: Self = KillOpts {
1007 kill_rate: 1.0,
1008 kill_meta: false, kill_frontend: true,
1010 kill_compute: true,
1011 kill_compactor: true,
1012 restart_delay_secs: 20,
1013 };
1014 pub const ALL_FAST: Self = KillOpts {
1015 kill_rate: 1.0,
1016 kill_meta: false, kill_frontend: true,
1018 kill_compute: true,
1019 kill_compactor: true,
1020 restart_delay_secs: 2,
1021 };
1022}