1use std::cmp::min;
16use std::path::Path;
17use std::sync::Arc;
18use std::time::Duration;
19
20use anyhow::Result;
21use rand::{Rng, rng as thread_rng};
22use sqllogictest::substitution::well_known;
23use sqllogictest::{
24 Condition, ParallelTestError, Partitioner, QueryExpect, Record, StatementExpect,
25};
26
27use crate::client::RisingWave;
28use crate::cluster::Cluster;
29use crate::parse::extract_sql_command;
30use crate::slt::background_ddl_mode::*;
31use crate::slt::runner::{random_vnode_count, run_kill_not_allowed, run_no_kill};
32use crate::slt::slt_env::{Env, Opts};
33use crate::slt::vnode_mode::*;
34use crate::utils::TimedExt;
35use crate::{evaluate_skip, wait_or_retry_background_ddl};
36
/// Upper bound on retry attempts shared by the retry loops in this file: once the attempt
/// counter reaches this value the loops panic instead of trying again.
const MAX_RETRY: usize = 10;
39
/// Coarse classification of a SQL statement, used to decide how a record is executed.
#[derive(Debug, PartialEq, Eq)]
pub enum SqlCmd {
    /// A `CREATE` statement such as `CREATE TABLE`.
    Create {
        /// `true` for the `CREATE TABLE ... AS ...` form.
        is_create_table_as: bool,
    },
    /// `CREATE MATERIALIZED VIEW`, carrying the name of the view being created.
    CreateMaterializedView {
        /// Name of the materialized view.
        name: String,
    },
    /// `SET BACKGROUND_DDL = ...`, carrying the requested value.
    SetBackgroundDdl {
        /// Value the statement assigns to `BACKGROUND_DDL`.
        enable: bool,
    },
    /// A `DROP` statement (presumably any kind of object; confirm against the parser).
    Drop,
    /// A data-modifying statement (exact set is decided by the parser, not visible here).
    Dml,
    /// A `FLUSH` statement.
    Flush,
    /// Anything else, including statements whose classification failed.
    Others,
}

impl SqlCmd {
    /// Returns `true` when a node kill may be injected while this command is running.
    ///
    /// Only plain `CREATE` (not `CREATE TABLE AS`), `CREATE MATERIALIZED VIEW` and `DROP`
    /// qualify; every other command is executed without kill injection.
    fn allow_kill(&self) -> bool {
        match self {
            Self::Create { is_create_table_as } => !*is_create_table_as,
            Self::CreateMaterializedView { .. } | Self::Drop => true,
            Self::SetBackgroundDdl { .. } | Self::Dml | Self::Flush | Self::Others => false,
        }
    }

    /// Returns `true` for both `Create` and `CreateMaterializedView`.
    fn is_create(&self) -> bool {
        match self {
            Self::Create { .. } | Self::CreateMaterializedView { .. } => true,
            Self::SetBackgroundDdl { .. }
            | Self::Drop
            | Self::Dml
            | Self::Flush
            | Self::Others => false,
        }
    }
}
84
/// Test files that `evaluate_skip!` skips whenever any kill option is enabled.
///
/// Each entry is compared against the trailing components of the test file's path
/// (`Path::ends_with`), so entries may include a parent directory.
const KILL_IGNORE_FILES: &[&str] = &[
    "tpch_snapshot.slt",
    "tpch_upstream.slt",
    "search_path.slt",
    "transaction/now.slt",
    "transaction/read_only_multi_conn.slt",
    "transaction/read_only.slt",
    "transaction/tolerance.slt",
    "transaction/cursor.slt",
    "transaction/cursor_multi_conn.slt",
];
99
100mod background_ddl_mode {
102 use anyhow::bail;
103 use rand::Rng;
104 use rand_chacha::ChaChaRng;
105 use sqllogictest::{Condition, Record, StatementExpect};
106
107 use crate::client::RisingWave;
108 use crate::slt::SqlCmd;
109 use crate::slt::slt_env::Env;
110
111 pub(super) async fn wait_background_mv_finished(mview_name: &str) -> anyhow::Result<()> {
113 let Ok(rw) = RisingWave::connect("frontend".into(), "dev".into()).await else {
114 bail!("failed to connect to frontend for {mview_name}");
115 };
116 let client = rw.pg_client();
117 if client.simple_query("WAIT;").await.is_err() {
118 bail!("failed to wait for background mv to finish creating for {mview_name}");
119 }
120
121 let Ok(result) = client
122 .query(
123 "select count(*) from pg_matviews where matviewname=$1;",
124 &[&mview_name],
125 )
126 .await
127 else {
128 bail!("failed to query pg_matviews for {mview_name}");
129 };
130
131 match result[0].try_get::<_, i64>(0) {
132 Ok(1) => Ok(()),
133 r => bail!("expected 1 row in pg_matviews, got {r:#?} instead for {mview_name}"),
134 }
135 }
136
137 pub(super) async fn set_background_ddl<D, M, T>(
139 tester: &mut sqllogictest::Runner<D, M>,
140 env: &Env,
141 record: &Record<T>,
142 cmd: &SqlCmd,
143 manual_background_ddl_enabled: bool,
144 rng: &mut ChaChaRng,
145 background_ddl_enabled: &mut bool,
146 ) where
147 D: sqllogictest::AsyncDB<ColumnType = T>,
148 M: sqllogictest::MakeConnection<Conn = D>,
149 T: sqllogictest::ColumnType,
150 {
151 if let Record::Statement {
152 loc,
153 conditions,
154 connection,
155 ..
156 } = &record
157 && matches!(cmd, SqlCmd::CreateMaterializedView { .. })
158 && !manual_background_ddl_enabled
159 && conditions.iter().all(|c| {
160 *c != Condition::SkipIf {
161 label: "madsim".to_owned(),
162 }
163 })
164 && env.background_ddl_rate() > 0.0
165 {
166 let background_ddl_setting = rng.random_bool(env.background_ddl_rate());
167 let set_background_ddl = Record::Statement {
168 loc: loc.clone(),
169 conditions: conditions.clone(),
170 connection: connection.clone(),
171 expected: StatementExpect::Ok,
172 sql: format!("SET BACKGROUND_DDL={background_ddl_setting};"),
173 retry: None,
174 };
175 tester.run_async(set_background_ddl).await.unwrap();
176 *background_ddl_enabled = background_ddl_setting;
177 };
178 }
179
    /// Waits for a background materialized view to finish and steers the enclosing retry loop.
    ///
    /// Arguments: the record being run (only used for logging), the materialized view name
    /// handed to `wait_background_mv_finished`, and the current iteration counter.
    ///
    /// Must be expanded inside a loop: on success it `break`s out of that loop, on failure
    /// it `continue`s with the next iteration, and once the iteration counter has reached
    /// `MAX_RETRY` it panics instead. The expansion refers to `wait_background_mv_finished`,
    /// `MAX_RETRY` and `tracing` by bare name, so they must be in scope at the call site.
    #[macro_export]
    macro_rules! wait_or_retry_background_ddl {
        ($record:expr, $name:expr, $iteration:expr) => {
            // Evaluate the arguments once up front.
            let record = $record;
            let i = $iteration;
            tracing::debug!(iteration = i, "Retry for background ddl");
            match wait_background_mv_finished($name).await {
                Ok(_) => {
                    tracing::debug!(
                        iteration = i,
                        "Record with background_ddl {:?} finished",
                        record
                    );
                    // Leaves the caller's retry loop.
                    break;
                }
                Err(err) => {
                    tracing::error!(
                        iteration = i,
                        ?err,
                        "failed to wait for background mv to finish creating"
                    );
                    if i >= MAX_RETRY {
                        panic!("failed to run test after retry {i} times, error={err:#?}");
                    }
                    // Starts the caller's next retry iteration.
                    continue;
                }
            }
        };
    }
210}
211
212mod vnode_mode {
213 use std::env;
214 use std::hash::{DefaultHasher, Hash, Hasher};
215 use std::sync::LazyLock;
216
217 use anyhow::bail;
218 use sqllogictest::Partitioner;
219
220 #[derive(Clone)]
222 pub(super) struct HashPartitioner {
223 count: u64,
224 id: u64,
225 }
226
227 impl HashPartitioner {
228 pub(super) fn new(count: u64, id: u64) -> anyhow::Result<Self> {
229 if count == 0 {
230 bail!("partition count must be greater than zero");
231 }
232 if id >= count {
233 bail!("partition id (zero-based) must be less than count");
234 }
235 Ok(Self { count, id })
236 }
237 }
238
239 impl Partitioner for HashPartitioner {
240 fn matches(&self, file_name: &str) -> bool {
241 let mut hasher = DefaultHasher::new();
242 file_name.hash(&mut hasher);
243 hasher.finish() % self.count == self.id
244 }
245 }
246
247 pub(super) static PARTITIONER: LazyLock<Option<HashPartitioner>> = LazyLock::new(|| {
248 let count = env::var("BUILDKITE_PARALLEL_JOB_COUNT")
249 .ok()?
250 .parse::<u64>()
251 .unwrap();
252 let id = env::var("BUILDKITE_PARALLEL_JOB")
253 .ok()?
254 .parse::<u64>()
255 .unwrap();
256 Some(HashPartitioner::new(count, id).unwrap())
257 });
258}
259
260pub mod slt_env {
261 use rand::SeedableRng;
262 use rand_chacha::ChaChaRng;
263
264 use crate::cluster::KillOpts;
265
    /// Options controlling an SLT run.
    pub struct Opts {
        /// Kill settings: which kinds of node may be killed and the kill rate.
        pub kill_opts: KillOpts,
        /// Probability with which `BACKGROUND_DDL` is switched on before a materialized
        /// view is created; values that are not greater than zero disable the switching.
        pub background_ddl_rate: f64,
        /// Whether a random `STREAMING_MAX_PARALLELISM` may be set before create statements.
        pub random_vnode_count: bool,
    }
273
274 pub(super) struct Env {
275 opts: Opts,
276 }
277
278 impl Env {
279 pub fn new(opts: Opts) -> Self {
280 Self { opts }
281 }
282
283 pub fn get_rng() -> ChaChaRng {
284 let seed = std::env::var("MADSIM_TEST_SEED")
285 .unwrap_or("0".to_owned())
286 .parse::<u64>()
287 .unwrap();
288 ChaChaRng::seed_from_u64(seed)
289 }
290
291 pub fn background_ddl_rate(&self) -> f64 {
292 self.opts.background_ddl_rate
293 }
294
295 pub fn kill(&self) -> bool {
296 self.opts.kill_opts.kill_compute
297 || self.opts.kill_opts.kill_meta
298 || self.opts.kill_opts.kill_frontend
299 || self.opts.kill_opts.kill_compactor
300 }
301
302 pub fn random_vnode_count(&self) -> bool {
303 self.opts.random_vnode_count
304 }
305
306 pub fn kill_opts(&self) -> KillOpts {
307 self.opts.kill_opts
308 }
309
310 pub fn kill_rate(&self) -> f64 {
311 self.opts.kill_opts.kill_rate as f64
312 }
313 }
314}
315
316mod runner {
317 use std::time::Duration;
318
319 use rand::prelude::IteratorRandom;
320 use rand::rng as thread_rng;
321 use sqllogictest::{Record, StatementExpect, TestError};
322
323 use crate::slt::slt_env::Env;
324 use crate::slt::{MAX_RETRY, SqlCmd};
325 use crate::utils::TimedExt;
    /// Decides whether the test file at `$path` is run, printing the decision.
    ///
    /// Must be expanded inside a loop over test files. It `continue`s to the next file when
    /// a partitioner is configured and does not match the path, or when `$env.kill()` is
    /// true and the path ends with an entry of `KILL_IGNORE_FILES`. Otherwise it prints a
    /// `[run]` line and falls through. The expansion refers to `PARTITIONER` and
    /// `KILL_IGNORE_FILES` by bare name, so both must be in scope at the call site.
    ///
    /// Panics if the path is not valid UTF-8 while a partitioner is configured.
    #[macro_export]
    macro_rules! evaluate_skip {
        ($env:expr, $path:expr) => {
            if let Some(partitioner) = PARTITIONER.as_ref()
                && !partitioner.matches($path.to_str().unwrap())
            {
                println!("[skip partition] {}", $path.display());
                continue;
            } else if $env.kill() && KILL_IGNORE_FILES.iter().any(|s| $path.ends_with(s)) {
                println!("[skip kill] {}", $path.display());
                continue;
            } else {
                println!("[run] {}", $path.display());
            }
        };
    }
342
343 pub(super) fn random_vnode_count<T: sqllogictest::ColumnType>(
344 env: &Env,
345 records: &[Record<T>],
346 ) -> bool {
347 env.random_vnode_count()
348 && records.iter().all(|record| {
349 if let Record::Statement { sql, .. } | Record::Query { sql, .. } = record
350 && sql.to_lowercase().contains("parallelism")
351 {
352 println!("[RANDOM VNODE COUNT] skip: {}", sql);
353 false
354 } else {
355 true
356 }
357 })
358 }
359
360 pub(super) async fn run_no_kill<D, M, T>(
362 tester: &mut sqllogictest::Runner<D, M>,
363 random_vnode_count: bool,
364 cmd: &SqlCmd,
365 record: Record<T>,
366 ) -> Result<(), TestError>
367 where
368 D: sqllogictest::AsyncDB<ColumnType = T>,
369 M: sqllogictest::MakeConnection<Conn = D>,
370 T: sqllogictest::ColumnType,
371 {
372 if random_vnode_count
374 && cmd.is_create()
375 && let Record::Statement {
376 loc,
377 conditions,
378 connection,
379 ..
380 } = &record
381 {
382 let vnode_count = (2..=64) .chain(224..=288) .chain(992..=1056) .choose(&mut thread_rng())
386 .unwrap();
387 let sql = format!("SET STREAMING_MAX_PARALLELISM = {vnode_count};");
388 println!("[RANDOM VNODE COUNT] set: {vnode_count}");
389 let set_random_vnode_count = Record::Statement {
390 loc: loc.clone(),
391 conditions: conditions.clone(),
392 connection: connection.clone(),
393 sql,
394 expected: StatementExpect::Ok,
395 retry: None,
396 };
397 tester.run_async(set_random_vnode_count).await.unwrap();
398 println!("[RANDOM VNODE COUNT] run: {record}");
399 }
400
401 tester.run_async(record).await.map(|_| ())
402 }
403
404 pub(super) async fn run_kill_not_allowed<D, M, T>(
406 tester: &mut sqllogictest::Runner<D, M>,
407 record: Record<T>,
408 ) where
409 D: sqllogictest::AsyncDB<ColumnType = T>,
410 M: sqllogictest::MakeConnection<Conn = D>,
411 T: sqllogictest::ColumnType,
412 {
413 for i in 0usize.. {
414 let delay = Duration::from_secs(1 << i);
415 if let Err(err) = tester
416 .run_async(record.clone())
417 .timed(|_res, elapsed| {
418 tracing::debug!("Record {:?} finished in {:?}", record, elapsed)
419 })
420 .await
421 {
422 let err_string = err.to_string().to_ascii_lowercase();
423 let allowed_errs = [
426 "no reader for dml in table",
427 "error reading a body from connection: broken pipe",
428 "failed to inject barrier",
429 "get error from control stream",
430 "cluster is under recovering",
431 "streaming vnode mapping has not been initialized",
432 "streaming vnode mapping not found",
433 ];
434 let should_retry = i < MAX_RETRY
435 && allowed_errs
436 .iter()
437 .any(|allowed_err| err_string.contains(&allowed_err.to_ascii_lowercase()));
438 if !should_retry {
439 panic!("{}", err);
440 }
441 tracing::error!("failed to run test: {err}\nretry after {delay:?}");
442 } else {
443 break;
444 }
445 tokio::time::sleep(delay).await;
446 }
447 }
448}
449
450pub async fn run_slt_task(cluster: Arc<Cluster>, glob: &str, opts: Opts) {
452 let env = slt_env::Env::new(opts);
453 tracing::info!("background_ddl_rate: {}", env.background_ddl_rate());
454 let mut rng = Env::get_rng();
455 let files = glob::glob(glob).expect("failed to read glob pattern");
456 for file in files {
457 let mut tester =
459 sqllogictest::Runner::new(|| RisingWave::connect("frontend".into(), "dev".into()));
460 tester.add_label("madsim");
461 tester.set_var(well_known::DATABASE.to_owned(), "dev".to_owned());
462
463 let file = file.unwrap();
464 let path = file.as_path();
465
466 evaluate_skip!(env, path);
467
468 let tempfile = (path.ends_with("kafka.slt") || path.ends_with("kafka_batch.slt"))
470 .then(|| hack_kafka_test(path));
471 let path = tempfile.as_ref().map(|p| p.path()).unwrap_or(path);
472
473 let mut background_ddl_enabled = false;
475
476 let mut manual_background_ddl_enabled = false;
479
480 let records = sqllogictest::parse_file(path).expect("failed to parse file");
481 let random_vnode_count = random_vnode_count(&env, &records);
482
483 for record in records {
484 if let sqllogictest::Record::Halt { .. } = record {
489 break;
490 }
491
492 let cmd = match &record {
493 sqllogictest::Record::Statement {
494 sql, conditions, ..
495 }
496 | sqllogictest::Record::Query {
497 sql, conditions, ..
498 } if conditions
499 .iter()
500 .all(|c| !matches!(c, Condition::SkipIf{ label } if label == "madsim"))
501 && !conditions
502 .iter()
503 .any(|c| matches!(c, Condition::OnlyIf{ label} if label != "madsim" )) =>
504 {
505 extract_sql_command(sql).unwrap_or(SqlCmd::Others)
506 }
507 _ => SqlCmd::Others,
508 };
509
510 if !env.kill() {
512 match run_no_kill(&mut tester, random_vnode_count, &cmd, record).await {
513 Ok(_) => continue,
514 Err(e) => panic!("{}", e),
515 }
516 }
517
518 tracing::debug!(?cmd, "Running");
520
521 if let SqlCmd::SetBackgroundDdl { enable } = cmd {
522 manual_background_ddl_enabled = enable;
523 background_ddl_enabled = enable;
524 }
525
526 set_background_ddl(
528 &mut tester,
529 &env,
530 &record,
531 &cmd,
532 manual_background_ddl_enabled,
533 &mut rng,
534 &mut background_ddl_enabled,
535 )
536 .await;
537
538 if !cmd.allow_kill() {
539 run_kill_not_allowed(&mut tester, record.clone()).await;
540 continue;
541 }
542
543 let should_kill = thread_rng().random_bool(env.kill_rate());
544 let handle = if should_kill {
546 let cluster = cluster.clone();
547 let opts = env.kill_opts();
548 Some(tokio::spawn(async move {
549 let t = thread_rng().random_range(Duration::default()..Duration::from_secs(1));
550 tokio::time::sleep(t).await;
551 cluster.kill_node(&opts).await;
552 tokio::time::sleep(Duration::from_secs(15)).await;
553 }))
554 } else {
555 None
556 };
557
558 for i in 0usize.. {
559 tracing::debug!(iteration = i, "retry count");
560 let delay = Duration::from_secs(min(1 << i, 10));
561 if i > 0 {
562 tokio::time::sleep(delay).await;
563 }
564 match tester
565 .run_async(record.clone())
566 .timed(|_res, elapsed| {
567 tracing::debug!("Record {:?} finished in {:?}", record, elapsed)
568 })
569 .await
570 {
571 Ok(_) => {
572 if let SqlCmd::CreateMaterializedView { ref name } = cmd
574 && background_ddl_enabled
575 && !matches!(
576 record,
577 Record::Statement {
578 expected: StatementExpect::Error(_),
579 ..
580 } | Record::Query {
581 expected: QueryExpect::Error(_),
582 ..
583 }
584 )
585 {
586 wait_or_retry_background_ddl!(&record, name, i);
587 }
588 break;
589 }
590 Err(e) => {
591 match cmd {
592 SqlCmd::Create {
594 is_create_table_as: false,
595 }
596 | SqlCmd::CreateMaterializedView { .. }
597 if i != 0
598 && let e = e.to_string()
599 && !e.contains("gRPC request to meta service failed")
602 && e.contains("exists")
603 && !e.contains("under creation")
604 && e.contains("Catalog error") =>
605 {
606 break;
607 }
608 SqlCmd::Drop
610 if i != 0
611 && e.to_string().contains("not found")
612 && e.to_string().contains("Catalog error") =>
613 {
614 break;
615 }
616
617 _ if i >= MAX_RETRY => {
619 panic!("failed to run test after retry {i} times: {e}")
620 }
621 SqlCmd::CreateMaterializedView { ref name }
622 if i != 0
623 && e.to_string().contains("table is in creating procedure")
624 && background_ddl_enabled =>
625 {
626 wait_or_retry_background_ddl!(&record, name, i);
627 }
628 _ => tracing::error!(
629 iteration = i,
630 "failed to run test: {e}\nretry after {delay:?}"
631 ),
632 }
633 }
634 }
635 }
636 if let SqlCmd::SetBackgroundDdl { enable } = cmd {
637 background_ddl_enabled = enable;
638 };
639 if let Some(handle) = handle {
640 handle.await.unwrap();
641 }
642 }
643 }
644}
645
646pub async fn run_parallel_slt_task(glob: &str, jobs: usize) -> Result<(), ParallelTestError> {
647 let mut tester =
648 sqllogictest::Runner::new(|| RisingWave::connect("frontend".into(), "dev".into()));
649 tester.add_label("madsim");
650
651 if let Some(partitioner) = PARTITIONER.as_ref() {
652 tester.with_partitioner(partitioner.clone());
653 }
654
655 tester
656 .run_parallel_async(
657 glob,
658 vec!["frontend".into()],
659 |host, dbname| async move { RisingWave::connect(host, dbname).await.unwrap() },
660 jobs,
661 )
662 .await
663 .map_err(|e| panic!("{e}"))
664}
665
666fn hack_kafka_test(path: &Path) -> tempfile::NamedTempFile {
668 let content = std::fs::read_to_string(path).expect("failed to read file");
669 let simple_avsc_full_path =
670 std::fs::canonicalize("src/connector/src/test_data/simple-schema.avsc")
671 .expect("failed to get schema path");
672 let complex_avsc_full_path =
673 std::fs::canonicalize("src/connector/src/test_data/complex-schema.avsc")
674 .expect("failed to get schema path");
675 let json_schema_full_path =
676 std::fs::canonicalize("src/connector/src/test_data/complex-schema.json")
677 .expect("failed to get schema path");
678 let content = content
679 .replace("127.0.0.1:29092", "192.168.11.1:29092")
680 .replace("localhost:29092", "192.168.11.1:29092")
681 .replace(
682 "/risingwave/avro-simple-schema.avsc",
683 simple_avsc_full_path.to_str().unwrap(),
684 )
685 .replace(
686 "/risingwave/avro-complex-schema.avsc",
687 complex_avsc_full_path.to_str().unwrap(),
688 )
689 .replace(
690 "/risingwave/json-complex-schema",
691 json_schema_full_path.to_str().unwrap(),
692 );
693 let file = tempfile::NamedTempFile::new().expect("failed to create temp file");
694 std::fs::write(file.path(), content).expect("failed to write file");
695 println!("created a temp file for kafka test: {:?}", file.path());
696 file
697}
698
#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use expect_test::{Expect, expect};

    use super::*;

    /// Pretty-prints `actual` with `{:#?}` and compares it against the expected snapshot.
    fn check(actual: impl Debug, expect: Expect) {
        let actual = format!("{:#?}", actual);
        expect.assert_eq(&actual);
    }

    /// Snapshot tests for `extract_sql_command`: `CREATE TABLE` with and without `AS`,
    /// `CREATE MATERIALIZED VIEW` (including `IF NOT EXISTS`), and `SET BACKGROUND_DDL` in
    /// lower and upper case.
    #[test]
    fn test_extract_sql_command() {
        check(
            extract_sql_command("create table t as select 1;"),
            expect![[r#"
                Ok(
                    Create {
                        is_create_table_as: true,
                    },
                )"#]],
        );
        check(
            extract_sql_command(" create table t (a int);"),
            expect![[r#"
                Ok(
                    Create {
                        is_create_table_as: false,
                    },
                )"#]],
        );
        check(
            extract_sql_command(" create materialized view m_1 as select 1;"),
            expect![[r#"
                Ok(
                    CreateMaterializedView {
                        name: "m_1",
                    },
                )"#]],
        );
        check(
            extract_sql_command("set background_ddl= true;"),
            expect![[r#"
                Ok(
                    SetBackgroundDdl {
                        enable: true,
                    },
                )"#]],
        );
        check(
            extract_sql_command("SET BACKGROUND_DDL=true;"),
            expect![[r#"
                Ok(
                    SetBackgroundDdl {
                        enable: true,
                    },
                )"#]],
        );
        check(
            extract_sql_command("CREATE MATERIALIZED VIEW if not exists m_1 as select 1;"),
            expect![[r#"
                Ok(
                    CreateMaterializedView {
                        name: "m_1",
                    },
                )"#]],
        )
    }
}