1use std::cmp::min;
16use std::path::Path;
17use std::sync::Arc;
18use std::time::Duration;
19
20use anyhow::Result;
21use rand::{Rng, rng as thread_rng};
22use sqllogictest::substitution::well_known;
23use sqllogictest::{
24 Condition, ParallelTestError, Partitioner, QueryExpect, Record, StatementExpect,
25};
26
27use crate::client::RisingWave;
28use crate::cluster::Cluster;
29use crate::parse::extract_sql_command;
30use crate::slt::background_ddl_mode::*;
31use crate::slt::runner::{random_vnode_count, run_kill_not_allowed, run_no_kill};
32use crate::slt::slt_env::{Env, Opts};
33use crate::slt::vnode_mode::*;
34use crate::utils::TimedExt;
35use crate::{evaluate_skip, wait_or_retry_background_ddl};
36
/// Upper bound on a retry loop's iteration index: once the iteration counter reaches this value,
/// the slt runner panics instead of retrying again.
const MAX_RETRY: usize = 10;
39
/// Coarse classification of one SQL statement, as returned by `extract_sql_command`.
///
/// The simulation runner uses it to decide whether nodes may be killed while the statement runs
/// (`allow_kill`), whether to inject a random vnode count (`is_create`), and how to handle
/// background DDL for materialized views.
#[derive(Debug, PartialEq, Eq)]
pub enum SqlCmd {
    /// A `CREATE` statement that is not `CREATE MATERIALIZED VIEW` (e.g. `CREATE TABLE`).
    Create {
        /// `true` for `CREATE TABLE ... AS ...`.
        is_create_table_as: bool,
    },
    /// `CREATE MATERIALIZED VIEW [IF NOT EXISTS] <name> ...`.
    CreateMaterializedView {
        /// Name of the view; used to look it up in `pg_matviews` when it is built in the
        /// background.
        name: String,
    },
    /// `SET BACKGROUND_DDL = <bool>` (recognized in both upper and lower case).
    SetBackgroundDdl {
        /// The value being assigned.
        enable: bool,
    },
    /// A `DROP` statement, as classified by `extract_sql_command`.
    Drop,
    /// Presumably DML (`INSERT`/`UPDATE`/`DELETE`); classification lives in `extract_sql_command`.
    Dml,
    /// Presumably `FLUSH`; classification lives in `extract_sql_command`.
    Flush,
    /// Anything else, including records not run under the `madsim` label and SQL that
    /// `extract_sql_command` fails to classify.
    Others,
}
59
60impl SqlCmd {
61 fn allow_kill(&self) -> bool {
62 matches!(
63 self,
64 SqlCmd::Create {
65 is_create_table_as: false,
67 ..
68 } | SqlCmd::CreateMaterializedView { .. }
69 | SqlCmd::Drop
70 )
71 }
76
77 fn is_create(&self) -> bool {
78 matches!(
79 self,
80 SqlCmd::Create { .. } | SqlCmd::CreateMaterializedView { .. }
81 )
82 }
83}
84
/// Test files that are skipped entirely when any node-kill option is enabled.
///
/// Entries are matched with `Path::ends_with` in `evaluate_skip!`, i.e. against the trailing
/// path components of each test file (so `transaction/now.slt` must be the last two components).
const KILL_IGNORE_FILES: &[&str] = &[
    "tpch_snapshot.slt",
    "tpch_upstream.slt",
    "search_path.slt",
    "transaction/now.slt",
    "transaction/read_only_multi_conn.slt",
    "transaction/read_only.slt",
    "transaction/tolerance.slt",
    "transaction/cursor.slt",
    "transaction/cursor_multi_conn.slt",
];
99
100mod background_ddl_mode {
102 use anyhow::bail;
103 use rand::Rng;
104 use rand_chacha::ChaChaRng;
105 use sqllogictest::{Condition, Record, StatementExpect};
106
107 use crate::client::RisingWave;
108 use crate::slt::SqlCmd;
109 use crate::slt::slt_env::Env;
110
111 pub(super) async fn wait_background_mv_finished(mview_name: &str) -> anyhow::Result<()> {
113 let Ok(rw) = RisingWave::connect("frontend".into(), "dev".into()).await else {
114 bail!("failed to connect to frontend for {mview_name}");
115 };
116 let client = rw.pg_client();
117 if client.simple_query("WAIT;").await.is_err() {
118 bail!("failed to wait for background mv to finish creating for {mview_name}");
119 }
120
121 let Ok(result) = client
122 .query(
123 "select count(*) from pg_matviews where matviewname=$1;",
124 &[&mview_name],
125 )
126 .await
127 else {
128 bail!("failed to query pg_matviews for {mview_name}");
129 };
130
131 match result[0].try_get::<_, i64>(0) {
132 Ok(1) => Ok(()),
133 r => bail!("expected 1 row in pg_matviews, got {r:#?} instead for {mview_name}"),
134 }
135 }
136
137 pub(super) async fn set_background_ddl<D, M, T>(
139 tester: &mut sqllogictest::Runner<D, M>,
140 env: &Env,
141 record: &Record<T>,
142 cmd: &SqlCmd,
143 manual_background_ddl_enabled: bool,
144 rng: &mut ChaChaRng,
145 background_ddl_enabled: &mut bool,
146 ) where
147 D: sqllogictest::AsyncDB<ColumnType = T>,
148 M: sqllogictest::MakeConnection<Conn = D>,
149 T: sqllogictest::ColumnType,
150 {
151 if let Record::Statement {
152 loc,
153 conditions,
154 connection,
155 ..
156 } = &record
157 && matches!(cmd, SqlCmd::CreateMaterializedView { .. })
158 && !manual_background_ddl_enabled
159 && conditions.iter().all(|c| {
160 *c != Condition::SkipIf {
161 label: "madsim".to_owned(),
162 }
163 })
164 && env.background_ddl_rate() > 0.0
165 {
166 let background_ddl_setting = rng.random_bool(env.background_ddl_rate());
167 let set_background_ddl = Record::Statement {
168 loc: loc.clone(),
169 conditions: conditions.clone(),
170 connection: connection.clone(),
171 expected: StatementExpect::Ok,
172 sql: format!("SET BACKGROUND_DDL={background_ddl_setting};"),
173 retry: None,
174 };
175 tester.run_async(set_background_ddl).await.unwrap();
176 *background_ddl_enabled = background_ddl_setting;
177 };
178 }
179
180 #[macro_export]
182 macro_rules! wait_or_retry_background_ddl {
183 ($record:expr, $name:expr, $iteration:expr) => {
184 let record = $record;
185 let i = $iteration;
186 tracing::debug!(iteration = i, "Retry for background ddl");
187 match wait_background_mv_finished($name).await {
188 Ok(_) => {
189 tracing::debug!(
190 iteration = i,
191 "Record with background_ddl {:?} finished",
192 record
193 );
194 break;
195 }
196 Err(err) => {
197 tracing::error!(
198 iteration = i,
199 ?err,
200 "failed to wait for background mv to finish creating"
201 );
202 if i >= MAX_RETRY {
203 panic!("failed to run test after retry {i} times, error={err:#?}");
204 }
205 continue;
206 }
207 }
208 };
209 }
210}
211
212mod vnode_mode {
213 use std::env;
214 use std::hash::{DefaultHasher, Hash, Hasher};
215 use std::sync::LazyLock;
216
217 use anyhow::bail;
218 use sqllogictest::Partitioner;
219
220 #[derive(Clone)]
222 pub(super) struct HashPartitioner {
223 count: u64,
224 id: u64,
225 }
226
227 impl HashPartitioner {
228 pub(super) fn new(count: u64, id: u64) -> anyhow::Result<Self> {
229 if count == 0 {
230 bail!("partition count must be greater than zero");
231 }
232 if id >= count {
233 bail!("partition id (zero-based) must be less than count");
234 }
235 Ok(Self { count, id })
236 }
237 }
238
239 impl Partitioner for HashPartitioner {
240 fn matches(&self, file_name: &str) -> bool {
241 let mut hasher = DefaultHasher::new();
242 file_name.hash(&mut hasher);
243 hasher.finish() % self.count == self.id
244 }
245 }
246
247 pub(super) static PARTITIONER: LazyLock<Option<HashPartitioner>> = LazyLock::new(|| {
248 let count = env::var("BUILDKITE_PARALLEL_JOB_COUNT")
249 .ok()?
250 .parse::<u64>()
251 .unwrap();
252 let id = env::var("BUILDKITE_PARALLEL_JOB")
253 .ok()?
254 .parse::<u64>()
255 .unwrap();
256 Some(HashPartitioner::new(count, id).unwrap())
257 });
258}
259
260pub mod slt_env {
261 use rand::SeedableRng;
262 use rand_chacha::ChaChaRng;
263
264 use crate::cluster::KillOpts;
265
266 pub struct Opts {
267 pub kill_opts: KillOpts,
268 pub background_ddl_rate: f64,
270 pub random_vnode_count: bool,
272 }
273
274 pub(super) struct Env {
275 opts: Opts,
276 }
277
278 impl Env {
279 pub fn new(opts: Opts) -> Self {
280 Self { opts }
281 }
282
283 pub fn get_rng() -> ChaChaRng {
284 let seed = std::env::var("MADSIM_TEST_SEED")
285 .unwrap_or("0".to_owned())
286 .parse::<u64>()
287 .unwrap();
288 ChaChaRng::seed_from_u64(seed)
289 }
290
291 pub fn background_ddl_rate(&self) -> f64 {
292 self.opts.background_ddl_rate
293 }
294
295 pub fn kill(&self) -> bool {
296 self.opts.kill_opts.kill_compute
297 || self.opts.kill_opts.kill_meta
298 || self.opts.kill_opts.kill_frontend
299 || self.opts.kill_opts.kill_compactor
300 }
301
302 pub fn random_vnode_count(&self) -> bool {
303 self.opts.random_vnode_count
304 }
305
306 pub fn kill_opts(&self) -> KillOpts {
307 self.opts.kill_opts
308 }
309
310 pub fn kill_rate(&self) -> f64 {
311 self.opts.kill_opts.kill_rate as f64
312 }
313 }
314}
315
316mod runner {
317 use std::time::Duration;
318
319 use rand::prelude::IteratorRandom;
320 use rand::rng as thread_rng;
321 use sqllogictest::{Record, StatementExpect, TestError};
322
323 use crate::slt::slt_env::Env;
324 use crate::slt::{MAX_RETRY, SqlCmd};
325 use crate::utils::TimedExt;
326 #[macro_export]
327 macro_rules! evaluate_skip {
328 ($env:expr, $path:expr) => {
329 if let Some(partitioner) = PARTITIONER.as_ref()
330 && !partitioner.matches($path.to_str().unwrap())
331 {
332 println!("[skip partition] {}", $path.display());
333 continue;
334 } else if $env.kill() && KILL_IGNORE_FILES.iter().any(|s| $path.ends_with(s)) {
335 println!("[skip kill] {}", $path.display());
336 continue;
337 } else {
338 println!("[run] {}", $path.display());
339 }
340 };
341 }
342
343 pub(super) fn random_vnode_count<T: sqllogictest::ColumnType>(
344 env: &Env,
345 records: &[Record<T>],
346 ) -> bool {
347 env.random_vnode_count()
348 && records.iter().all(|record| {
349 if let Record::Statement { sql, .. } | Record::Query { sql, .. } = record
350 && sql.to_lowercase().contains("parallelism")
351 {
352 println!("[RANDOM VNODE COUNT] skip: {}", sql);
353 false
354 } else {
355 true
356 }
357 })
358 }
359
360 pub(super) async fn run_no_kill<D, M, T>(
362 tester: &mut sqllogictest::Runner<D, M>,
363 random_vnode_count: bool,
364 cmd: &SqlCmd,
365 record: Record<T>,
366 ) -> Result<(), TestError>
367 where
368 D: sqllogictest::AsyncDB<ColumnType = T>,
369 M: sqllogictest::MakeConnection<Conn = D>,
370 T: sqllogictest::ColumnType,
371 {
372 if random_vnode_count
374 && cmd.is_create()
375 && let Record::Statement {
376 loc,
377 conditions,
378 connection,
379 ..
380 } = &record
381 {
382 let vnode_count = (2..=64) .chain(224..=288) .chain(992..=1056) .choose(&mut thread_rng())
386 .unwrap();
387 let sql = format!("SET STREAMING_MAX_PARALLELISM = {vnode_count};");
388 println!("[RANDOM VNODE COUNT] set: {vnode_count}");
389 let set_random_vnode_count = Record::Statement {
390 loc: loc.clone(),
391 conditions: conditions.clone(),
392 connection: connection.clone(),
393 sql,
394 expected: StatementExpect::Ok,
395 retry: None,
396 };
397 tester.run_async(set_random_vnode_count).await.unwrap();
398 println!("[RANDOM VNODE COUNT] run: {record}");
399 }
400
401 tester.run_async(record).await.map(|_| ())
402 }
403
404 pub(super) async fn run_kill_not_allowed<D, M, T>(
406 tester: &mut sqllogictest::Runner<D, M>,
407 record: Record<T>,
408 ) where
409 D: sqllogictest::AsyncDB<ColumnType = T>,
410 M: sqllogictest::MakeConnection<Conn = D>,
411 T: sqllogictest::ColumnType,
412 {
413 for i in 0usize.. {
414 let delay = Duration::from_secs(1 << i);
415 if let Err(err) = tester
416 .run_async(record.clone())
417 .timed(|_res, elapsed| {
418 tracing::debug!("Record {:?} finished in {:?}", record, elapsed)
419 })
420 .await
421 {
422 let err_string = err.to_string();
423 let allowed_errs = [
426 "no reader for dml in table",
427 "error reading a body from connection: broken pipe",
428 "failed to inject barrier",
429 "get error from control stream",
430 "cluster is under recovering",
431 ];
432 let should_retry = i < MAX_RETRY
433 && allowed_errs
434 .iter()
435 .any(|allowed_err| err_string.contains(allowed_err));
436 if !should_retry {
437 panic!("{}", err);
438 }
439 tracing::error!("failed to run test: {err}\nretry after {delay:?}");
440 } else {
441 break;
442 }
443 tokio::time::sleep(delay).await;
444 }
445 }
446}
447
/// Runs every slt file matching `glob` against `cluster`, one record at a time.
///
/// For each file:
/// - Files outside this job's partition, or on the kill-ignore list, are skipped via
///   `evaluate_skip!`.
/// - Kafka test files are rewritten into a temp file by `hack_kafka_test`.
/// - A fresh `sqllogictest::Runner`, connecting to database `dev` on `frontend` and labelled
///   `madsim`, executes the records until a `halt` record.
///
/// Without node killing (`!env.kill()`), each record goes through `run_no_kill` and any error
/// panics.
///
/// With node killing:
/// - each record is classified via `extract_sql_command`;
/// - background DDL may be randomly toggled before `CREATE MATERIALIZED VIEW`;
/// - commands that do not `allow_kill()` go through `run_kill_not_allowed`;
/// - the rest run in a retry loop while a node may be killed concurrently, with probability
///   `env.kill_rate()`.
///
/// # Panics
///
/// Panics on glob or parse failures, on any test failure without node killing, and when a
/// record still fails after `MAX_RETRY` retries with node killing.
pub async fn run_slt_task(cluster: Arc<Cluster>, glob: &str, opts: Opts) {
    let env = slt_env::Env::new(opts);
    tracing::info!("background_ddl_rate: {}", env.background_ddl_rate());
    // Seeded from `MADSIM_TEST_SEED`, so background-DDL choices are reproducible per seed.
    let mut rng = Env::get_rng();
    let files = glob::glob(glob).expect("failed to read glob pattern");
    for file in files {
        // A fresh runner (and hence fresh connections) per file.
        let mut tester =
            sqllogictest::Runner::new(|| RisingWave::connect("frontend".into(), "dev".into()));
        tester.add_label("madsim");
        tester.set_var(well_known::DATABASE.to_owned(), "dev".to_owned());

        let file = file.unwrap();
        let path = file.as_path();

        // May `continue` to the next file.
        evaluate_skip!(env, path);

        // Kafka tests get their broker address and schema paths rewritten into a temp file. The
        // `NamedTempFile` is deleted on drop, so it stays bound for the rest of this iteration.
        let tempfile = (path.ends_with("kafka.slt") || path.ends_with("kafka_batch.slt"))
            .then(|| hack_kafka_test(path));
        let path = tempfile.as_ref().map(|p| p.path()).unwrap_or(path);

        // Whether the current session is believed to have `BACKGROUND_DDL` on.
        let mut background_ddl_enabled = false;

        // Whether the test file itself last ran `SET BACKGROUND_DDL=true`; while set,
        // `set_background_ddl` does not randomize the setting.
        let mut manual_background_ddl_enabled = false;

        let records = sqllogictest::parse_file(path).expect("failed to parse file");
        let random_vnode_count = random_vnode_count(&env, &records);

        for record in records {
            // `halt` ends the file; remaining records are not run.
            if let sqllogictest::Record::Halt { .. } = record {
                break;
            }

            // Classify the SQL only for records that will actually run under the `madsim` label
            // (not `skipif madsim`, and no `onlyif` for another label); everything else is
            // treated as `Others`.
            let cmd = match &record {
                sqllogictest::Record::Statement {
                    sql, conditions, ..
                }
                | sqllogictest::Record::Query {
                    sql, conditions, ..
                } if conditions
                    .iter()
                    .all(|c| !matches!(c, Condition::SkipIf { label } if label == "madsim"))
                    && !conditions
                        .iter()
                        .any(|c| matches!(c, Condition::OnlyIf { label } if label != "madsim")) =>
                {
                    extract_sql_command(sql).unwrap_or(SqlCmd::Others)
                }
                _ => SqlCmd::Others,
            };

            if !env.kill() {
                // No node killing: run directly and fail fast.
                match run_no_kill(&mut tester, random_vnode_count, &cmd, record).await {
                    Ok(_) => continue,
                    Err(e) => panic!("{}", e),
                }
            }

            tracing::debug!(?cmd, "Running");

            // Remember an explicit `SET BACKGROUND_DDL` from the test file.
            if let SqlCmd::SetBackgroundDdl { enable } = cmd {
                manual_background_ddl_enabled = enable;
                background_ddl_enabled = enable;
            }

            set_background_ddl(
                &mut tester,
                &env,
                &record,
                &cmd,
                manual_background_ddl_enabled,
                &mut rng,
                &mut background_ddl_enabled,
            )
            .await;

            if !cmd.allow_kill() {
                run_kill_not_allowed(&mut tester, record.clone()).await;
                continue;
            }

            // With probability `kill_rate`, spawn a task that kills node(s) after a random delay
            // under 1s and then stays alive for another 15s. It is joined after the record is
            // done.
            let should_kill = thread_rng().random_bool(env.kill_rate());
            let handle = if should_kill {
                let cluster = cluster.clone();
                let opts = env.kill_opts();
                Some(tokio::spawn(async move {
                    let t = thread_rng().random_range(Duration::default()..Duration::from_secs(1));
                    tokio::time::sleep(t).await;
                    cluster.kill_node(&opts).await;
                    tokio::time::sleep(Duration::from_secs(15)).await;
                }))
            } else {
                None
            };

            // Retry loop. Iteration 0 runs immediately; later iterations first sleep 2^i seconds,
            // capped at 10s.
            for i in 0usize.. {
                tracing::debug!(iteration = i, "retry count");
                let delay = Duration::from_secs(min(1 << i, 10));
                if i > 0 {
                    tokio::time::sleep(delay).await;
                }
                match tester
                    .run_async(record.clone())
                    .timed(|_res, elapsed| {
                        tracing::debug!("Record {:?} finished in {:?}", record, elapsed)
                    })
                    .await
                {
                    Ok(_) => {
                        // A materialized view created in the background (and expected to
                        // succeed) is waited for before moving on. The macro `break`s on
                        // success and `continue`s this loop on failure.
                        if let SqlCmd::CreateMaterializedView { ref name } = cmd
                            && background_ddl_enabled
                            && !matches!(
                                record,
                                Record::Statement {
                                    expected: StatementExpect::Error(_),
                                    ..
                                } | Record::Query {
                                    expected: QueryExpect::Error(_),
                                    ..
                                }
                            )
                        {
                            wait_or_retry_background_ddl!(&record, name, i);
                        }
                        break;
                    }
                    Err(e) => {
                        match cmd {
                            // On a retry, an "already exists" catalog error is treated as
                            // success: presumably an earlier attempt created the object before
                            // the kill interrupted it.
                            SqlCmd::Create {
                                is_create_table_as: false,
                            }
                            | SqlCmd::CreateMaterializedView { .. }
                                if i != 0
                                    && let e = e.to_string()
                                    && !e.contains("gRPC request to meta service failed")
                                    && e.contains("exists")
                                    && !e.contains("under creation")
                                    && e.contains("Catalog error") =>
                            {
                                break;
                            }
                            // Likewise, "not found" on a retried DROP presumably means an earlier
                            // attempt already dropped the object.
                            SqlCmd::Drop
                                if i != 0
                                    && e.to_string().contains("not found")
                                    && e.to_string().contains("Catalog error") =>
                            {
                                break;
                            }

                            _ if i >= MAX_RETRY => {
                                panic!("failed to run test after retry {i} times: {e}")
                            }
                            // The view is still being built in the background: wait for it
                            // instead of re-issuing the CREATE.
                            SqlCmd::CreateMaterializedView { ref name }
                                if i != 0
                                    && e.to_string().contains("table is in creating procedure")
                                    && background_ddl_enabled =>
                            {
                                wait_or_retry_background_ddl!(&record, name, i);
                            }
                            _ => tracing::error!(
                                iteration = i,
                                "failed to run test: {e}\nretry after {delay:?}"
                            ),
                        }
                    }
                }
            }
            // NOTE(review): `SetBackgroundDdl` is not `allow_kill()`, so such records `continue`
            // above and this branch appears unreachable here.
            if let SqlCmd::SetBackgroundDdl { enable } = cmd {
                background_ddl_enabled = enable;
            };
            if let Some(handle) = handle {
                handle.await.unwrap();
            }
        }
    }
}
643
644pub async fn run_parallel_slt_task(glob: &str, jobs: usize) -> Result<(), ParallelTestError> {
645 let mut tester =
646 sqllogictest::Runner::new(|| RisingWave::connect("frontend".into(), "dev".into()));
647 tester.add_label("madsim");
648
649 if let Some(partitioner) = PARTITIONER.as_ref() {
650 tester.with_partitioner(partitioner.clone());
651 }
652
653 tester
654 .run_parallel_async(
655 glob,
656 vec!["frontend".into()],
657 |host, dbname| async move { RisingWave::connect(host, dbname).await.unwrap() },
658 jobs,
659 )
660 .await
661 .map_err(|e| panic!("{e}"))
662}
663
664fn hack_kafka_test(path: &Path) -> tempfile::NamedTempFile {
666 let content = std::fs::read_to_string(path).expect("failed to read file");
667 let simple_avsc_full_path =
668 std::fs::canonicalize("src/connector/src/test_data/simple-schema.avsc")
669 .expect("failed to get schema path");
670 let complex_avsc_full_path =
671 std::fs::canonicalize("src/connector/src/test_data/complex-schema.avsc")
672 .expect("failed to get schema path");
673 let json_schema_full_path =
674 std::fs::canonicalize("src/connector/src/test_data/complex-schema.json")
675 .expect("failed to get schema path");
676 let content = content
677 .replace("127.0.0.1:29092", "192.168.11.1:29092")
678 .replace("localhost:29092", "192.168.11.1:29092")
679 .replace(
680 "/risingwave/avro-simple-schema.avsc",
681 simple_avsc_full_path.to_str().unwrap(),
682 )
683 .replace(
684 "/risingwave/avro-complex-schema.avsc",
685 complex_avsc_full_path.to_str().unwrap(),
686 )
687 .replace(
688 "/risingwave/json-complex-schema",
689 json_schema_full_path.to_str().unwrap(),
690 );
691 let file = tempfile::NamedTempFile::new().expect("failed to create temp file");
692 std::fs::write(file.path(), content).expect("failed to write file");
693 println!("created a temp file for kafka test: {:?}", file.path());
694 file
695}
696
#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use expect_test::{Expect, expect};

    use super::*;

    /// Compares the pretty-printed (`{:#?}`) form of `actual` with an `expect!` snapshot.
    fn check(actual: impl Debug, expect: Expect) {
        let actual = format!("{:#?}", actual);
        expect.assert_eq(&actual);
    }

    /// Checks how `extract_sql_command` classifies three kinds of statement:
    /// - `CREATE TABLE`, with and without `AS`;
    /// - `CREATE MATERIALIZED VIEW`, with and without `IF NOT EXISTS`;
    /// - `SET BACKGROUND_DDL`, in lower and upper case.
    #[test]
    fn test_extract_sql_command() {
        check(
            extract_sql_command("create table t as select 1;"),
            expect![[r#"
                Ok(
                    Create {
                        is_create_table_as: true,
                    },
                )"#]],
        );
        check(
            extract_sql_command(" create table t (a int);"),
            expect![[r#"
                Ok(
                    Create {
                        is_create_table_as: false,
                    },
                )"#]],
        );
        check(
            extract_sql_command(" create materialized view m_1 as select 1;"),
            expect![[r#"
                Ok(
                    CreateMaterializedView {
                        name: "m_1",
                    },
                )"#]],
        );
        check(
            extract_sql_command("set background_ddl= true;"),
            expect![[r#"
                Ok(
                    SetBackgroundDdl {
                        enable: true,
                    },
                )"#]],
        );
        check(
            extract_sql_command("SET BACKGROUND_DDL=true;"),
            expect![[r#"
                Ok(
                    SetBackgroundDdl {
                        enable: true,
                    },
                )"#]],
        );
        check(
            extract_sql_command("CREATE MATERIALIZED VIEW if not exists m_1 as select 1;"),
            expect![[r#"
                Ok(
                    CreateMaterializedView {
                        name: "m_1",
                    },
                )"#]],
        )
    }
}