1#![cfg_attr(not(madsim), allow(unused_imports))]
16
17use std::cmp::max;
18use std::collections::HashMap;
19use std::future::Future;
20use std::io::Write;
21use std::path::PathBuf;
22use std::sync::Arc;
23use std::time::Duration;
24
25use anyhow::{Result, anyhow, bail};
26use cfg_or_panic::cfg_or_panic;
27use clap::Parser;
28use futures::channel::{mpsc, oneshot};
29use futures::future::join_all;
30use futures::{SinkExt, StreamExt};
31use itertools::Itertools;
32#[cfg(madsim)]
33use madsim::runtime::{Handle, NodeHandle};
34use rand::Rng;
35use rand::seq::IteratorRandom;
36use risingwave_common::util::tokio_util::sync::CancellationToken;
37use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
38#[cfg(madsim)]
39use risingwave_object_store::object::sim::SimServer as ObjectStoreSimServer;
40use risingwave_pb::common::WorkerNode;
41use sqllogictest::AsyncDB;
42use tempfile::NamedTempFile;
43#[cfg(not(madsim))]
44use tokio::runtime::Handle;
45use uuid::Uuid;
46
47use crate::client::RisingWave;
48
49#[derive(Clone, Debug)]
51pub enum ConfigPath {
52 Regular(String),
54 Temp(Arc<tempfile::TempPath>),
56}
57
58impl ConfigPath {
59 pub fn as_str(&self) -> &str {
60 match self {
61 ConfigPath::Regular(s) => s,
62 ConfigPath::Temp(p) => p.as_os_str().to_str().unwrap(),
63 }
64 }
65}
66
/// Describes the cluster to start: which config file to use and how many nodes of
/// each kind to create.
#[derive(Debug, Clone)]
pub struct Configuration {
    /// Config file passed as `--config-path` to every node started by `Cluster::start`.
    pub config_path: ConfigPath,

    /// Number of frontend nodes.
    pub frontend_nodes: usize,

    /// Number of compute nodes.
    pub compute_nodes: usize,

    /// Number of meta nodes. `Cluster::start` asserts that this is exactly 1.
    pub meta_nodes: usize,

    /// Number of compactor nodes.
    pub compactor_nodes: usize,

    /// Per compute node: the value passed as `--parallelism` and the number of
    /// cores given to the simulated node.
    pub compute_node_cores: usize,

    /// Statements executed on every new session before any request is served.
    pub per_session_queries: Arc<Vec<String>>,

    /// Resource group for individual compute nodes, keyed by the 1-based node index.
    /// Nodes without an entry are started with `DEFAULT_RESOURCE_GROUP`.
    pub compute_resource_groups: HashMap<usize, String>,
}
98
99impl Default for Configuration {
100 fn default() -> Self {
101 let config_path = {
102 let mut file =
103 tempfile::NamedTempFile::new().expect("failed to create temp config file");
104
105 let config_data = r#"
106[server]
107telemetry_enabled = false
108metrics_level = "Disabled"
109"#
110 .to_owned();
111 file.write_all(config_data.as_bytes())
112 .expect("failed to write config file");
113 file.into_temp_path()
114 };
115
116 Configuration {
117 config_path: ConfigPath::Temp(config_path.into()),
118 frontend_nodes: 1,
119 compute_nodes: 1,
120 meta_nodes: 1,
121 compactor_nodes: 1,
122 compute_node_cores: 1,
123 per_session_queries: vec![].into(),
124 compute_resource_groups: Default::default(),
125 }
126 }
127}
128
129impl Configuration {
130 pub fn for_scale() -> Self {
132 let config_path = {
135 let mut file =
136 tempfile::NamedTempFile::new().expect("failed to create temp config file");
137 file.write_all(include_bytes!("risingwave-scale.toml"))
138 .expect("failed to write config file");
139 file.into_temp_path()
140 };
141
142 Configuration {
143 config_path: ConfigPath::Temp(config_path.into()),
144 frontend_nodes: 2,
145 compute_nodes: 3,
146 meta_nodes: 1,
147 compactor_nodes: 2,
148 compute_node_cores: 2,
149 ..Default::default()
150 }
151 }
152
153 pub fn for_scale_no_shuffle() -> Self {
156 let mut conf = Self::for_scale();
157 conf.per_session_queries =
158 vec!["SET STREAMING_USE_ARRANGEMENT_BACKFILL = false;".into()].into();
159 conf
160 }
161
162 pub fn for_scale_shared_source() -> Self {
163 let mut conf = Self::for_scale();
164 conf.per_session_queries = vec!["SET STREAMING_USE_SHARED_SOURCE = true;".into()].into();
165 conf
166 }
167
168 pub fn for_auto_parallelism(
169 max_heartbeat_interval_secs: u64,
170 enable_auto_parallelism: bool,
171 ) -> Self {
172 let disable_automatic_parallelism_control = !enable_auto_parallelism;
173
174 let config_path = {
175 let mut file =
176 tempfile::NamedTempFile::new().expect("failed to create temp config file");
177
178 let config_data = format!(
179 r#"[meta]
180max_heartbeat_interval_secs = {max_heartbeat_interval_secs}
181disable_automatic_parallelism_control = {disable_automatic_parallelism_control}
182parallelism_control_trigger_first_delay_sec = 0
183parallelism_control_batch_size = 10
184parallelism_control_trigger_period_sec = 10
185
186[system]
187barrier_interval_ms = 250
188checkpoint_frequency = 4
189
190[server]
191telemetry_enabled = false
192metrics_level = "Disabled"
193"#
194 );
195 file.write_all(config_data.as_bytes())
196 .expect("failed to write config file");
197 file.into_temp_path()
198 };
199
200 Configuration {
201 config_path: ConfigPath::Temp(config_path.into()),
202 frontend_nodes: 1,
203 compute_nodes: 3,
204 meta_nodes: 1,
205 compactor_nodes: 1,
206 compute_node_cores: 2,
207 per_session_queries: vec![
208 "create view if not exists table_parallelism as select t.name, tf.parallelism from rw_tables t, rw_table_fragments tf where t.id = tf.table_id;".into(),
209 "create view if not exists mview_parallelism as select m.name, tf.parallelism from rw_materialized_views m, rw_table_fragments tf where m.id = tf.table_id;".into(),
210 ]
211 .into(),
212 ..Default::default()
213 }
214 }
215
216 pub fn for_default_parallelism(default_parallelism: usize) -> Self {
217 let config_path = {
218 let mut file =
219 tempfile::NamedTempFile::new().expect("failed to create temp config file");
220
221 let config_data = format!(
222 r#"
223[server]
224telemetry_enabled = false
225metrics_level = "Disabled"
226[meta]
227default_parallelism = {default_parallelism}
228"#
229 )
230 .to_owned();
231 file.write_all(config_data.as_bytes())
232 .expect("failed to write config file");
233 file.into_temp_path()
234 };
235
236 Configuration {
237 config_path: ConfigPath::Temp(config_path.into()),
238 frontend_nodes: 1,
239 compute_nodes: 1,
240 meta_nodes: 1,
241 compactor_nodes: 1,
242 compute_node_cores: default_parallelism * 2,
243 per_session_queries: vec![].into(),
244 compute_resource_groups: Default::default(),
245 }
246 }
247
248 pub fn for_backfill() -> Self {
250 let config_path = {
253 let mut file =
254 tempfile::NamedTempFile::new().expect("failed to create temp config file");
255 file.write_all(include_bytes!("backfill.toml"))
256 .expect("failed to write config file");
257 file.into_temp_path()
258 };
259
260 Configuration {
261 config_path: ConfigPath::Temp(config_path.into()),
262 frontend_nodes: 1,
263 compute_nodes: 1,
264 meta_nodes: 1,
265 compactor_nodes: 1,
266 compute_node_cores: 4,
267 ..Default::default()
268 }
269 }
270
271 pub fn for_arrangement_backfill() -> Self {
272 let config_path = {
275 let mut file =
276 tempfile::NamedTempFile::new().expect("failed to create temp config file");
277 file.write_all(include_bytes!("arrangement_backfill.toml"))
278 .expect("failed to write config file");
279 file.into_temp_path()
280 };
281
282 Configuration {
283 config_path: ConfigPath::Temp(config_path.into()),
284 frontend_nodes: 1,
285 compute_nodes: 3,
286 meta_nodes: 1,
287 compactor_nodes: 1,
288 compute_node_cores: 1,
289 per_session_queries: vec!["SET STREAMING_USE_ARRANGEMENT_BACKFILL = true;".into()]
290 .into(),
291 ..Default::default()
292 }
293 }
294
295 pub fn for_background_ddl() -> Self {
296 let config_path = {
299 let mut file =
300 tempfile::NamedTempFile::new().expect("failed to create temp config file");
301 file.write_all(include_bytes!("background_ddl.toml"))
302 .expect("failed to write config file");
303 file.into_temp_path()
304 };
305
306 Configuration {
307 config_path: ConfigPath::Temp(config_path.into()),
308 frontend_nodes: 1,
314 compute_nodes: 3,
315 meta_nodes: 1,
316 compactor_nodes: 2,
317 compute_node_cores: 2,
318 ..Default::default()
319 }
320 }
321
322 pub fn enable_arrangement_backfill() -> Self {
323 let config_path = {
324 let mut file =
325 tempfile::NamedTempFile::new().expect("failed to create temp config file");
326 file.write_all(include_bytes!("disable_arrangement_backfill.toml"))
327 .expect("failed to write config file");
328 file.into_temp_path()
329 };
330 Configuration {
331 config_path: ConfigPath::Temp(config_path.into()),
332 frontend_nodes: 1,
333 compute_nodes: 1,
334 meta_nodes: 1,
335 compactor_nodes: 1,
336 compute_node_cores: 1,
337 per_session_queries: vec![].into(),
338 ..Default::default()
339 }
340 }
341}
342
/// A running cluster together with the handles needed to interact with it.
///
/// The node handles and the SQLite file only exist when built with `cfg(madsim)`.
pub struct Cluster {
    /// The configuration the cluster was started with.
    config: Configuration,
    /// Runtime handle: `madsim::runtime::Handle` under `cfg(madsim)`,
    /// `tokio::runtime::Handle` otherwise.
    handle: Handle,
    /// Node on which client sessions and `run_on_client` futures are spawned.
    #[cfg(madsim)]
    pub(crate) client: NodeHandle,
    /// Separate node named `ctl`; it is only created here, its users are outside this block.
    #[cfg(madsim)]
    pub(crate) ctl: NodeHandle,
    /// Temporary file backing the SQLite endpoint given to the meta node.
    /// Presumably kept here so the file outlives the cluster — confirm.
    #[cfg(madsim)]
    pub(crate) sqlite_file_handle: NamedTempFile,
}
368
369impl Cluster {
/// Starts a cluster in the current madsim runtime according to `conf`.
///
/// Nodes are created in this order: kafka broker, object store, meta node(s),
/// frontends, compute nodes, compactors, and finally the `client` and `ctl`
/// nodes. Fixed sleeps separate the stages.
///
/// IP layout used below:
/// - meta: `192.168.1.{i}`, frontend: `192.168.2.{i}` (virtual IP `192.168.2.0`)
/// - compute: `192.168.3.{i}`, compactor: `192.168.4.{i}`
/// - kafka broker: `192.168.11.1`, object store: `192.168.12.1`
/// - client: `192.168.100.1`, ctl: `192.168.101.1`
///
/// # Panics
///
/// Panics if `conf.meta_nodes != 1`, or if the SQLite temp file cannot be created.
// NOTE(review): `#[cfg_or_panic(madsim)]` presumably replaces the body with a
// panic when not built with `cfg(madsim)`; confirm against the `cfg_or_panic` docs.
#[cfg_or_panic(madsim)]
pub async fn start(conf: Configuration) -> Result<Self> {
    use madsim::net::ipvs::*;

    let handle = madsim::runtime::Handle::current();
    println!("seed = {}", handle.seed());
    println!("{:#?}", conf);

    // Only a single meta node is accepted; the `1..=conf.meta_nodes` loops below
    // therefore run exactly once.
    assert_eq!(conf.meta_nodes, 1);

    let net = madsim::net::NetSim::current();
    // DNS: `meta-{i}` -> 192.168.1.{i}
    for i in 1..=conf.meta_nodes {
        net.add_dns_record(
            &format!("meta-{i}"),
            format!("192.168.1.{i}").parse().unwrap(),
        );
    }

    // `frontend` resolves to a virtual IP; connections to port 4566 on it are
    // distributed round-robin over the real frontend nodes registered below.
    net.add_dns_record("frontend", "192.168.2.0".parse().unwrap());
    net.add_dns_record("message_queue", "192.168.11.1".parse().unwrap());
    net.global_ipvs().add_service(
        ServiceAddr::Tcp("192.168.2.0:4566".into()),
        Scheduler::RoundRobin,
    );
    for i in 1..=conf.frontend_nodes {
        net.global_ipvs().add_server(
            ServiceAddr::Tcp("192.168.2.0:4566".into()),
            &format!("192.168.2.{i}:4566"),
        )
    }

    // Kafka broker (the address `message_queue` resolves to).
    handle
        .create_node()
        .name("kafka-broker")
        .ip("192.168.11.1".parse().unwrap())
        .init(move || async move {
            rdkafka::SimBroker::default()
                .serve("0.0.0.0:29092".parse().unwrap())
                .await
        })
        .build();

    // Simulated object store; the meta node's `--state-store` URL points at it.
    handle
        .create_node()
        .name("object_store_sim")
        .ip("192.168.12.1".parse().unwrap())
        .init(move || async move {
            ObjectStoreSimServer::builder()
                .serve("0.0.0.0:9301".parse().unwrap())
                .await
        })
        .build();

    // Presumably gives the two services above time to come up.
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;

    // Publish the comma-separated meta addresses through the `RW_META_ADDR`
    // environment variable.
    let mut meta_addrs = vec![];
    for i in 1..=conf.meta_nodes {
        meta_addrs.push(format!("http://meta-{i}:5690"));
    }
    std::env::set_var("RW_META_ADDR", meta_addrs.join(","));

    // The meta node uses a SQL backend stored in a temporary SQLite file.
    let sqlite_file_handle: NamedTempFile = NamedTempFile::new().unwrap();
    let file_path = sqlite_file_handle.path().display().to_string();
    tracing::info!(?file_path, "sqlite_file_path");
    let sql_endpoint = format!("sqlite://{}?mode=rwc", file_path);
    let backend_args = vec!["--backend", "sql", "--sql-endpoint", &sql_endpoint];

    // Meta node(s).
    for i in 1..=conf.meta_nodes {
        let args = [
            "meta-node",
            "--config-path",
            conf.config_path.as_str(),
            "--listen-addr",
            "0.0.0.0:5690",
            "--advertise-addr",
            &format!("meta-{i}:5690"),
            "--state-store",
            "hummock+sim://hummockadmin:hummockadmin@192.168.12.1:9301/hummock001",
            "--data-directory",
            "hummock_001",
            "--temp-secret-file-dir",
            &format!("./secrets/meta-{i}"),
        ];
        let args = args.into_iter().chain(backend_args.clone().into_iter());
        let opts = risingwave_meta_node::MetaNodeOpts::parse_from(args);
        handle
            .create_node()
            .name(format!("meta-{i}"))
            // NOTE(review): `i as u8` truncates silently for indexes above 255.
            .ip([192, 168, 1, i as u8].into())
            .init(move || {
                risingwave_meta_node::start(
                    opts.clone(),
                    CancellationToken::new(),
                )
            })
            .build();
    }

    // Presumably waits for the meta node to be ready before other nodes register.
    tokio::time::sleep(std::time::Duration::from_secs(15)).await;

    // Frontend nodes.
    for i in 1..=conf.frontend_nodes {
        let opts = risingwave_frontend::FrontendOpts::parse_from([
            "frontend-node",
            "--config-path",
            conf.config_path.as_str(),
            "--listen-addr",
            "0.0.0.0:4566",
            "--advertise-addr",
            &format!("192.168.2.{i}:4566"),
            "--temp-secret-file-dir",
            &format!("./secrets/frontend-{i}"),
        ]);
        handle
            .create_node()
            .name(format!("frontend-{i}"))
            .ip([192, 168, 2, i as u8].into())
            .init(move || {
                risingwave_frontend::start(
                    opts.clone(),
                    CancellationToken::new(),
                )
            })
            .build();
    }

    // Compute nodes; each uses `compute_node_cores` both as `--parallelism` and
    // as the core count of its simulated node.
    for i in 1..=conf.compute_nodes {
        let opts = risingwave_compute::ComputeNodeOpts::parse_from([
            "compute-node",
            "--config-path",
            conf.config_path.as_str(),
            "--listen-addr",
            "0.0.0.0:5688",
            "--advertise-addr",
            &format!("192.168.3.{i}:5688"),
            "--total-memory-bytes",
            "6979321856",
            "--parallelism",
            &conf.compute_node_cores.to_string(),
            "--temp-secret-file-dir",
            &format!("./secrets/compute-{i}"),
            "--resource-group",
            // Per-node override, falling back to the default resource group.
            &conf
                .compute_resource_groups
                .get(&i)
                .cloned()
                .unwrap_or(DEFAULT_RESOURCE_GROUP.to_string()),
        ]);
        handle
            .create_node()
            .name(format!("compute-{i}"))
            .ip([192, 168, 3, i as u8].into())
            .cores(conf.compute_node_cores)
            .init(move || {
                risingwave_compute::start(
                    opts.clone(),
                    CancellationToken::new(),
                )
            })
            .build();
    }

    // Compactor nodes.
    for i in 1..=conf.compactor_nodes {
        let opts = risingwave_compactor::CompactorOpts::parse_from([
            "compactor-node",
            "--config-path",
            conf.config_path.as_str(),
            "--listen-addr",
            "0.0.0.0:6660",
            "--advertise-addr",
            &format!("192.168.4.{i}:6660"),
        ]);
        handle
            .create_node()
            .name(format!("compactor-{i}"))
            .ip([192, 168, 4, i as u8].into())
            .init(move || {
                risingwave_compactor::start(
                    opts.clone(),
                    CancellationToken::new(),
                )
            })
            .build();
    }

    // Presumably waits for all services to be ready before handing out the cluster.
    tokio::time::sleep(Duration::from_secs(15)).await;

    // Node on which client sessions are spawned.
    let client = handle
        .create_node()
        .name("client")
        .ip([192, 168, 100, 1].into())
        .build();

    // Separate node named `ctl`.
    let ctl = handle
        .create_node()
        .name("ctl")
        .ip([192, 168, 101, 1].into())
        .build();

    Ok(Self {
        config: conf,
        handle,
        client,
        ctl,
        sqlite_file_handle,
    })
}
592
593 #[cfg_or_panic(madsim)]
594 fn per_session_queries(&self) -> Arc<Vec<String>> {
595 self.config.per_session_queries.clone()
596 }
597
598 #[cfg_or_panic(madsim)]
600 pub fn start_session(&mut self) -> Session {
601 let (query_tx, mut query_rx) = mpsc::channel::<SessionRequest>(0);
602 let per_session_queries = self.per_session_queries();
603
604 self.client.spawn(async move {
605 let mut client = RisingWave::connect("frontend".into(), "dev".into()).await?;
606
607 for sql in per_session_queries.as_ref() {
608 client.run(sql).await?;
609 }
610 drop(per_session_queries);
611
612 while let Some((sql, tx)) = query_rx.next().await {
613 let result = client
614 .run(&sql)
615 .await
616 .map(|output| match output {
617 sqllogictest::DBOutput::Rows { rows, .. } => rows
618 .into_iter()
619 .map(|row| {
620 row.into_iter()
621 .map(|v| v.to_string())
622 .collect::<Vec<_>>()
623 .join(" ")
624 })
625 .collect::<Vec<_>>()
626 .join("\n"),
627 _ => "".to_string(),
628 })
629 .map_err(Into::into);
630
631 let _ = tx.send(result);
632 }
633
634 Ok::<_, anyhow::Error>(())
635 });
636
637 Session { query_tx }
638 }
639
640 pub async fn run(&mut self, sql: impl Into<String>) -> Result<String> {
645 self.start_session().run(sql).await
646 }
647
648 #[cfg_or_panic(madsim)]
650 pub async fn run_on_client<F>(&self, future: F) -> F::Output
651 where
652 F: Future + Send + 'static,
653 F::Output: Send + 'static,
654 {
655 self.client.spawn(future).await.unwrap()
656 }
657
658 pub async fn get_random_worker_nodes(&self, n: usize) -> Result<Vec<WorkerNode>> {
659 let worker_nodes = self.get_cluster_info().await?.get_worker_nodes().clone();
660 if worker_nodes.len() < n {
661 return Err(anyhow!("cannot remove more nodes than present"));
662 }
663 let rand_nodes = worker_nodes
664 .iter()
665 .choose_multiple(&mut rand::rng(), n)
666 .to_vec();
667 Ok(rand_nodes.iter().cloned().cloned().collect_vec())
668 }
669
670 pub async fn wait_until(
672 &mut self,
673 sql: impl Into<String> + Clone,
674 mut p: impl FnMut(&str) -> bool,
675 interval: Duration,
676 timeout: Duration,
677 ) -> Result<String> {
678 let fut = async move {
679 let mut interval = tokio::time::interval(interval);
680 loop {
681 interval.tick().await;
682 let result = self.run(sql.clone()).await?;
683 if p(&result) {
684 return Ok::<_, anyhow::Error>(result);
685 }
686 }
687 };
688
689 match tokio::time::timeout(timeout, fut).await {
690 Ok(r) => Ok(r?),
691 Err(_) => bail!("wait_until timeout"),
692 }
693 }
694
695 pub async fn wait_until_non_empty(
697 &mut self,
698 sql: &str,
699 interval: Duration,
700 timeout: Duration,
701 ) -> Result<String> {
702 self.wait_until(sql, |r| !r.trim().is_empty(), interval, timeout)
703 .await
704 }
705
706 pub async fn kill_node(&self, opts: &KillOpts) {
709 let mut nodes = vec![];
710 if opts.kill_meta {
711 let rand = rand::rng().random_range(0..3);
712 for i in 1..=self.config.meta_nodes {
713 match rand {
714 0 => break, 1 => {} _ if !rand::rng().random_bool(0.5) => continue, _ => {}
718 }
719 nodes.push(format!("meta-{}", i));
720 }
721 if nodes.len() == self.config.meta_nodes {
723 nodes.truncate(1);
724 }
725 }
726 if opts.kill_frontend {
727 let rand = rand::rng().random_range(0..3);
728 for i in 1..=self.config.frontend_nodes {
729 match rand {
730 0 => break, 1 => {} _ if !rand::rng().random_bool(0.5) => continue, _ => {}
734 }
735 nodes.push(format!("frontend-{}", i));
736 }
737 }
738 if opts.kill_compute {
739 let rand = rand::rng().random_range(0..3);
740 for i in 1..=self.config.compute_nodes {
741 match rand {
742 0 => break, 1 => {} _ if !rand::rng().random_bool(0.5) => continue, _ => {}
746 }
747 nodes.push(format!("compute-{}", i));
748 }
749 }
750 if opts.kill_compactor {
751 let rand = rand::rng().random_range(0..3);
752 for i in 1..=self.config.compactor_nodes {
753 match rand {
754 0 => break, 1 => {} _ if !rand::rng().random_bool(0.5) => continue, _ => {}
758 }
759 nodes.push(format!("compactor-{}", i));
760 }
761 }
762
763 self.kill_nodes(nodes, opts.restart_delay_secs).await
764 }
765
766 #[cfg_or_panic(madsim)]
769 pub async fn kill_nodes(
770 &self,
771 nodes: impl IntoIterator<Item = impl AsRef<str>>,
772 restart_delay_secs: u32,
773 ) {
774 join_all(nodes.into_iter().map(|name| async move {
775 let name = name.as_ref();
776 let t = rand::rng().random_range(Duration::from_secs(0)..Duration::from_secs(1));
777 tokio::time::sleep(t).await;
778 tracing::info!("kill {name}");
779 Handle::current().kill(name);
780
781 let mut t = rand::rng().random_range(Duration::from_secs(0)..Duration::from_secs(1));
782 if rand::rng().random_bool(0.1) {
785 t += Duration::from_secs(restart_delay_secs as u64);
787 }
788 tokio::time::sleep(t).await;
789 tracing::info!("restart {name}");
790 Handle::current().restart(name);
791 }))
792 .await;
793 }
794
795 #[cfg_or_panic(madsim)]
796 pub async fn kill_nodes_and_restart(
797 &self,
798 nodes: impl IntoIterator<Item = impl AsRef<str>>,
799 restart_delay_secs: u32,
800 ) {
801 join_all(nodes.into_iter().map(|name| async move {
802 let name = name.as_ref();
803 tracing::info!("kill {name}");
804 Handle::current().kill(name);
805 tokio::time::sleep(Duration::from_secs(restart_delay_secs as u64)).await;
806 tracing::info!("restart {name}");
807 Handle::current().restart(name);
808 }))
809 .await;
810 }
811
812 #[cfg_or_panic(madsim)]
813 pub async fn simple_kill_nodes(&self, nodes: impl IntoIterator<Item = impl AsRef<str>>) {
814 join_all(nodes.into_iter().map(|name| async move {
815 let name = name.as_ref();
816 tracing::info!("kill {name}");
817 Handle::current().kill(name);
818 }))
819 .await;
820 }
821
822 #[cfg_or_panic(madsim)]
823 pub async fn simple_restart_nodes(&self, nodes: impl IntoIterator<Item = impl AsRef<str>>) {
824 join_all(nodes.into_iter().map(|name| async move {
825 let name = name.as_ref();
826 tracing::info!("restart {name}");
827 Handle::current().restart(name);
828 }))
829 .await;
830 }
831
832 #[cfg_or_panic(madsim)]
834 pub async fn create_kafka_producer(&self, datadir: &str) {
835 self.handle
836 .create_node()
837 .name("kafka-producer")
838 .ip("192.168.11.2".parse().unwrap())
839 .build()
840 .spawn(crate::kafka::producer(
841 "192.168.11.1:29092",
842 datadir.to_string(),
843 ))
844 .await
845 .unwrap();
846 }
847
848 #[cfg_or_panic(madsim)]
850 pub fn create_kafka_topics(&self, topics: HashMap<String, i32>) {
851 self.handle
852 .create_node()
853 .name("kafka-topic-create")
854 .ip("192.168.11.3".parse().unwrap())
855 .build()
856 .spawn(crate::kafka::create_topics("192.168.11.1:29092", topics));
857 }
858
859 pub fn config(&self) -> Configuration {
860 self.config.clone()
861 }
862
863 pub fn handle(&self) -> &Handle {
864 &self.handle
865 }
866
867 #[cfg_or_panic(madsim)]
869 pub async fn graceful_shutdown(&self) {
870 let mut nodes = vec![];
871 let mut metas = vec![];
872 for i in 1..=self.config.meta_nodes {
873 metas.push(format!("meta-{i}"));
874 }
875 for i in 1..=self.config.frontend_nodes {
876 nodes.push(format!("frontend-{i}"));
877 }
878 for i in 1..=self.config.compute_nodes {
879 nodes.push(format!("compute-{i}"));
880 }
881 for i in 1..=self.config.compactor_nodes {
882 nodes.push(format!("compactor-{i}"));
883 }
884
885 tracing::info!("graceful shutdown");
886 let waiting_time = Duration::from_secs(10);
887 for node in &nodes {
889 self.handle.send_ctrl_c(node);
890 }
891 tokio::time::sleep(waiting_time).await;
892 for meta in &metas {
894 self.handle.send_ctrl_c(meta);
895 }
896 tokio::time::sleep(waiting_time).await;
897
898 for node in nodes.iter().chain(metas.iter()) {
900 if !self.handle.is_exit(node) {
901 panic!("failed to graceful shutdown {node} in {waiting_time:?}");
902 }
903 }
904 }
905}
906
/// A request sent to a session task: the SQL text to run together with the
/// one-shot channel on which the result is sent back.
type SessionRequest = (
    String,                          // the SQL text
    oneshot::Sender<Result<String>>, // where the result of the query is sent
);
911
912#[derive(Debug, Clone)]
914pub struct Session {
915 query_tx: mpsc::Sender<SessionRequest>,
916}
917
918impl Session {
919 pub async fn run(&mut self, sql: impl Into<String>) -> Result<String> {
921 let (tx, rx) = oneshot::channel();
922 self.query_tx.send((sql.into(), tx)).await?;
923 rx.await?
924 }
925
926 pub async fn flush(&mut self) -> Result<()> {
928 self.run("FLUSH").await?;
929 Ok(())
930 }
931
932 pub async fn is_arrangement_backfill_enabled(&mut self) -> Result<bool> {
933 let result = self.run("show streaming_use_arrangement_backfill").await?;
934 Ok(result == "true")
935 }
936}
937
/// Options controlling which kinds of nodes get killed and how they are restarted.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct KillOpts {
    /// Kill rate; not read anywhere in this part of the file.
    pub kill_rate: f32,
    /// Whether meta nodes may be killed.
    pub kill_meta: bool,
    /// Whether frontend nodes may be killed.
    pub kill_frontend: bool,
    /// Whether compute nodes may be killed.
    pub kill_compute: bool,
    /// Whether compactor nodes may be killed.
    pub kill_compactor: bool,
    /// Delay in seconds used when restarting a killed node.
    pub restart_delay_secs: u32,
}

impl KillOpts {
    /// Kills frontend, compute and compactor nodes, with a 20 second restart
    /// delay. Despite the name, meta nodes are not included.
    pub const ALL: Self = Self {
        kill_rate: 1.0,
        kill_meta: false,
        kill_frontend: true,
        kill_compute: true,
        kill_compactor: true,
        restart_delay_secs: 20,
    };
    /// Same as [`KillOpts::ALL`], but with a 2 second restart delay.
    pub const ALL_FAST: Self = Self {
        restart_delay_secs: 2,
        ..Self::ALL
    };
}