1#![cfg_attr(not(madsim), allow(unused_imports))]
16
17use std::cmp::max;
18use std::collections::HashMap;
19use std::future::Future;
20use std::io::Write;
21use std::path::PathBuf;
22use std::sync::Arc;
23use std::time::Duration;
24
25use anyhow::{Result, anyhow, bail};
26use cfg_or_panic::cfg_or_panic;
27use clap::Parser;
28use futures::channel::{mpsc, oneshot};
29use futures::future::join_all;
30use futures::{SinkExt, StreamExt};
31use itertools::Itertools;
32#[cfg(madsim)]
33use madsim::runtime::{Handle, NodeHandle};
34use rand::Rng;
35use rand::seq::IteratorRandom;
36use risingwave_common::util::tokio_util::sync::CancellationToken;
37use risingwave_common::util::worker_util::DEFAULT_RESOURCE_GROUP;
38#[cfg(madsim)]
39use risingwave_object_store::object::sim::SimServer as ObjectStoreSimServer;
40use risingwave_pb::common::WorkerNode;
41use sqllogictest::AsyncDB;
42use tempfile::NamedTempFile;
43#[cfg(not(madsim))]
44use tokio::runtime::Handle;
45use uuid::Uuid;
46
47use crate::client::RisingWave;
48
/// Location of the RisingWave config file handed to every launched node.
#[derive(Clone, Debug)]
pub enum ConfigPath {
    /// A caller-provided path on disk; the caller owns the file.
    Regular(String),
    /// A temporary config file owned by the cluster. Wrapped in `Arc` so
    /// clones of the configuration keep the temp file alive; it is deleted
    /// when the last reference is dropped.
    Temp(Arc<tempfile::TempPath>),
}
57
58impl ConfigPath {
59 pub fn as_str(&self) -> &str {
60 match self {
61 ConfigPath::Regular(s) => s,
62 ConfigPath::Temp(p) => p.as_os_str().to_str().unwrap(),
63 }
64 }
65}
66
/// Settings for a simulated RisingWave cluster: which config file to use and
/// how many nodes of each role to launch.
#[derive(Debug, Clone)]
pub struct Configuration {
    /// Path of the config file passed via `--config-path` to every node.
    pub config_path: ConfigPath,

    /// Number of frontend nodes.
    pub frontend_nodes: usize,

    /// Number of compute nodes.
    pub compute_nodes: usize,

    /// Number of meta nodes.
    /// NOTE(review): `Cluster::start` asserts this is exactly 1 — confirm
    /// before configuring more.
    pub meta_nodes: usize,

    /// Number of compactor nodes.
    pub compactor_nodes: usize,

    /// CPU cores given to each compute node; also passed as the node's
    /// `--parallelism`.
    pub compute_node_cores: usize,

    /// SQL statements executed at the beginning of every new session.
    pub per_session_queries: Arc<Vec<String>>,

    /// Resource group per compute node, keyed by the node's 1-based index;
    /// absent entries fall back to the default resource group.
    pub compute_resource_groups: HashMap<usize, String>,
}
98
99impl Default for Configuration {
100 fn default() -> Self {
101 let config_path = {
102 let mut file =
103 tempfile::NamedTempFile::new().expect("failed to create temp config file");
104
105 let config_data = r#"
106[server]
107telemetry_enabled = false
108metrics_level = "Disabled"
109"#
110 .to_owned();
111 file.write_all(config_data.as_bytes())
112 .expect("failed to write config file");
113 file.into_temp_path()
114 };
115
116 Configuration {
117 config_path: ConfigPath::Temp(config_path.into()),
118 frontend_nodes: 1,
119 compute_nodes: 1,
120 meta_nodes: 1,
121 compactor_nodes: 1,
122 compute_node_cores: 1,
123 per_session_queries: vec![].into(),
124 compute_resource_groups: Default::default(),
125 }
126 }
127}
128
129impl Configuration {
130 pub fn for_scale() -> Self {
132 let config_path = {
135 let mut file =
136 tempfile::NamedTempFile::new().expect("failed to create temp config file");
137 file.write_all(include_bytes!("risingwave-scale.toml"))
138 .expect("failed to write config file");
139 file.into_temp_path()
140 };
141
142 Configuration {
143 config_path: ConfigPath::Temp(config_path.into()),
144 frontend_nodes: 2,
145 compute_nodes: 3,
146 meta_nodes: 1,
147 compactor_nodes: 2,
148 compute_node_cores: 2,
149 per_session_queries: vec![
150 "set streaming_parallelism_strategy_for_table = 'DEFAULT'".into(),
151 "set streaming_parallelism_strategy_for_source = 'DEFAULT'".into(),
152 "alter system set adaptive_parallelism_strategy to AUTO".into(),
153 ]
154 .into(),
155 ..Default::default()
156 }
157 }
158
159 pub fn for_scale_no_shuffle() -> Self {
162 let mut conf = Self::for_scale();
163 conf.per_session_queries = vec![
164 "set streaming_parallelism_strategy_for_table = 'DEFAULT'".into(),
165 "set streaming_parallelism_strategy_for_source = 'DEFAULT'".into(),
166 "alter system set adaptive_parallelism_strategy to AUTO".into(),
167 "SET STREAMING_USE_ARRANGEMENT_BACKFILL = false;".into(),
168 "SET STREAMING_USE_SNAPSHOT_BACKFILL = false;".into(),
169 ]
170 .into();
171 conf
172 }
173
174 pub fn for_scale_shared_source() -> Self {
175 let mut conf = Self::for_scale();
176 conf.per_session_queries = vec![
177 "set streaming_parallelism_strategy_for_table = 'DEFAULT'".into(),
178 "set streaming_parallelism_strategy_for_source = 'DEFAULT'".into(),
179 "alter system set adaptive_parallelism_strategy to AUTO".into(),
180 "SET STREAMING_USE_SHARED_SOURCE = true;".into(),
181 ]
182 .into();
183 conf
184 }
185
186 pub fn for_auto_parallelism(
187 max_heartbeat_interval_secs: u64,
188 enable_auto_parallelism: bool,
189 ) -> Self {
190 let disable_automatic_parallelism_control = !enable_auto_parallelism;
191
192 let config_path = {
193 let mut file =
194 tempfile::NamedTempFile::new().expect("failed to create temp config file");
195
196 let config_data = format!(
197 r#"[meta]
198max_heartbeat_interval_secs = {max_heartbeat_interval_secs}
199disable_automatic_parallelism_control = {disable_automatic_parallelism_control}
200parallelism_control_trigger_first_delay_sec = 0
201parallelism_control_batch_size = 10
202parallelism_control_trigger_period_sec = 10
203
204[system]
205barrier_interval_ms = 250
206checkpoint_frequency = 4
207
208[server]
209telemetry_enabled = false
210metrics_level = "Disabled"
211"#
212 );
213 file.write_all(config_data.as_bytes())
214 .expect("failed to write config file");
215 file.into_temp_path()
216 };
217
218 Configuration {
219 config_path: ConfigPath::Temp(config_path.into()),
220 frontend_nodes: 1,
221 compute_nodes: 3,
222 meta_nodes: 1,
223 compactor_nodes: 1,
224 compute_node_cores: 2,
225 per_session_queries: vec![
226 "set streaming_parallelism_strategy_for_table = 'DEFAULT'".into(),
227 "set streaming_parallelism_strategy_for_source = 'DEFAULT'".into(),
228 "alter system set adaptive_parallelism_strategy to AUTO".into(),
229 "create view if not exists table_parallelism as select t.name, tf.parallelism from rw_tables t, rw_table_fragments tf where t.id = tf.table_id;".into(),
230 "create view if not exists mview_parallelism as select m.name, tf.parallelism from rw_materialized_views m, rw_table_fragments tf where m.id = tf.table_id;".into(),
231 ]
232 .into(),
233 ..Default::default()
234 }
235 }
236
237 pub fn for_default_parallelism(default_parallelism: usize) -> Self {
238 let config_path = {
239 let mut file =
240 tempfile::NamedTempFile::new().expect("failed to create temp config file");
241
242 let config_data = format!(
243 r#"
244[server]
245telemetry_enabled = false
246metrics_level = "Disabled"
247[meta]
248default_parallelism = {default_parallelism}
249"#
250 );
251 file.write_all(config_data.as_bytes())
252 .expect("failed to write config file");
253 file.into_temp_path()
254 };
255
256 Configuration {
257 config_path: ConfigPath::Temp(config_path.into()),
258 frontend_nodes: 1,
259 compute_nodes: 1,
260 meta_nodes: 1,
261 compactor_nodes: 1,
262 compute_node_cores: default_parallelism * 2,
263 per_session_queries: vec![].into(),
264 compute_resource_groups: Default::default(),
265 }
266 }
267
268 pub fn for_backfill() -> Self {
270 let config_path = {
273 let mut file =
274 tempfile::NamedTempFile::new().expect("failed to create temp config file");
275 file.write_all(include_bytes!("backfill.toml"))
276 .expect("failed to write config file");
277 file.into_temp_path()
278 };
279
280 Configuration {
281 config_path: ConfigPath::Temp(config_path.into()),
282 frontend_nodes: 1,
283 compute_nodes: 1,
284 meta_nodes: 1,
285 compactor_nodes: 1,
286 compute_node_cores: 4,
287 ..Default::default()
288 }
289 }
290
291 pub fn for_arrangement_backfill() -> Self {
292 let config_path = {
295 let mut file =
296 tempfile::NamedTempFile::new().expect("failed to create temp config file");
297 file.write_all(include_bytes!("arrangement_backfill.toml"))
298 .expect("failed to write config file");
299 file.into_temp_path()
300 };
301
302 Configuration {
303 config_path: ConfigPath::Temp(config_path.into()),
304 frontend_nodes: 1,
305 compute_nodes: 3,
306 meta_nodes: 1,
307 compactor_nodes: 1,
308 compute_node_cores: 1,
309 per_session_queries: vec![
310 "SET STREAMING_USE_ARRANGEMENT_BACKFILL = true;".into(),
311 "SET STREAMING_USE_SNAPSHOT_BACKFILL = false;".into(),
312 ]
313 .into(),
314 ..Default::default()
315 }
316 }
317
318 pub fn for_background_ddl() -> Self {
319 let config_path = {
322 let mut file =
323 tempfile::NamedTempFile::new().expect("failed to create temp config file");
324 file.write_all(include_bytes!("background_ddl.toml"))
325 .expect("failed to write config file");
326 file.into_temp_path()
327 };
328
329 Configuration {
330 config_path: ConfigPath::Temp(config_path.into()),
331 frontend_nodes: 1,
337 compute_nodes: 3,
338 meta_nodes: 1,
339 compactor_nodes: 2,
340 compute_node_cores: 2,
341 ..Default::default()
342 }
343 }
344
345 pub fn enable_arrangement_backfill() -> Self {
346 let config_path = {
347 let mut file =
348 tempfile::NamedTempFile::new().expect("failed to create temp config file");
349 file.write_all(include_bytes!("disable_arrangement_backfill.toml"))
350 .expect("failed to write config file");
351 file.into_temp_path()
352 };
353 Configuration {
354 config_path: ConfigPath::Temp(config_path.into()),
355 frontend_nodes: 1,
356 compute_nodes: 1,
357 meta_nodes: 1,
358 compactor_nodes: 1,
359 compute_node_cores: 1,
360 per_session_queries: vec![].into(),
361 ..Default::default()
362 }
363 }
364
365 pub fn total_streaming_cores(&self) -> u32 {
367 (self.compute_nodes * self.compute_node_cores) as u32
368 }
369}
370
/// A handle to a simulated RisingWave cluster and its auxiliary nodes.
///
/// The madsim-only fields are compiled out in non-madsim builds, where the
/// cluster methods panic instead of running (see `#[cfg_or_panic(madsim)]`).
pub struct Cluster {
    /// The configuration this cluster was started with.
    config: Configuration,
    /// Runtime handle used to create, kill and restart simulated nodes.
    handle: Handle,
    /// Dedicated node that hosts client SQL sessions.
    #[cfg(madsim)]
    pub(crate) client: NodeHandle,
    /// Dedicated node for running ctl (control) operations.
    #[cfg(madsim)]
    pub(crate) ctl: NodeHandle,
    /// Keeps the SQLite file backing the SQL meta store alive for the
    /// cluster's lifetime; the temp file is removed on drop.
    #[cfg(madsim)]
    pub(crate) sqlite_file_handle: NamedTempFile,
}
396
impl Cluster {
    /// Boots a full simulated cluster (madsim only; panics otherwise).
    ///
    /// Brings up, in order: a simulated Kafka broker, a simulated object
    /// store, the meta node(s), then frontend/compute/compactor nodes, and
    /// finally two auxiliary nodes (`client` and `ctl`) used to issue SQL and
    /// control commands from "outside" the cluster.
    #[cfg_or_panic(madsim)]
    pub async fn start(conf: Configuration) -> Result<Self> {
        use madsim::net::ipvs::*;

        let handle = madsim::runtime::Handle::current();
        // Print the deterministic seed so a failing run can be replayed.
        println!("seed = {}", handle.seed());
        println!("{:#?}", conf);

        // Only a single meta node is currently supported.
        assert_eq!(conf.meta_nodes, 1);

        // DNS records for meta nodes: meta-N -> 192.168.1.N.
        let net = madsim::net::NetSim::current();
        for i in 1..=conf.meta_nodes {
            net.add_dns_record(
                &format!("meta-{i}"),
                format!("192.168.1.{i}").parse().unwrap(),
            );
        }

        // "frontend" resolves to a virtual IP that is round-robin
        // load-balanced (via simulated ipvs) across all frontend nodes.
        net.add_dns_record("frontend", "192.168.2.0".parse().unwrap());
        net.add_dns_record("message_queue", "192.168.11.1".parse().unwrap());
        net.global_ipvs().add_service(
            ServiceAddr::Tcp("192.168.2.0:4566".into()),
            Scheduler::RoundRobin,
        );
        for i in 1..=conf.frontend_nodes {
            net.global_ipvs().add_server(
                ServiceAddr::Tcp("192.168.2.0:4566".into()),
                &format!("192.168.2.{i}:4566"),
            )
        }

        // Simulated Kafka broker at 192.168.11.1:29092.
        handle
            .create_node()
            .name("kafka-broker")
            .ip("192.168.11.1".parse().unwrap())
            .init(move || async move {
                rdkafka::SimBroker::default()
                    .serve("0.0.0.0:29092".parse().unwrap())
                    .await
            })
            .build();

        // Simulated object store (hummock state-store backend) at
        // 192.168.12.1:9301.
        handle
            .create_node()
            .name("object_store_sim")
            .ip("192.168.12.1".parse().unwrap())
            .init(move || async move {
                ObjectStoreSimServer::builder()
                    .serve("0.0.0.0:9301".parse().unwrap())
                    .await
            })
            .build();

        // Give the infrastructure nodes a moment to come up.
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;

        // Expose meta addresses to components that read RW_META_ADDR.
        let mut meta_addrs = vec![];
        for i in 1..=conf.meta_nodes {
            meta_addrs.push(format!("http://meta-{i}:5690"));
        }
        unsafe { std::env::set_var("RW_META_ADDR", meta_addrs.join(",")) };

        // SQLite file backing the SQL meta store; ownership moves into the
        // returned `Cluster` so the file outlives the whole run.
        let sqlite_file_handle: NamedTempFile = NamedTempFile::new().unwrap();
        let file_path = sqlite_file_handle.path().display().to_string();
        tracing::info!(?file_path, "sqlite_file_path");
        let sql_endpoint = format!("sqlite://{}?mode=rwc", file_path);
        let backend_args = vec!["--backend", "sql", "--sql-endpoint", &sql_endpoint];

        // Meta node(s) at 192.168.1.{i}.
        for i in 1..=conf.meta_nodes {
            let args = [
                "meta-node",
                "--config-path",
                conf.config_path.as_str(),
                "--listen-addr",
                "0.0.0.0:5690",
                "--advertise-addr",
                &format!("meta-{i}:5690"),
                "--state-store",
                "hummock+sim://hummockadmin:hummockadmin@192.168.12.1:9301/hummock001",
                "--data-directory",
                "hummock_001",
                "--temp-secret-file-dir",
                &format!("./secrets/meta-{i}"),
            ];
            // Append the SQL-backend arguments to every meta node.
            let args = args.into_iter().chain(backend_args.clone().into_iter());
            let opts = risingwave_meta_node::MetaNodeOpts::parse_from(args);
            handle
                .create_node()
                .name(format!("meta-{i}"))
                .ip([192, 168, 1, i as u8].into())
                .init(move || {
                    risingwave_meta_node::start(
                        opts.clone(),
                        CancellationToken::new(),
                    )
                })
                .build();
        }

        // Wait for the meta service before starting dependent nodes.
        tokio::time::sleep(std::time::Duration::from_secs(15)).await;

        // Frontend nodes at 192.168.2.{i} (behind the ipvs VIP above).
        for i in 1..=conf.frontend_nodes {
            let opts = risingwave_frontend::FrontendOpts::parse_from([
                "frontend-node",
                "--config-path",
                conf.config_path.as_str(),
                "--listen-addr",
                "0.0.0.0:4566",
                "--health-check-listener-addr",
                "0.0.0.0:6786",
                "--advertise-addr",
                &format!("192.168.2.{i}:4566"),
                "--temp-secret-file-dir",
                &format!("./secrets/frontend-{i}"),
            ]);
            handle
                .create_node()
                .name(format!("frontend-{i}"))
                .ip([192, 168, 2, i as u8].into())
                .init(move || {
                    risingwave_frontend::start(
                        opts.clone(),
                        CancellationToken::new(),
                    )
                })
                .build();
        }

        // Compute nodes at 192.168.3.{i}, each with the configured number of
        // cores and its (optional) per-node resource group.
        for i in 1..=conf.compute_nodes {
            let opts = risingwave_compute::ComputeNodeOpts::parse_from([
                "compute-node",
                "--config-path",
                conf.config_path.as_str(),
                "--listen-addr",
                "0.0.0.0:5688",
                "--advertise-addr",
                &format!("192.168.3.{i}:5688"),
                // 6.5 GiB memory budget per compute node.
                "--total-memory-bytes",
                "6979321856",
                "--parallelism",
                &conf.compute_node_cores.to_string(),
                "--temp-secret-file-dir",
                &format!("./secrets/compute-{i}"),
                "--resource-group",
                &conf
                    .compute_resource_groups
                    .get(&i)
                    .cloned()
                    .unwrap_or(DEFAULT_RESOURCE_GROUP.to_string()),
            ]);
            handle
                .create_node()
                .name(format!("compute-{i}"))
                .ip([192, 168, 3, i as u8].into())
                .cores(conf.compute_node_cores)
                .init(move || {
                    risingwave_compute::start(
                        opts.clone(),
                        CancellationToken::new(),
                    )
                })
                .build();
        }

        // Compactor nodes at 192.168.4.{i}.
        for i in 1..=conf.compactor_nodes {
            let opts = risingwave_compactor::CompactorOpts::parse_from([
                "compactor-node",
                "--config-path",
                conf.config_path.as_str(),
                "--listen-addr",
                "0.0.0.0:6660",
                "--advertise-addr",
                &format!("192.168.4.{i}:6660"),
            ]);
            handle
                .create_node()
                .name(format!("compactor-{i}"))
                .ip([192, 168, 4, i as u8].into())
                .init(move || {
                    risingwave_compactor::start(
                        opts.clone(),
                        CancellationToken::new(),
                    )
                })
                .build();
        }

        // Let the whole cluster settle before handing it to the caller.
        tokio::time::sleep(Duration::from_secs(15)).await;

        // Bare node used to host client SQL sessions.
        let client = handle
            .create_node()
            .name("client")
            .ip([192, 168, 100, 1].into())
            .build();

        // Bare node used for ctl operations.
        let ctl = handle
            .create_node()
            .name("ctl")
            .ip([192, 168, 101, 1].into())
            .build();

        Ok(Self {
            config: conf,
            handle,
            client,
            ctl,
            sqlite_file_handle,
        })
    }

    /// SQL statements to run at the beginning of every new session.
    #[cfg_or_panic(madsim)]
    fn per_session_queries(&self) -> Arc<Vec<String>> {
        self.config.per_session_queries.clone()
    }

    /// Opens a new SQL session backed by a task spawned on the `client` node.
    ///
    /// The task owns one connection to the load-balanced frontend, first runs
    /// the configured per-session statements, then serves queries arriving on
    /// the returned [`Session`]'s channel until the session is dropped.
    #[cfg_or_panic(madsim)]
    pub fn start_session(&mut self) -> Session {
        // Capacity 0: each query rendezvouses with the session task.
        let (query_tx, mut query_rx) = mpsc::channel::<SessionRequest>(0);
        let per_session_queries = self.per_session_queries();

        self.client.spawn(async move {
            let mut client = RisingWave::connect("frontend".into(), "dev".into()).await?;

            for sql in per_session_queries.as_ref() {
                client.run(sql).await?;
            }
            drop(per_session_queries);

            while let Some((sql, tx)) = query_rx.next().await {
                // Render row output as space-separated columns, one row per
                // line; any non-row output becomes the empty string.
                let result = client
                    .run(&sql)
                    .await
                    .map(|output| match output {
                        sqllogictest::DBOutput::Rows { rows, .. } => rows
                            .into_iter()
                            .map(|row| {
                                row.into_iter()
                                    .map(|v| v.to_string())
                                    .collect::<Vec<_>>()
                                    .join(" ")
                            })
                            .collect::<Vec<_>>()
                            .join("\n"),
                        _ => "".to_string(),
                    })
                    .map_err(Into::into);

                // The requester may have been dropped; ignore send failures.
                let _ = tx.send(result);
            }

            Ok::<_, anyhow::Error>(())
        });

        Session { query_tx }
    }

    /// Runs one SQL statement on a fresh one-off session.
    pub async fn run(&mut self, sql: impl Into<String>) -> Result<String> {
        self.start_session().run(sql).await
    }

    /// Runs `future` on the dedicated client node and returns its output.
    #[cfg_or_panic(madsim)]
    pub async fn run_on_client<F>(&self, future: F) -> F::Output
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        self.client.spawn(future).await.unwrap()
    }

    /// Picks `n` distinct worker nodes at random; errors if fewer than `n`
    /// workers exist.
    ///
    /// NOTE(review): the error message mentions "remove" — presumably this is
    /// used by node-removal tests; confirm against callers.
    pub async fn get_random_worker_nodes(&self, n: usize) -> Result<Vec<WorkerNode>> {
        let worker_nodes = self.get_cluster_info().await?.get_worker_nodes().clone();
        if worker_nodes.len() < n {
            return Err(anyhow!("cannot remove more nodes than present"));
        }
        // NOTE(review): the `.clone()` on the Vec of references and the
        // double `.cloned()` below look redundant; behavior is correct.
        let rand_nodes = worker_nodes
            .iter()
            .choose_multiple(&mut rand::rng(), n)
            .clone();
        Ok(rand_nodes.iter().cloned().cloned().collect_vec())
    }

    /// Repeatedly runs `sql` every `interval` until the predicate `p` accepts
    /// the output, failing with an error after `timeout`.
    pub async fn wait_until(
        &mut self,
        sql: impl Into<String> + Clone,
        mut p: impl FnMut(&str) -> bool,
        interval: Duration,
        timeout: Duration,
    ) -> Result<String> {
        let fut = async move {
            let mut interval = tokio::time::interval(interval);
            loop {
                interval.tick().await;
                let result = self.run(sql.clone()).await?;
                if p(&result) {
                    return Ok::<_, anyhow::Error>(result);
                }
            }
        };

        match tokio::time::timeout(timeout, fut).await {
            Ok(r) => Ok(r?),
            Err(_) => bail!("wait_until timeout"),
        }
    }

    /// Waits until `sql` produces any non-whitespace output.
    pub async fn wait_until_non_empty(
        &mut self,
        sql: &str,
        interval: Duration,
        timeout: Duration,
    ) -> Result<String> {
        self.wait_until(sql, |r| !r.trim().is_empty(), interval, timeout)
            .await
    }

    /// Randomly kills (and later restarts) nodes of the roles enabled in
    /// `opts`.
    ///
    /// For each enabled role, one die roll in 0..3 decides the victims:
    /// 0 = kill none, 1 = kill all nodes of the role, 2 = kill each node
    /// independently with 50% probability.
    pub async fn kill_node(&self, opts: &KillOpts) {
        let mut nodes = vec![];
        if opts.kill_meta {
            let rand = rand::rng().random_range(0..3);
            for i in 1..=self.config.meta_nodes {
                match rand {
                    0 => break,
                    1 => {}
                    _ if !rand::rng().random_bool(0.5) => continue,
                    _ => {}
                }
                nodes.push(format!("meta-{}", i));
            }
            // If every meta node was selected, only kill the first one.
            // (With a single meta node this is a no-op.)
            if nodes.len() == self.config.meta_nodes {
                nodes.truncate(1);
            }
        }
        if opts.kill_frontend {
            let rand = rand::rng().random_range(0..3);
            for i in 1..=self.config.frontend_nodes {
                match rand {
                    0 => break,
                    1 => {}
                    _ if !rand::rng().random_bool(0.5) => continue,
                    _ => {}
                }
                nodes.push(format!("frontend-{}", i));
            }
        }
        if opts.kill_compute {
            let rand = rand::rng().random_range(0..3);
            for i in 1..=self.config.compute_nodes {
                match rand {
                    0 => break,
                    1 => {}
                    _ if !rand::rng().random_bool(0.5) => continue,
                    _ => {}
                }
                nodes.push(format!("compute-{}", i));
            }
        }
        if opts.kill_compactor {
            let rand = rand::rng().random_range(0..3);
            for i in 1..=self.config.compactor_nodes {
                match rand {
                    0 => break,
                    1 => {}
                    _ if !rand::rng().random_bool(0.5) => continue,
                    _ => {}
                }
                nodes.push(format!("compactor-{}", i));
            }
        }

        self.kill_nodes(nodes, opts.restart_delay_secs).await
    }

    /// Kills each named node after a random 0–1 s delay, then restarts it
    /// after another random 0–1 s delay; with 10% probability the restart is
    /// additionally postponed by `restart_delay_secs`.
    #[cfg_or_panic(madsim)]
    pub async fn kill_nodes(
        &self,
        nodes: impl IntoIterator<Item = impl AsRef<str>>,
        restart_delay_secs: u32,
    ) {
        join_all(nodes.into_iter().map(|name| async move {
            let name = name.as_ref();
            let t = rand::rng().random_range(Duration::from_secs(0)..Duration::from_secs(1));
            tokio::time::sleep(t).await;
            tracing::info!("kill {name}");
            Handle::current().kill(name);

            let mut t = rand::rng().random_range(Duration::from_secs(0)..Duration::from_secs(1));
            // Occasionally keep the node down much longer, to exercise
            // longer-outage recovery paths.
            if rand::rng().random_bool(0.1) {
                t += Duration::from_secs(restart_delay_secs as u64);
            }
            tokio::time::sleep(t).await;
            tracing::info!("restart {name}");
            Handle::current().restart(name);
        }))
        .await;
    }

    /// Kills each named node immediately and restarts it after exactly
    /// `restart_delay_secs` seconds (no randomness).
    #[cfg_or_panic(madsim)]
    pub async fn kill_nodes_and_restart(
        &self,
        nodes: impl IntoIterator<Item = impl AsRef<str>>,
        restart_delay_secs: u32,
    ) {
        join_all(nodes.into_iter().map(|name| async move {
            let name = name.as_ref();
            tracing::info!("kill {name}");
            Handle::current().kill(name);
            tokio::time::sleep(Duration::from_secs(restart_delay_secs as u64)).await;
            tracing::info!("restart {name}");
            Handle::current().restart(name);
        }))
        .await;
    }

    /// Kills the named nodes immediately, without restarting them.
    #[cfg_or_panic(madsim)]
    pub async fn simple_kill_nodes(&self, nodes: impl IntoIterator<Item = impl AsRef<str>>) {
        join_all(nodes.into_iter().map(|name| async move {
            let name = name.as_ref();
            tracing::info!("kill {name}");
            Handle::current().kill(name);
        }))
        .await;
    }

    /// Restarts the named (previously killed) nodes immediately.
    #[cfg_or_panic(madsim)]
    pub async fn simple_restart_nodes(&self, nodes: impl IntoIterator<Item = impl AsRef<str>>) {
        join_all(nodes.into_iter().map(|name| async move {
            let name = name.as_ref();
            tracing::info!("restart {name}");
            Handle::current().restart(name);
        }))
        .await;
    }

    /// Spawns a producer node that feeds `datadir`'s data into the simulated
    /// Kafka broker, and waits for it to finish.
    #[cfg_or_panic(madsim)]
    pub async fn create_kafka_producer(&self, datadir: &str) {
        self.handle
            .create_node()
            .name("kafka-producer")
            .ip("192.168.11.2".parse().unwrap())
            .build()
            .spawn(crate::kafka::producer(
                "192.168.11.1:29092",
                datadir.to_string(),
            ))
            .await
            .unwrap();
    }

    /// Creates topics (name -> partition count) on the simulated Kafka
    /// broker. Fire-and-forget: the spawned task is not awaited.
    #[cfg_or_panic(madsim)]
    pub fn create_kafka_topics(&self, topics: HashMap<String, i32>) {
        self.handle
            .create_node()
            .name("kafka-topic-create")
            .ip("192.168.11.3".parse().unwrap())
            .build()
            .spawn(crate::kafka::create_topics("192.168.11.1:29092", topics));
    }

    /// Returns a copy of the cluster's configuration.
    pub fn config(&self) -> Configuration {
        self.config.clone()
    }

    /// Returns the underlying runtime handle.
    pub fn handle(&self) -> &Handle {
        &self.handle
    }

    /// Sends Ctrl-C to every node — worker nodes first, then meta — waiting
    /// 10 s after each wave, and panics if any node has not exited in time.
    #[cfg_or_panic(madsim)]
    pub async fn graceful_shutdown(&self) {
        let mut nodes = vec![];
        let mut metas = vec![];
        for i in 1..=self.config.meta_nodes {
            metas.push(format!("meta-{i}"));
        }
        for i in 1..=self.config.frontend_nodes {
            nodes.push(format!("frontend-{i}"));
        }
        for i in 1..=self.config.compute_nodes {
            nodes.push(format!("compute-{i}"));
        }
        for i in 1..=self.config.compactor_nodes {
            nodes.push(format!("compactor-{i}"));
        }

        tracing::info!("graceful shutdown");
        let waiting_time = Duration::from_secs(10);
        // Stop the worker nodes first so they can deregister with meta.
        for node in &nodes {
            self.handle.send_ctrl_c(node);
        }
        tokio::time::sleep(waiting_time).await;
        // Then stop the meta node(s).
        for meta in &metas {
            self.handle.send_ctrl_c(meta);
        }
        tokio::time::sleep(waiting_time).await;

        for node in nodes.iter().chain(metas.iter()) {
            if !self.handle.is_exit(node) {
                panic!("failed to graceful shutdown {node} in {waiting_time:?}");
            }
        }
    }

    /// Polls `rw_recovery_status()` until the cluster reports RUNNING, with a
    /// 200 s overall timeout.
    pub async fn wait_for_recovery(&mut self) -> Result<()> {
        let timeout = Duration::from_secs(200);
        let mut session = self.start_session();
        tokio::time::timeout(timeout, async {
            loop {
                if let Ok(result) = session.run("select rw_recovery_status()").await
                    && result == "RUNNING"
                {
                    break;
                }
                // Tiny sleep: in madsim's virtual time this just yields.
                tokio::time::sleep(Duration::from_nanos(10)).await;
            }
        })
        .await?;
        Ok(())
    }

    /// Polls until every fragment's actor count equals `parallelism`, with a
    /// 200 s overall timeout.
    pub async fn wait_for_scale(&mut self, parallelism: usize) -> Result<()> {
        let timeout = Duration::from_secs(200);
        let mut session = self.start_session();
        tokio::time::timeout(timeout, async {
            loop {
                // Counts fragments whose actor count differs from the target;
                // "0" means every fragment has the desired parallelism.
                let parallelism_sql = format!(
                    "select count(parallelism) filter (where parallelism != {parallelism})\
                    from (select count(*) parallelism from rw_actors group by fragment_id);"
                );
                if let Ok(result) = session.run(&parallelism_sql).await
                    && result == "0"
                {
                    break;
                }
                tokio::time::sleep(Duration::from_nanos(10)).await;
            }
        })
        .await?;
        Ok(())
    }
}
975
/// A request sent to a session task: the SQL text plus a oneshot channel on
/// which the rendered query result is returned.
type SessionRequest = (
    // SQL statement to execute.
    String,
    // Sender for the result (rows as space-separated columns, newline-joined).
    oneshot::Sender<Result<String>>,
);
/// A SQL session handle; queries are shipped over a channel to a dedicated
/// task on the cluster's client node (see `Cluster::start_session`).
#[derive(Debug, Clone)]
pub struct Session {
    /// Sends `(sql, result_sender)` pairs to the session task.
    query_tx: mpsc::Sender<SessionRequest>,
}
986
987impl Session {
988 pub async fn run_all(&mut self, sqls: Vec<impl Into<String>>) -> Result<Vec<String>> {
990 let mut results = Vec::with_capacity(sqls.len());
991 for sql in sqls {
992 let result = self.run(sql).await?;
993 results.push(result);
994 }
995 Ok(results)
996 }
997
998 pub async fn run(&mut self, sql: impl Into<String>) -> Result<String> {
1000 let (tx, rx) = oneshot::channel();
1001 self.query_tx.send((sql.into(), tx)).await?;
1002 rx.await?
1003 }
1004
1005 pub async fn flush(&mut self) -> Result<()> {
1007 self.run("FLUSH").await?;
1008 Ok(())
1009 }
1010
1011 pub async fn is_arrangement_backfill_enabled(&mut self) -> Result<bool> {
1012 let result = self.run("show streaming_use_arrangement_backfill").await?;
1013 Ok(result == "true")
1014 }
1015}
1016
/// Options controlling which node roles `Cluster::kill_node` may kill and how
/// restarts are delayed.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct KillOpts {
    /// Kill rate. NOTE(review): not read within this file — confirm how
    /// callers interpret it.
    pub kill_rate: f32,
    /// Allow killing meta nodes.
    pub kill_meta: bool,
    /// Allow killing frontend nodes.
    pub kill_frontend: bool,
    /// Allow killing compute nodes.
    pub kill_compute: bool,
    /// Allow killing compactor nodes.
    pub kill_compactor: bool,
    /// Extra delay (seconds) occasionally added before restarting a node.
    pub restart_delay_secs: u32,
}
1027
impl KillOpts {
    /// Kill every role except meta, with a 20 s restart delay.
    pub const ALL: Self = KillOpts {
        kill_rate: 1.0,
        // Meta kill is deliberately disabled — presumably because the cluster
        // only supports a single meta node (see `Cluster::start`'s assert).
        kill_meta: false,
        kill_frontend: true,
        kill_compute: true,
        kill_compactor: true,
        restart_delay_secs: 20,
    };
    /// Same as [`Self::ALL`] but with a much shorter (2 s) restart delay.
    pub const ALL_FAST: Self = KillOpts {
        kill_rate: 1.0,
        kill_meta: false,
        kill_frontend: true,
        kill_compute: true,
        kill_compactor: true,
        restart_delay_secs: 2,
    };
}