1use std::cmp::min;
16use std::path::Path;
17use std::sync::Arc;
18use std::time::Duration;
19
20use anyhow::Result;
21use rand::{Rng, rng as thread_rng};
22use sqllogictest::substitution::well_known;
23use sqllogictest::{
24 Condition, ParallelTestError, Partitioner, QueryExpect, Record, StatementExpect,
25};
26
27use crate::client::RisingWave;
28use crate::cluster::Cluster;
29use crate::parse::extract_sql_command;
30use crate::slt::background_ddl_mode::*;
31use crate::slt::runner::{random_vnode_count, run_kill_not_allowed, run_no_kill};
32use crate::slt::slt_env::{Env, Opts};
33use crate::slt::vnode_mode::*;
34use crate::utils::TimedExt;
35use crate::{evaluate_skip, wait_or_retry_background_ddl};
36
/// Threshold for the zero-based retry counters used by the retry loops in this file.
///
/// The loops keep retrying while their counter is below this value and panic once the
/// counter reaches it (some loops accept specific errors as success before that check).
const MAX_RETRY: usize = 10;
39
/// Coarse classification of a SQL statement, produced by `extract_sql_command` and used
/// by the runner to decide how a record is executed (kill allowed, background DDL, ...).
#[derive(Debug, PartialEq, Eq)]
pub enum SqlCmd {
    /// A `CREATE` statement such as `CREATE TABLE` (see the tests at the bottom of this file).
    Create {
        /// `true` for `CREATE TABLE ... AS ...`; `allow_kill` rejects the command when set.
        is_create_table_as: bool,
    },
    /// A sink creation statement.
    CreateSink {
        /// `allow_kill` rejects the command when set.
        /// NOTE(review): presumably `true` for `CREATE SINK ... INTO <table>` — confirm
        /// against `extract_sql_command`.
        is_sink_into_table: bool,
    },
    /// A `CREATE MATERIALIZED VIEW` statement.
    CreateMaterializedView {
        /// Name of the materialized view; used to look the view up in `pg_matviews` when
        /// waiting for a background creation to finish.
        name: String,
    },
    /// A `SET BACKGROUND_DDL = ...` statement.
    SetBackgroundDdl {
        /// The value being assigned to `BACKGROUND_DDL`.
        enable: bool,
    },
    /// NOTE(review): presumably any `DROP` statement. On retry, "not found" catalog errors
    /// are tolerated for this variant.
    Drop,
    /// NOTE(review): presumably a data-modifying statement (insert/update/delete) — confirm.
    Dml,
    /// NOTE(review): presumably a `FLUSH` statement — confirm.
    Flush,
    /// Fallback used when the statement could not be classified or the record is not
    /// applicable to the `madsim` label.
    Others,
}
63
64impl SqlCmd {
65 fn allow_kill(&self) -> bool {
66 matches!(
67 self,
68 SqlCmd::Create {
69 is_create_table_as: false,
71 ..
72 } | SqlCmd::CreateSink {
73 is_sink_into_table: false,
74 } | SqlCmd::CreateMaterializedView { .. }
75 | SqlCmd::Drop
76 )
77 }
82
83 fn is_create(&self) -> bool {
84 matches!(
85 self,
86 SqlCmd::Create { .. }
87 | SqlCmd::CreateSink { .. }
88 | SqlCmd::CreateMaterializedView { .. }
89 )
90 }
91}
92
/// Test files that are skipped whenever any kill option is enabled.
///
/// `evaluate_skip!` checks each entry against the end of the test file's path with
/// `ends_with` and prints `[skip kill]` for matching files.
const KILL_IGNORE_FILES: &[&str] = &[
    "tpch_snapshot.slt",
    "tpch_upstream.slt",
    "search_path.slt",
    "transaction/now.slt",
    "transaction/read_only_multi_conn.slt",
    "transaction/read_only.slt",
    "transaction/tolerance.slt",
    "transaction/cursor.slt",
    "transaction/cursor_multi_conn.slt",
];
107
108mod background_ddl_mode {
110 use anyhow::bail;
111 use rand::Rng;
112 use rand_chacha::ChaChaRng;
113 use sqllogictest::{Condition, Record, StatementExpect};
114
115 use crate::client::RisingWave;
116 use crate::slt::SqlCmd;
117 use crate::slt::slt_env::Env;
118
119 pub(super) async fn wait_background_mv_finished(mview_name: &str) -> anyhow::Result<()> {
121 let Ok(rw) = RisingWave::connect("frontend".into(), "dev".into()).await else {
122 bail!("failed to connect to frontend for {mview_name}");
123 };
124 let client = rw.pg_client();
125 if client.simple_query("WAIT;").await.is_err() {
126 bail!("failed to wait for background mv to finish creating for {mview_name}");
127 }
128
129 let Ok(result) = client
130 .query(
131 "select count(*) from pg_matviews where matviewname=$1;",
132 &[&mview_name],
133 )
134 .await
135 else {
136 bail!("failed to query pg_matviews for {mview_name}");
137 };
138
139 match result[0].try_get::<_, i64>(0) {
140 Ok(1) => Ok(()),
141 r => bail!("expected 1 row in pg_matviews, got {r:#?} instead for {mview_name}"),
142 }
143 }
144
145 pub(super) async fn set_background_ddl<D, M, T>(
147 tester: &mut sqllogictest::Runner<D, M>,
148 env: &Env,
149 record: &Record<T>,
150 cmd: &SqlCmd,
151 manual_background_ddl_enabled: bool,
152 rng: &mut ChaChaRng,
153 background_ddl_enabled: &mut bool,
154 ) where
155 D: sqllogictest::AsyncDB<ColumnType = T>,
156 M: sqllogictest::MakeConnection<Conn = D>,
157 T: sqllogictest::ColumnType,
158 {
159 if let Record::Statement {
160 loc,
161 conditions,
162 connection,
163 ..
164 } = &record
165 && matches!(cmd, SqlCmd::CreateMaterializedView { .. })
166 && !manual_background_ddl_enabled
167 && conditions.iter().all(|c| {
168 *c != Condition::SkipIf {
169 label: "madsim".to_owned(),
170 }
171 })
172 && env.background_ddl_rate() > 0.0
173 {
174 let background_ddl_setting = rng.random_bool(env.background_ddl_rate());
175 let set_background_ddl = Record::Statement {
176 loc: loc.clone(),
177 conditions: conditions.clone(),
178 connection: connection.clone(),
179 expected: StatementExpect::Ok,
180 sql: format!("SET BACKGROUND_DDL={background_ddl_setting};"),
181 retry: None,
182 };
183 tester.run_async(set_background_ddl).await.unwrap();
184 *background_ddl_enabled = background_ddl_setting;
185 };
186 }
187
188 #[macro_export]
190 macro_rules! wait_or_retry_background_ddl {
191 ($record:expr, $name:expr, $iteration:expr) => {
192 let record = $record;
193 let i = $iteration;
194 tracing::debug!(iteration = i, "Retry for background ddl");
195 match wait_background_mv_finished($name).await {
196 Ok(_) => {
197 tracing::debug!(
198 iteration = i,
199 "Record with background_ddl {:?} finished",
200 record
201 );
202 break;
203 }
204 Err(err) => {
205 tracing::error!(
206 iteration = i,
207 ?err,
208 "failed to wait for background mv to finish creating"
209 );
210 if i >= MAX_RETRY {
211 panic!("failed to run test after retry {i} times, error={err:#?}");
212 }
213 continue;
214 }
215 }
216 };
217 }
218}
219
220mod vnode_mode {
221 use std::env;
222 use std::hash::{DefaultHasher, Hash, Hasher};
223 use std::sync::LazyLock;
224
225 use anyhow::bail;
226 use sqllogictest::Partitioner;
227
228 #[derive(Clone)]
230 pub(super) struct HashPartitioner {
231 count: u64,
232 id: u64,
233 }
234
235 impl HashPartitioner {
236 pub(super) fn new(count: u64, id: u64) -> anyhow::Result<Self> {
237 if count == 0 {
238 bail!("partition count must be greater than zero");
239 }
240 if id >= count {
241 bail!("partition id (zero-based) must be less than count");
242 }
243 Ok(Self { count, id })
244 }
245 }
246
247 impl Partitioner for HashPartitioner {
248 fn matches(&self, file_name: &str) -> bool {
249 let mut hasher = DefaultHasher::new();
250 file_name.hash(&mut hasher);
251 hasher.finish() % self.count == self.id
252 }
253 }
254
255 pub(super) static PARTITIONER: LazyLock<Option<HashPartitioner>> = LazyLock::new(|| {
256 let count = env::var("BUILDKITE_PARALLEL_JOB_COUNT")
257 .ok()?
258 .parse::<u64>()
259 .unwrap();
260 let id = env::var("BUILDKITE_PARALLEL_JOB")
261 .ok()?
262 .parse::<u64>()
263 .unwrap();
264 Some(HashPartitioner::new(count, id).unwrap())
265 });
266}
267
268pub mod slt_env {
269 use rand::SeedableRng;
270 use rand_chacha::ChaChaRng;
271
272 use crate::cluster::KillOpts;
273
274 pub struct Opts {
275 pub kill_opts: KillOpts,
276 pub background_ddl_rate: f64,
278 pub random_vnode_count: bool,
280 }
281
282 pub(super) struct Env {
283 opts: Opts,
284 }
285
286 impl Env {
287 pub fn new(opts: Opts) -> Self {
288 Self { opts }
289 }
290
291 pub fn get_rng() -> ChaChaRng {
292 let seed = std::env::var("MADSIM_TEST_SEED")
293 .unwrap_or("0".to_owned())
294 .parse::<u64>()
295 .unwrap();
296 ChaChaRng::seed_from_u64(seed)
297 }
298
299 pub fn background_ddl_rate(&self) -> f64 {
300 self.opts.background_ddl_rate
301 }
302
303 pub fn kill(&self) -> bool {
304 self.opts.kill_opts.kill_compute
305 || self.opts.kill_opts.kill_meta
306 || self.opts.kill_opts.kill_frontend
307 || self.opts.kill_opts.kill_compactor
308 }
309
310 pub fn random_vnode_count(&self) -> bool {
311 self.opts.random_vnode_count
312 }
313
314 pub fn kill_opts(&self) -> KillOpts {
315 self.opts.kill_opts
316 }
317
318 pub fn kill_rate(&self) -> f64 {
319 self.opts.kill_opts.kill_rate as f64
320 }
321 }
322}
323
324mod runner {
325 use std::time::Duration;
326
327 use rand::prelude::IteratorRandom;
328 use rand::rng as thread_rng;
329 use sqllogictest::{Record, StatementExpect, TestError};
330
331 use crate::slt::slt_env::Env;
332 use crate::slt::{MAX_RETRY, SqlCmd};
333 use crate::utils::TimedExt;
334 #[macro_export]
335 macro_rules! evaluate_skip {
336 ($env:expr, $path:expr) => {
337 if let Some(partitioner) = PARTITIONER.as_ref()
338 && !partitioner.matches($path.to_str().unwrap())
339 {
340 println!("[skip partition] {}", $path.display());
341 continue;
342 } else if $env.kill() && KILL_IGNORE_FILES.iter().any(|s| $path.ends_with(s)) {
343 println!("[skip kill] {}", $path.display());
344 continue;
345 } else {
346 println!("[run] {}", $path.display());
347 }
348 };
349 }
350
351 pub(super) fn random_vnode_count<T: sqllogictest::ColumnType>(
352 env: &Env,
353 records: &[Record<T>],
354 ) -> bool {
355 env.random_vnode_count()
356 && records.iter().all(|record| {
357 if let Record::Statement { sql, .. } | Record::Query { sql, .. } = record
358 && sql.to_lowercase().contains("parallelism")
359 {
360 println!("[RANDOM VNODE COUNT] skip: {}", sql);
361 false
362 } else {
363 true
364 }
365 })
366 }
367
368 pub(super) async fn run_no_kill<D, M, T>(
370 tester: &mut sqllogictest::Runner<D, M>,
371 random_vnode_count: bool,
372 cmd: &SqlCmd,
373 record: Record<T>,
374 ) -> Result<(), TestError>
375 where
376 D: sqllogictest::AsyncDB<ColumnType = T>,
377 M: sqllogictest::MakeConnection<Conn = D>,
378 T: sqllogictest::ColumnType,
379 {
380 if random_vnode_count
382 && cmd.is_create()
383 && let Record::Statement {
384 loc,
385 conditions,
386 connection,
387 ..
388 } = &record
389 {
390 let vnode_count = (2..=64) .chain(224..=288) .chain(992..=1056) .choose(&mut thread_rng())
394 .unwrap();
395 let sql = format!("SET STREAMING_MAX_PARALLELISM = {vnode_count};");
396 println!("[RANDOM VNODE COUNT] set: {vnode_count}");
397 let set_random_vnode_count = Record::Statement {
398 loc: loc.clone(),
399 conditions: conditions.clone(),
400 connection: connection.clone(),
401 sql,
402 expected: StatementExpect::Ok,
403 retry: None,
404 };
405 tester.run_async(set_random_vnode_count).await.unwrap();
406 println!("[RANDOM VNODE COUNT] run: {record}");
407 }
408
409 tester.run_async(record).await.map(|_| ())
410 }
411
412 pub(super) async fn run_kill_not_allowed<D, M, T>(
414 tester: &mut sqllogictest::Runner<D, M>,
415 record: Record<T>,
416 ) where
417 D: sqllogictest::AsyncDB<ColumnType = T>,
418 M: sqllogictest::MakeConnection<Conn = D>,
419 T: sqllogictest::ColumnType,
420 {
421 for i in 0usize.. {
422 let delay = Duration::from_secs(1 << i);
423 if let Err(err) = tester
424 .run_async(record.clone())
425 .timed(|_res, elapsed| {
426 tracing::debug!("Record {:?} finished in {:?}", record, elapsed)
427 })
428 .await
429 {
430 let err_string = err.to_string();
431 let allowed_errs = [
434 "no reader for dml in table",
435 "error reading a body from connection: broken pipe",
436 "failed to inject barrier",
437 "get error from control stream",
438 "cluster is under recovering",
439 ];
440 let should_retry = i < MAX_RETRY
441 && allowed_errs
442 .iter()
443 .any(|allowed_err| err_string.contains(allowed_err));
444 if !should_retry {
445 panic!("{}", err);
446 }
447 tracing::error!("failed to run test: {err}\nretry after {delay:?}");
448 } else {
449 break;
450 }
451 tokio::time::sleep(delay).await;
452 }
453 }
454}
455
/// Runs every sqllogictest file matched by `glob`, optionally killing cluster nodes while
/// kill-tolerant statements execute.
///
/// For each file a fresh runner labelled `madsim` is created. Files are skipped according
/// to `evaluate_skip!`, and `kafka.slt` / `kafka_batch.slt` are rewritten into a temporary
/// file first. Every record is classified into a [`SqlCmd`] and then:
/// - without any kill option, it is run exactly once via `run_no_kill`;
/// - with kill enabled but not allowed for the command, it goes through
///   `run_kill_not_allowed`;
/// - otherwise a node kill may be scheduled in the background and the record is retried
///   with capped exponential backoff until it succeeds or an accepted error is seen.
///
/// # Panics
///
/// Panics when the glob pattern or a file cannot be read or parsed, when a record fails
/// in no-kill mode, or when a record still fails after `MAX_RETRY` retries.
pub async fn run_slt_task(cluster: Arc<Cluster>, glob: &str, opts: Opts) {
    let env = slt_env::Env::new(opts);
    tracing::info!("background_ddl_rate: {}", env.background_ddl_rate());
    let mut rng = Env::get_rng();
    let files = glob::glob(glob).expect("failed to read glob pattern");
    for file in files {
        // A new runner is created for every file.
        let mut tester =
            sqllogictest::Runner::new(|| RisingWave::connect("frontend".into(), "dev".into()));
        tester.add_label("madsim");
        tester.set_var(well_known::DATABASE.to_owned(), "dev".to_owned());

        let file = file.unwrap();
        let path = file.as_path();

        // May `continue` to the next file.
        evaluate_skip!(env, path);

        // Kafka tests are rewritten into a temp file; `tempfile` stays bound so the
        // rewritten file remains available while `path` points at it.
        let tempfile = (path.ends_with("kafka.slt") || path.ends_with("kafka_batch.slt"))
            .then(|| hack_kafka_test(path));
        let path = tempfile.as_ref().map(|p| p.path()).unwrap_or(path);

        // Current `BACKGROUND_DDL` value as tracked by the runner.
        let mut background_ddl_enabled = false;

        // Last value set by the test file itself through `SET BACKGROUND_DDL`; while it
        // is `true`, `set_background_ddl` does not inject a random setting.
        let mut manual_background_ddl_enabled = false;

        let records = sqllogictest::parse_file(path).expect("failed to parse file");
        let random_vnode_count = random_vnode_count(&env, &records);

        for record in records {
            // A `halt` record stops processing of the current file.
            if let sqllogictest::Record::Halt { .. } = record {
                break;
            }

            // Classify the SQL only when the record applies to the `madsim` label: no
            // `skipif madsim` and no `onlyif` for a different label.
            let cmd = match &record {
                sqllogictest::Record::Statement {
                    sql, conditions, ..
                }
                | sqllogictest::Record::Query {
                    sql, conditions, ..
                } if conditions
                    .iter()
                    .all(|c| !matches!(c, Condition::SkipIf{ label } if label == "madsim"))
                    && !conditions
                        .iter()
                        .any(|c| matches!(c, Condition::OnlyIf{ label} if label != "madsim" )) =>
                {
                    extract_sql_command(sql).unwrap_or(SqlCmd::Others)
                }
                _ => SqlCmd::Others,
            };

            // No kill option enabled: run once and fail fast on any error.
            if !env.kill() {
                match run_no_kill(&mut tester, random_vnode_count, &cmd, record).await {
                    Ok(_) => continue,
                    Err(e) => panic!("{}", e),
                }
            }

            // Everything below only runs with kill enabled.
            tracing::debug!(?cmd, "Running");

            // Track `BACKGROUND_DDL` changes made by the test file itself.
            if let SqlCmd::SetBackgroundDdl { enable } = cmd {
                manual_background_ddl_enabled = enable;
                background_ddl_enabled = enable;
            }

            // Possibly inject a random `SET BACKGROUND_DDL` before an MV creation.
            set_background_ddl(
                &mut tester,
                &env,
                &record,
                &cmd,
                manual_background_ddl_enabled,
                &mut rng,
                &mut background_ddl_enabled,
            )
            .await;

            if !cmd.allow_kill() {
                run_kill_not_allowed(&mut tester, record.clone()).await;
                continue;
            }

            let should_kill = thread_rng().random_bool(env.kill_rate());
            // Background task: wait a random delay below one second, kill nodes, then
            // sleep for 15 seconds. It is awaited after the retry loop below.
            let handle = if should_kill {
                let cluster = cluster.clone();
                let opts = env.kill_opts();
                Some(tokio::spawn(async move {
                    let t = thread_rng().random_range(Duration::default()..Duration::from_secs(1));
                    tokio::time::sleep(t).await;
                    cluster.kill_node(&opts).await;
                    tokio::time::sleep(Duration::from_secs(15)).await;
                }))
            } else {
                None
            };

            // Retry loop. `wait_or_retry_background_ddl!` expands to `break` / `continue`
            // statements that target this loop.
            for i in 0usize.. {
                tracing::debug!(iteration = i, "retry count");
                // Exponential backoff capped at 10 seconds; the first attempt runs at once.
                let delay = Duration::from_secs(min(1 << i, 10));
                if i > 0 {
                    tokio::time::sleep(delay).await;
                }
                match tester
                    .run_async(record.clone())
                    .timed(|_res, elapsed| {
                        tracing::debug!("Record {:?} finished in {:?}", record, elapsed)
                    })
                    .await
                {
                    Ok(_) => {
                        // With background DDL, a successful statement that was not
                        // expected to fail additionally waits for the MV to show up.
                        if let SqlCmd::CreateMaterializedView { ref name } = cmd
                            && background_ddl_enabled
                            && !matches!(
                                record,
                                Record::Statement {
                                    expected: StatementExpect::Error(_),
                                    ..
                                } | Record::Query {
                                    expected: QueryExpect::Error(_),
                                    ..
                                }
                            )
                        {
                            wait_or_retry_background_ddl!(&record, name, i);
                        }
                        break;
                    }
                    Err(e) => {
                        // Arm order matters: accepted errors are checked first, then the
                        // retry limit, then the background-DDL wait.
                        match cmd {
                            // On a retry, an "exists" catalog error for a creation is
                            // accepted as success. Errors that mention a failed gRPC
                            // request to meta or "under creation" are not accepted.
                            SqlCmd::Create {
                                is_create_table_as: false,
                            }
                            | SqlCmd::CreateSink {
                                is_sink_into_table: false,
                            }
                            | SqlCmd::CreateMaterializedView { .. }
                                if i != 0
                                    && let e = e.to_string()
                                    && !e.contains("gRPC request to meta service failed")
                                    && e.contains("exists")
                                    && !e.contains("under creation")
                                    && e.contains("Catalog error") =>
                            {
                                break;
                            }
                            // On a retry, a "not found" catalog error for a drop is
                            // accepted as success.
                            SqlCmd::Drop
                                if i != 0
                                    && e.to_string().contains("not found")
                                    && e.to_string().contains("Catalog error") =>
                            {
                                break;
                            }

                            // Retries exhausted for any other error.
                            _ if i >= MAX_RETRY => {
                                panic!("failed to run test after retry {i} times: {e}")
                            }
                            // The MV is still being created in the background: wait for
                            // it instead of re-running the statement right away.
                            SqlCmd::CreateMaterializedView { ref name }
                                if i != 0
                                    && e.to_string().contains("table is in creating procedure")
                                    && background_ddl_enabled =>
                            {
                                wait_or_retry_background_ddl!(&record, name, i);
                            }
                            // Any other error: log it and go to the next iteration.
                            _ => tracing::error!(
                                iteration = i,
                                "failed to run test: {e}\nretry after {delay:?}"
                            ),
                        }
                    }
                }
            }
            // NOTE(review): `allow_kill` returns `false` for `SetBackgroundDdl`, so such
            // records `continue` above and this assignment looks unreachable — confirm
            // before removing.
            if let SqlCmd::SetBackgroundDdl { enable } = cmd {
                background_ddl_enabled = enable;
            };
            // Wait for the kill task (including its trailing sleep) before the next record.
            if let Some(handle) = handle {
                handle.await.unwrap();
            }
        }
    }
}
654
655pub async fn run_parallel_slt_task(glob: &str, jobs: usize) -> Result<(), ParallelTestError> {
656 let mut tester =
657 sqllogictest::Runner::new(|| RisingWave::connect("frontend".into(), "dev".into()));
658 tester.add_label("madsim");
659
660 if let Some(partitioner) = PARTITIONER.as_ref() {
661 tester.with_partitioner(partitioner.clone());
662 }
663
664 tester
665 .run_parallel_async(
666 glob,
667 vec!["frontend".into()],
668 |host, dbname| async move { RisingWave::connect(host, dbname).await.unwrap() },
669 jobs,
670 )
671 .await
672 .map_err(|e| panic!("{e}"))
673}
674
675fn hack_kafka_test(path: &Path) -> tempfile::NamedTempFile {
677 let content = std::fs::read_to_string(path).expect("failed to read file");
678 let simple_avsc_full_path =
679 std::fs::canonicalize("src/connector/src/test_data/simple-schema.avsc")
680 .expect("failed to get schema path");
681 let complex_avsc_full_path =
682 std::fs::canonicalize("src/connector/src/test_data/complex-schema.avsc")
683 .expect("failed to get schema path");
684 let json_schema_full_path =
685 std::fs::canonicalize("src/connector/src/test_data/complex-schema.json")
686 .expect("failed to get schema path");
687 let content = content
688 .replace("127.0.0.1:29092", "192.168.11.1:29092")
689 .replace("localhost:29092", "192.168.11.1:29092")
690 .replace(
691 "/risingwave/avro-simple-schema.avsc",
692 simple_avsc_full_path.to_str().unwrap(),
693 )
694 .replace(
695 "/risingwave/avro-complex-schema.avsc",
696 complex_avsc_full_path.to_str().unwrap(),
697 )
698 .replace(
699 "/risingwave/json-complex-schema",
700 json_schema_full_path.to_str().unwrap(),
701 );
702 let file = tempfile::NamedTempFile::new().expect("failed to create temp file");
703 std::fs::write(file.path(), content).expect("failed to write file");
704 println!("created a temp file for kafka test: {:?}", file.path());
705 file
706}
707
#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use expect_test::{Expect, expect};

    use super::*;

    /// Formats `actual` with `{:#?}` and compares it against the inline snapshot `expect`.
    fn check(actual: impl Debug, expect: Expect) {
        let actual = format!("{:#?}", actual);
        expect.assert_eq(&actual);
    }

    /// Checks how `extract_sql_command` classifies representative statements: table
    /// creation with and without `AS`, materialized view creation (including
    /// `if not exists`), and `SET BACKGROUND_DDL` in lower and upper case.
    #[test]
    fn test_extract_sql_command() {
        check(
            extract_sql_command("create table t as select 1;"),
            expect![[r#"
                Ok(
                    Create {
                        is_create_table_as: true,
                    },
                )"#]],
        );
        // Leading whitespace before the statement is accepted.
        check(
            extract_sql_command(" create table t (a int);"),
            expect![[r#"
                Ok(
                    Create {
                        is_create_table_as: false,
                    },
                )"#]],
        );
        check(
            extract_sql_command(" create materialized view m_1 as select 1;"),
            expect![[r#"
                Ok(
                    CreateMaterializedView {
                        name: "m_1",
                    },
                )"#]],
        );
        check(
            extract_sql_command("set background_ddl= true;"),
            expect![[r#"
                Ok(
                    SetBackgroundDdl {
                        enable: true,
                    },
                )"#]],
        );
        check(
            extract_sql_command("SET BACKGROUND_DDL=true;"),
            expect![[r#"
                Ok(
                    SetBackgroundDdl {
                        enable: true,
                    },
                )"#]],
        );
        // `if not exists` must not be mistaken for the view name.
        check(
            extract_sql_command("CREATE MATERIALIZED VIEW if not exists m_1 as select 1;"),
            expect![[r#"
                Ok(
                    CreateMaterializedView {
                        name: "m_1",
                    },
                )"#]],
        )
    }
}