use std::cmp::min;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use rand::{Rng, rng as thread_rng};
use sqllogictest::{
    Condition, ParallelTestError, Partitioner, QueryExpect, Record, StatementExpect,
};

use crate::client::RisingWave;
use crate::cluster::Cluster;
use crate::parse::extract_sql_command;
use crate::slt::background_ddl_mode::*;
use crate::slt::runner::{random_vnode_count, run_kill_not_allowed, run_no_kill};
use crate::slt::slt_env::{Env, Opts};
use crate::slt::vnode_mode::*;
use crate::utils::TimedExt;
use crate::{evaluate_skip, wait_or_retry_background_ddl};

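/// Maximum number of retries for a single record before the test run panics.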
const MAX_RETRY: usize = 10;

#[derive(Debug, PartialEq, Eq)]
pub enum SqlCmd {
    Create {
        /// Whether this is a `CREATE TABLE ... AS` statement.
        is_create_table_as: bool,
    },
    CreateSink {
        /// Whether this is a `CREATE SINK ... INTO ...` statement.
        is_sink_into_table: bool,
    },
    CreateMaterializedView {
        /// Name of the materialized view, used to track background DDL progress.
        name: String,
    },
    SetBackgroundDdl {
        /// The value assigned by `SET BACKGROUND_DDL = <enable>;`.
        enable: bool,
    },
    Drop,
    Dml,
    Flush,
    Others,
}

impl SqlCmd {
    /// Whether nodes may be killed while this command is running.
    fn allow_kill(&self) -> bool {
        matches!(
            self,
            SqlCmd::Create {
                // `create table as` is excluded: it writes data, so retrying
                // it after a kill is not idempotent.
                is_create_table_as: false,
                ..
            } | SqlCmd::CreateSink {
                is_sink_into_table: false,
            } | SqlCmd::CreateMaterializedView { .. }
                | SqlCmd::Drop
        )
        // DML is excluded as well, since its atomicity across a kill is not guaranteed.
    }

    /// Whether this is any flavor of `CREATE` command.
    fn is_create(&self) -> bool {
        matches!(
            self,
            SqlCmd::Create { .. }
                | SqlCmd::CreateSink { .. }
                | SqlCmd::CreateMaterializedView { .. }
        )
    }
}

/// Test files skipped when node-killing is enabled (see `evaluate_skip!`),
/// e.g. those relying on transactional or session-local state.
const KILL_IGNORE_FILES: &[&str] = &[
    "tpch_snapshot.slt",
    "tpch_upstream.slt",
    "search_path.slt",
    "transaction/now.slt",
    "transaction/read_only_multi_conn.slt",
    "transaction/read_only.slt",
    "transaction/tolerance.slt",
    "transaction/cursor.slt",
    "transaction/cursor_multi_conn.slt",
];

mod background_ddl_mode {
    use anyhow::bail;
    use rand::Rng;
    use rand_chacha::ChaChaRng;
    use sqllogictest::{Condition, Record, StatementExpect};

    use crate::client::RisingWave;
    use crate::slt::SqlCmd;
    use crate::slt::slt_env::Env;

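    /// Wait for a background-created materialized view to finish creation by
    /// issuing `WAIT;` on a fresh frontend connection, then verify that it
    /// shows up in `pg_matviews`.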
    pub(super) async fn wait_background_mv_finished(mview_name: &str) -> anyhow::Result<()> {
        let Ok(rw) = RisingWave::connect("frontend".into(), "dev".into()).await else {
            bail!("failed to connect to frontend for {mview_name}");
        };
        let client = rw.pg_client();
        if client.simple_query("WAIT;").await.is_err() {
            bail!("failed to wait for background mv to finish creating for {mview_name}");
        }

        let Ok(result) = client
            .query(
                "select count(*) from pg_matviews where matviewname=$1;",
                &[&mview_name],
            )
            .await
        else {
            bail!("failed to query pg_matviews for {mview_name}");
        };

        match result[0].try_get::<_, i64>(0) {
            Ok(1) => Ok(()),
            r => bail!("expected 1 row in pg_matviews, got {r:#?} instead for {mview_name}"),
        }
    }

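    /// Randomly toggle the `BACKGROUND_DDL` session variable before a
    /// `CREATE MATERIALIZED VIEW` record, with probability
    /// `background_ddl_rate`, unless the test has set it manually or the
    /// record is skipped under madsim.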
    pub(super) async fn set_background_ddl<D, M, T>(
        tester: &mut sqllogictest::Runner<D, M>,
        env: &Env,
        record: &Record<T>,
        cmd: &SqlCmd,
        manual_background_ddl_enabled: bool,
        rng: &mut ChaChaRng,
        background_ddl_enabled: &mut bool,
    ) where
        D: sqllogictest::AsyncDB<ColumnType = T>,
        M: sqllogictest::MakeConnection<Conn = D>,
        T: sqllogictest::ColumnType,
    {
        if let Record::Statement {
            loc,
            conditions,
            connection,
            ..
        } = &record
            && matches!(cmd, SqlCmd::CreateMaterializedView { .. })
            && !manual_background_ddl_enabled
            && conditions.iter().all(|c| {
                *c != Condition::SkipIf {
                    label: "madsim".to_owned(),
                }
            })
            && env.background_ddl_rate() > 0.0
        {
            let background_ddl_setting = rng.random_bool(env.background_ddl_rate());
            let set_background_ddl = Record::Statement {
                loc: loc.clone(),
                conditions: conditions.clone(),
                connection: connection.clone(),
                expected: StatementExpect::Ok,
                sql: format!("SET BACKGROUND_DDL={background_ddl_setting};"),
                retry: None,
            };
            tester.run_async(set_background_ddl).await.unwrap();
            *background_ddl_enabled = background_ddl_setting;
        }
    }

    #[macro_export]
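    /// Poll until a background-created materialized view finishes. Expands to
    /// a `break` out of the enclosing retry loop on success, and to a
    /// `continue` (or a panic after `MAX_RETRY` attempts) on failure.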
    macro_rules! wait_or_retry_background_ddl {
        ($record:expr, $name:expr, $iteration:expr) => {
            let record = $record;
            let i = $iteration;
            tracing::debug!(iteration = i, "Retry for background ddl");
            match wait_background_mv_finished($name).await {
                Ok(_) => {
                    tracing::debug!(
                        iteration = i,
                        "Record with background_ddl {:?} finished",
                        record
                    );
                    break;
                }
                Err(err) => {
                    tracing::error!(
                        iteration = i,
                        ?err,
                        "failed to wait for background mv to finish creating"
                    );
                    if i >= MAX_RETRY {
                        panic!("failed to run test after retry {i} times, error={err:#?}");
                    }
                    continue;
                }
            }
        };
    }
}

mod vnode_mode {
    use std::env;
    use std::hash::{DefaultHasher, Hash, Hasher};
    use std::sync::LazyLock;

    use anyhow::bail;
    use sqllogictest::Partitioner;

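    /// Statically partition test files across parallel jobs by hashing the
    /// file name.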
    #[derive(Clone)]
    pub(super) struct HashPartitioner {
        count: u64,
        id: u64,
    }

    impl HashPartitioner {
        pub(super) fn new(count: u64, id: u64) -> anyhow::Result<Self> {
            if count == 0 {
                bail!("partition count must be greater than zero");
            }
            if id >= count {
                bail!("partition id (zero-based) must be less than count");
            }
            Ok(Self { count, id })
        }
    }

    impl Partitioner for HashPartitioner {
        fn matches(&self, file_name: &str) -> bool {
            let mut hasher = DefaultHasher::new();
            file_name.hash(&mut hasher);
            hasher.finish() % self.count == self.id
        }
    }

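    /// Partitioner derived from the Buildkite parallel-job environment
    /// variables; `None` when not running as a parallel job. Panics if the
    /// variables are set but malformed.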
    pub(super) static PARTITIONER: LazyLock<Option<HashPartitioner>> = LazyLock::new(|| {
        let count = env::var("BUILDKITE_PARALLEL_JOB_COUNT")
            .ok()?
            .parse::<u64>()
            .unwrap();
        let id = env::var("BUILDKITE_PARALLEL_JOB")
            .ok()?
            .parse::<u64>()
            .unwrap();
        Some(HashPartitioner::new(count, id).unwrap())
    });
}

pub mod slt_env {
    use rand::SeedableRng;
    use rand_chacha::ChaChaRng;

    use crate::cluster::KillOpts;

    pub struct Opts {
        pub kill_opts: KillOpts,
        /// Probability of enabling `BACKGROUND_DDL` for a `CREATE MATERIALIZED VIEW`.
        pub background_ddl_rate: f64,
        /// Whether to set a random `STREAMING_MAX_PARALLELISM` before `CREATE` statements.
        pub random_vnode_count: bool,
    }

    pub(super) struct Env {
        opts: Opts,
    }

    impl Env {
        pub fn new(opts: Opts) -> Self {
            Self { opts }
        }

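        /// Deterministic RNG seeded from `MADSIM_TEST_SEED` (default 0), so a
        /// run is reproducible for a given seed.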
        pub fn get_rng() -> ChaChaRng {
            let seed = std::env::var("MADSIM_TEST_SEED")
                .unwrap_or("0".to_owned())
                .parse::<u64>()
                .unwrap();
            ChaChaRng::seed_from_u64(seed)
        }

        pub fn background_ddl_rate(&self) -> f64 {
            self.opts.background_ddl_rate
        }

        /// Whether killing of any node type is enabled.
        pub fn kill(&self) -> bool {
            self.opts.kill_opts.kill_compute
                || self.opts.kill_opts.kill_meta
                || self.opts.kill_opts.kill_frontend
                || self.opts.kill_opts.kill_compactor
        }

        pub fn random_vnode_count(&self) -> bool {
            self.opts.random_vnode_count
        }

        pub fn kill_opts(&self) -> KillOpts {
            self.opts.kill_opts
        }

        pub fn kill_rate(&self) -> f64 {
            self.opts.kill_opts.kill_rate as f64
        }
    }
}

mod runner {
    use std::time::Duration;

    use rand::prelude::IteratorRandom;
    use rand::rng as thread_rng;
    use sqllogictest::{Record, StatementExpect, TestError};

    use crate::slt::slt_env::Env;
    use crate::slt::{MAX_RETRY, SqlCmd};
    use crate::utils::TimedExt;

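    /// Decide whether to skip the current test file: skip it if it falls
    /// outside this job's partition, or if killing is enabled and the file is
    /// listed in `KILL_IGNORE_FILES`. Expands to a `continue` in the
    /// enclosing loop.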
    #[macro_export]
    macro_rules! evaluate_skip {
        ($env:expr, $path:expr) => {
            if let Some(partitioner) = PARTITIONER.as_ref()
                && !partitioner.matches($path.to_str().unwrap())
            {
                println!("[skip partition] {}", $path.display());
                continue;
            } else if $env.kill() && KILL_IGNORE_FILES.iter().any(|s| $path.ends_with(s)) {
                println!("[skip kill] {}", $path.display());
                continue;
            } else {
                println!("[run] {}", $path.display());
            }
        };
    }

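    /// Whether a random vnode count may be applied to this file: the option
    /// must be enabled and no record may mention "parallelism", since an
    /// explicit parallelism setting would conflict with ours.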
    pub(super) fn random_vnode_count<T: sqllogictest::ColumnType>(
        env: &Env,
        records: &[Record<T>],
    ) -> bool {
        env.random_vnode_count()
            && records.iter().all(|record| {
                if let Record::Statement { sql, .. } | Record::Query { sql, .. } = record
                    && sql.to_lowercase().contains("parallelism")
                {
                    println!("[RANDOM VNODE COUNT] skip: {}", sql);
                    false
                } else {
                    true
                }
            })
    }

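    /// Run a single record without killing nodes. For `CREATE` statements,
    /// optionally set a random `STREAMING_MAX_PARALLELISM` first so that
    /// different vnode counts get exercised.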
    pub(super) async fn run_no_kill<D, M, T>(
        tester: &mut sqllogictest::Runner<D, M>,
        random_vnode_count: bool,
        cmd: &SqlCmd,
        record: Record<T>,
    ) -> Result<(), TestError>
    where
        D: sqllogictest::AsyncDB<ColumnType = T>,
        M: sqllogictest::MakeConnection<Conn = D>,
        T: sqllogictest::ColumnType,
    {
        if random_vnode_count
            && cmd.is_create()
            && let Record::Statement {
                loc,
                conditions,
                connection,
                ..
            } = &record
        {
            // Choose from small, medium, and large ranges of vnode counts.
            let vnode_count = (2..=64)
                .chain(224..=288)
                .chain(992..=1056)
                .choose(&mut thread_rng())
                .unwrap();
            let sql = format!("SET STREAMING_MAX_PARALLELISM = {vnode_count};");
            println!("[RANDOM VNODE COUNT] set: {vnode_count}");
            let set_random_vnode_count = Record::Statement {
                loc: loc.clone(),
                conditions: conditions.clone(),
                connection: connection.clone(),
                sql,
                expected: StatementExpect::Ok,
                retry: None,
            };
            tester.run_async(set_random_vnode_count).await.unwrap();
            println!("[RANDOM VNODE COUNT] run: {record}");
        }

        tester.run_async(record).await.map(|_| ())
    }

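    /// Run a record during which killing is not allowed, retrying with
    /// exponential backoff on a small allowlist of transient errors that can
    /// surface while the cluster is still recovering from an earlier kill.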
    pub(super) async fn run_kill_not_allowed<D, M, T>(
        tester: &mut sqllogictest::Runner<D, M>,
        record: Record<T>,
    ) where
        D: sqllogictest::AsyncDB<ColumnType = T>,
        M: sqllogictest::MakeConnection<Conn = D>,
        T: sqllogictest::ColumnType,
    {
        for i in 0usize.. {
            let delay = Duration::from_secs(1 << i);
            if let Err(err) = tester
                .run_async(record.clone())
                .timed(|_res, elapsed| {
                    tracing::debug!("Record {:?} finished in {:?}", record, elapsed)
                })
                .await
            {
                let err_string = err.to_string();
                // Transient errors that may be observed while recovery is in progress.
                let allowed_errs = [
                    "no reader for dml in table",
                    "error reading a body from connection: broken pipe",
                    "failed to inject barrier",
                    "get error from control stream",
                    "cluster is under recovering",
                ];
                let should_retry = i < MAX_RETRY
                    && allowed_errs
                        .iter()
                        .any(|allowed_err| err_string.contains(allowed_err));
                if !should_retry {
                    panic!("{}", err);
                }
                tracing::error!("failed to run test: {err}\nretry after {delay:?}");
            } else {
                break;
            }
            tokio::time::sleep(delay).await;
        }
    }
}

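/// Run all sqllogictest files matched by `glob` against the simulated
/// cluster, optionally killing nodes, toggling background DDL, and
/// randomizing vnode counts according to `opts`.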
pub async fn run_slt_task(cluster: Arc<Cluster>, glob: &str, opts: Opts) {
    let env = slt_env::Env::new(opts);
    tracing::info!("background_ddl_rate: {}", env.background_ddl_rate());
    let mut rng = Env::get_rng();
    let files = glob::glob(glob).expect("failed to read glob pattern");
    for file in files {
        let mut tester =
            sqllogictest::Runner::new(|| RisingWave::connect("frontend".into(), "dev".into()));
        tester.add_label("madsim");

        let file = file.unwrap();
        let path = file.as_path();

        evaluate_skip!(env, path);

        // Kafka tests are rewritten on the fly to point at the simulated
        // broker and local schema files.
        let tempfile = (path.ends_with("kafka.slt") || path.ends_with("kafka_batch.slt"))
            .then(|| hack_kafka_test(path));
        let path = tempfile.as_ref().map(|p| p.path()).unwrap_or(path);

        // Whether background DDL is currently in effect.
        let mut background_ddl_enabled = false;
        // Whether the test file itself issued `SET BACKGROUND_DDL`; if so, we
        // must not randomly override it.
        let mut manual_background_ddl_enabled = false;

        let records = sqllogictest::parse_file(path).expect("failed to parse file");
        let random_vnode_count = random_vnode_count(&env, &records);

        for record in records {
            // Handle `halt` manually, since we run records one by one rather
            // than through the runner's file API.
            if let sqllogictest::Record::Halt { .. } = record {
                break;
            }

            // Only extract the command for records that actually run under madsim.
            let cmd = match &record {
                sqllogictest::Record::Statement {
                    sql, conditions, ..
                }
                | sqllogictest::Record::Query {
                    sql, conditions, ..
                } if conditions
                    .iter()
                    .all(|c| !matches!(c, Condition::SkipIf { label } if label == "madsim"))
                    && !conditions
                        .iter()
                        .any(|c| matches!(c, Condition::OnlyIf { label } if label != "madsim")) =>
                {
                    extract_sql_command(sql).unwrap_or(SqlCmd::Others)
                }
                _ => SqlCmd::Others,
            };

            if !env.kill() {
                match run_no_kill(&mut tester, random_vnode_count, &cmd, record).await {
                    Ok(_) => continue,
                    Err(e) => panic!("{}", e),
                }
            }

            // From here on, node-killing is enabled for this record.
            tracing::debug!(?cmd, "Running");

            if let SqlCmd::SetBackgroundDdl { enable } = cmd {
                manual_background_ddl_enabled = enable;
                background_ddl_enabled = enable;
            }

            set_background_ddl(
                &mut tester,
                &env,
                &record,
                &cmd,
                manual_background_ddl_enabled,
                &mut rng,
                &mut background_ddl_enabled,
            )
            .await;

            if !cmd.allow_kill() {
                run_kill_not_allowed(&mut tester, record.clone()).await;
                continue;
            }

            let should_kill = thread_rng().random_bool(env.kill_rate());
            // Spawn a task that kills a random node while the record is
            // running, then sleeps to give the cluster time to recover.
            let handle = if should_kill {
                let cluster = cluster.clone();
                let opts = env.kill_opts();
                Some(tokio::spawn(async move {
                    let t = thread_rng().random_range(Duration::default()..Duration::from_secs(1));
                    tokio::time::sleep(t).await;
                    cluster.kill_node(&opts).await;
                    tokio::time::sleep(Duration::from_secs(15)).await;
                }))
            } else {
                None
            };

            // Retry loop for records that may be interrupted by a kill.
            for i in 0usize.. {
                tracing::debug!(iteration = i, "retry count");
                let delay = Duration::from_secs(min(1 << i, 10));
                if i > 0 {
                    tokio::time::sleep(delay).await;
                }
                match tester
                    .run_async(record.clone())
                    .timed(|_res, elapsed| {
                        tracing::debug!("Record {:?} finished in {:?}", record, elapsed)
                    })
                    .await
                {
                    Ok(_) => {
                        // For background DDL, success only means the statement
                        // was accepted; wait for the mview to actually finish.
                        if let SqlCmd::CreateMaterializedView { ref name } = cmd
                            && background_ddl_enabled
                            && !matches!(
                                record,
                                Record::Statement {
                                    expected: StatementExpect::Error(_),
                                    ..
                                } | Record::Query {
                                    expected: QueryExpect::Error(_),
                                    ..
                                }
                            )
                        {
                            wait_or_retry_background_ddl!(&record, name, i);
                        }
                        break;
                    }
                    Err(e) => {
                        match cmd {
                            // A retried `CREATE` may fail with "already exists"
                            // if the previous attempt succeeded before the kill.
                            SqlCmd::Create {
                                is_create_table_as: false,
                            }
                            | SqlCmd::CreateSink {
                                is_sink_into_table: false,
                            }
                            | SqlCmd::CreateMaterializedView { .. }
                                if i != 0
                                    && let e = e.to_string()
                                    && !e.contains("gRPC request to meta service failed")
                                    && e.contains("exists")
                                    && !e.contains("under creation")
                                    && e.contains("Catalog error") =>
                            {
                                break;
                            }
                            // Likewise, a retried `DROP` may find the object gone.
                            SqlCmd::Drop
                                if i != 0
                                    && e.to_string().contains("not found")
                                    && e.to_string().contains("Catalog error") =>
                            {
                                break;
                            }
                            // Give up after too many retries.
                            _ if i >= MAX_RETRY => {
                                panic!("failed to run test after retry {i} times: {e}")
                            }
                            SqlCmd::CreateMaterializedView { ref name }
                                if i != 0
                                    && e.to_string().contains("table is in creating procedure")
                                    && background_ddl_enabled =>
                            {
                                wait_or_retry_background_ddl!(&record, name, i);
                            }
                            _ => tracing::error!(
                                iteration = i,
                                "failed to run test: {e}\nretry after {delay:?}"
                            ),
                        }
                    }
                }
            }
            if let SqlCmd::SetBackgroundDdl { enable } = cmd {
                background_ddl_enabled = enable;
            }
            if let Some(handle) = handle {
                handle.await.unwrap();
            }
        }
    }
}

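/// Run the files matched by `glob` through sqllogictest's parallel runner,
/// restricted to this job's partition when running under Buildkite
/// parallelism.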
pub async fn run_parallel_slt_task(glob: &str, jobs: usize) -> Result<(), ParallelTestError> {
    let mut tester =
        sqllogictest::Runner::new(|| RisingWave::connect("frontend".into(), "dev".into()));
    tester.add_label("madsim");

    if let Some(partitioner) = PARTITIONER.as_ref() {
        tester.with_partitioner(partitioner.clone());
    }

    tester
        .run_parallel_async(
            glob,
            vec!["frontend".into()],
            |host, dbname| async move { RisingWave::connect(host, dbname).await.unwrap() },
            jobs,
        )
        .await
        .map_err(|e| panic!("{e}"))
}

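/// Rewrite a Kafka test file for the simulation environment: point broker
/// addresses at the simulated Kafka node and replace in-container schema
/// paths with local ones, writing the result to a temp file.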
fn hack_kafka_test(path: &Path) -> tempfile::NamedTempFile {
    let content = std::fs::read_to_string(path).expect("failed to read file");
    let simple_avsc_full_path =
        std::fs::canonicalize("src/connector/src/test_data/simple-schema.avsc")
            .expect("failed to get schema path");
    let complex_avsc_full_path =
        std::fs::canonicalize("src/connector/src/test_data/complex-schema.avsc")
            .expect("failed to get schema path");
    let json_schema_full_path =
        std::fs::canonicalize("src/connector/src/test_data/complex-schema.json")
            .expect("failed to get schema path");
    let content = content
        .replace("127.0.0.1:29092", "192.168.11.1:29092")
        .replace("localhost:29092", "192.168.11.1:29092")
        .replace(
            "/risingwave/avro-simple-schema.avsc",
            simple_avsc_full_path.to_str().unwrap(),
        )
        .replace(
            "/risingwave/avro-complex-schema.avsc",
            complex_avsc_full_path.to_str().unwrap(),
        )
        .replace(
            "/risingwave/json-complex-schema",
            json_schema_full_path.to_str().unwrap(),
        );
    let file = tempfile::NamedTempFile::new().expect("failed to create temp file");
    std::fs::write(file.path(), content).expect("failed to write file");
    println!("created a temp file for kafka test: {:?}", file.path());
    file
}

#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use expect_test::{Expect, expect};

    use super::*;

    fn check(actual: impl Debug, expect: Expect) {
        let actual = format!("{:#?}", actual);
        expect.assert_eq(&actual);
    }

    #[test]
    fn test_extract_sql_command() {
        check(
            extract_sql_command("create table t as select 1;"),
            expect![[r#"
                Ok(
                    Create {
                        is_create_table_as: true,
                    },
                )"#]],
        );
        check(
            extract_sql_command(" create table t (a int);"),
            expect![[r#"
                Ok(
                    Create {
                        is_create_table_as: false,
                    },
                )"#]],
        );
        check(
            extract_sql_command(" create materialized view m_1 as select 1;"),
            expect![[r#"
                Ok(
                    CreateMaterializedView {
                        name: "m_1",
                    },
                )"#]],
        );
        check(
            extract_sql_command("set background_ddl= true;"),
            expect![[r#"
                Ok(
                    SetBackgroundDdl {
                        enable: true,
                    },
                )"#]],
        );
        check(
            extract_sql_command("SET BACKGROUND_DDL=true;"),
            expect![[r#"
                Ok(
                    SetBackgroundDdl {
                        enable: true,
                    },
                )"#]],
        );
        check(
            extract_sql_command("CREATE MATERIALIZED VIEW if not exists m_1 as select 1;"),
            expect![[r#"
                Ok(
                    CreateMaterializedView {
                        name: "m_1",
                    },
                )"#]],
        )
    }
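
    /// Sanity check (a minimal sketch derived from `vnode_mode` above):
    /// `HashPartitioner` must assign every file to exactly one of the `count`
    /// partitions and reject invalid configurations. File names here are
    /// arbitrary examples.
    #[test]
    fn test_hash_partitioner() {
        let partitioners: Vec<_> = (0..4)
            .map(|id| HashPartitioner::new(4, id).unwrap())
            .collect();
        for file in ["a.slt", "b.slt", "nexmark/q1.slt"] {
            // Exactly one partition must claim each file.
            let matched = partitioners.iter().filter(|p| p.matches(file)).count();
            assert_eq!(matched, 1, "file {file} matched {matched} partitions");
        }
        // Zero partitions or an out-of-range id is rejected.
        assert!(HashPartitioner::new(0, 0).is_err());
        assert!(HashPartitioner::new(4, 4).is_err());
    }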
}