1use std::cmp::min;
16use std::path::Path;
17use std::sync::Arc;
18use std::time::Duration;
19
20use anyhow::Result;
21use rand::{Rng, rng as thread_rng};
22use sqllogictest::substitution::well_known;
23use sqllogictest::{
24 Condition, ParallelTestError, Partitioner, QueryExpect, Record, StatementExpect,
25};
26
27use crate::client::RisingWave;
28use crate::cluster::Cluster;
29use crate::parse::extract_sql_command;
30use crate::slt::background_ddl_mode::*;
31use crate::slt::runner::{random_vnode_count, run_kill_not_allowed, run_no_kill};
32use crate::slt::slt_env::{Env, Opts};
33use crate::slt::vnode_mode::*;
34use crate::utils::TimedExt;
35use crate::{evaluate_skip, wait_or_retry_background_ddl};
36
/// Retry-iteration limit for a failing record.
///
/// Compared against the zero-based retry counter (`i >= MAX_RETRY` / `i < MAX_RETRY`), so a
/// record is attempted at most `MAX_RETRY + 1` times before the run panics.
const MAX_RETRY: usize = 10;
39
/// Coarse classification of a SQL statement.
///
/// The runner uses it to decide whether nodes may be killed while the statement runs and how
/// failed attempts are retried. Produced by `extract_sql_command`.
#[derive(Debug, PartialEq, Eq)]
pub enum SqlCmd {
    /// A `CREATE ...` statement such as `CREATE TABLE`. Materialized views have their own
    /// variant.
    Create {
        /// Whether this is a `CREATE TABLE ... AS ...` statement.
        is_create_table_as: bool,
    },
    /// `CREATE MATERIALIZED VIEW` (including the `IF NOT EXISTS` form).
    CreateMaterializedView {
        /// Name of the materialized view being created.
        name: String,
    },
    /// `SET BACKGROUND_DDL = <bool>`.
    SetBackgroundDdl {
        /// The value being set.
        enable: bool,
    },
    /// Presumably a `DROP ...` statement; classification is done by `extract_sql_command`.
    Drop,
    /// Presumably a DML statement; classification is done by `extract_sql_command`.
    Dml,
    /// Presumably `FLUSH`; classification is done by `extract_sql_command`.
    Flush,
    /// Anything else, including SQL that could not be classified and records that do not run
    /// under the `madsim` label.
    Others,
}
59
60impl SqlCmd {
61 fn allow_kill(&self) -> bool {
62 matches!(
63 self,
64 SqlCmd::Create {
65 is_create_table_as: false,
67 ..
68 } | SqlCmd::CreateMaterializedView { .. }
69 | SqlCmd::Drop
70 )
71 }
76
77 fn is_create(&self) -> bool {
78 matches!(
79 self,
80 SqlCmd::Create { .. } | SqlCmd::CreateMaterializedView { .. }
81 )
82 }
83}
84
/// `.slt` files that are skipped entirely when node killing is enabled.
///
/// Matched with `Path::ends_with`, i.e. by trailing path components. An entry such as
/// `transaction/now.slt` therefore matches the last two components of a test file's path.
const KILL_IGNORE_FILES: &[&str] = &[
    "tpch_snapshot.slt",
    "tpch_upstream.slt",
    "search_path.slt",
    "transaction/now.slt",
    "transaction/read_only_multi_conn.slt",
    "transaction/read_only.slt",
    "transaction/tolerance.slt",
    "transaction/cursor.slt",
    "transaction/cursor_multi_conn.slt",
];
99
100mod background_ddl_mode {
102 use anyhow::bail;
103 use rand::Rng;
104 use rand_chacha::ChaChaRng;
105 use sqllogictest::{Condition, Record, StatementExpect};
106
107 use crate::client::RisingWave;
108 use crate::slt::SqlCmd;
109 use crate::slt::slt_env::Env;
110
111 pub(super) async fn wait_background_mv_finished(mview_name: &str) -> anyhow::Result<()> {
113 let Ok(rw) = RisingWave::connect("frontend".into(), "dev".into()).await else {
114 bail!("failed to connect to frontend for {mview_name}");
115 };
116 let client = rw.pg_client();
117 if client.simple_query("WAIT;").await.is_err() {
118 bail!("failed to wait for background mv to finish creating for {mview_name}");
119 }
120
121 let Ok(result) = client
122 .query(
123 "select count(*) from pg_matviews where matviewname=$1;",
124 &[&mview_name],
125 )
126 .await
127 else {
128 bail!("failed to query pg_matviews for {mview_name}");
129 };
130
131 match result[0].try_get::<_, i64>(0) {
132 Ok(1) => Ok(()),
133 r => bail!("expected 1 row in pg_matviews, got {r:#?} instead for {mview_name}"),
134 }
135 }
136
137 pub(super) async fn set_background_ddl<D, M, T>(
139 tester: &mut sqllogictest::Runner<D, M>,
140 env: &Env,
141 record: &Record<T>,
142 cmd: &SqlCmd,
143 manual_background_ddl_enabled: bool,
144 rng: &mut ChaChaRng,
145 background_ddl_enabled: &mut bool,
146 ) where
147 D: sqllogictest::AsyncDB<ColumnType = T>,
148 M: sqllogictest::MakeConnection<Conn = D>,
149 T: sqllogictest::ColumnType,
150 {
151 if let Record::Statement {
152 loc,
153 conditions,
154 connection,
155 ..
156 } = &record
157 && matches!(cmd, SqlCmd::CreateMaterializedView { .. })
158 && !manual_background_ddl_enabled
159 && conditions.iter().all(|c| {
160 *c != Condition::SkipIf {
161 label: "madsim".to_owned(),
162 }
163 })
164 && env.background_ddl_rate() > 0.0
165 {
166 let background_ddl_setting = rng.random_bool(env.background_ddl_rate());
167 let set_background_ddl = Record::Statement {
168 loc: loc.clone(),
169 conditions: conditions.clone(),
170 connection: connection.clone(),
171 expected: StatementExpect::Ok,
172 sql: format!("SET BACKGROUND_DDL={background_ddl_setting};"),
173 retry: None,
174 };
175 tester.run_async(set_background_ddl).await.unwrap();
176 *background_ddl_enabled = background_ddl_setting;
177 };
178 }
179
180 #[macro_export]
182 macro_rules! wait_or_retry_background_ddl {
183 ($record:expr, $name:expr, $iteration:expr) => {
184 let record = $record;
185 let i = $iteration;
186 tracing::debug!(iteration = i, "Retry for background ddl");
187 match wait_background_mv_finished($name).await {
188 Ok(_) => {
189 tracing::debug!(
190 iteration = i,
191 "Record with background_ddl {:?} finished",
192 record
193 );
194 break;
195 }
196 Err(err) => {
197 tracing::error!(
198 iteration = i,
199 ?err,
200 "failed to wait for background mv to finish creating"
201 );
202 if i >= MAX_RETRY {
203 panic!("failed to run test after retry {i} times, error={err:#?}");
204 }
205 continue;
206 }
207 }
208 };
209 }
210}
211
212mod vnode_mode {
213 use std::env;
214 use std::hash::{DefaultHasher, Hash, Hasher};
215 use std::sync::LazyLock;
216
217 use anyhow::bail;
218 use sqllogictest::Partitioner;
219
220 #[derive(Clone)]
222 pub(super) struct HashPartitioner {
223 count: u64,
224 id: u64,
225 }
226
227 impl HashPartitioner {
228 pub(super) fn new(count: u64, id: u64) -> anyhow::Result<Self> {
229 if count == 0 {
230 bail!("partition count must be greater than zero");
231 }
232 if id >= count {
233 bail!("partition id (zero-based) must be less than count");
234 }
235 Ok(Self { count, id })
236 }
237 }
238
239 impl Partitioner for HashPartitioner {
240 fn matches(&self, file_name: &str) -> bool {
241 let mut hasher = DefaultHasher::new();
242 file_name.hash(&mut hasher);
243 hasher.finish() % self.count == self.id
244 }
245 }
246
247 pub(super) static PARTITIONER: LazyLock<Option<HashPartitioner>> = LazyLock::new(|| {
248 let count = env::var("BUILDKITE_PARALLEL_JOB_COUNT")
249 .ok()?
250 .parse::<u64>()
251 .unwrap();
252 let id = env::var("BUILDKITE_PARALLEL_JOB")
253 .ok()?
254 .parse::<u64>()
255 .unwrap();
256 Some(HashPartitioner::new(count, id).unwrap())
257 });
258}
259
260pub mod slt_env {
261 use rand::SeedableRng;
262 use rand_chacha::ChaChaRng;
263
264 use crate::cluster::KillOpts;
265
266 pub struct Opts {
267 pub kill_opts: KillOpts,
268 pub background_ddl_rate: f64,
270 pub random_vnode_count: bool,
272 }
273
274 pub(super) struct Env {
275 opts: Opts,
276 }
277
278 impl Env {
279 pub fn new(opts: Opts) -> Self {
280 Self { opts }
281 }
282
283 pub fn get_rng() -> ChaChaRng {
284 let seed = std::env::var("MADSIM_TEST_SEED")
285 .unwrap_or("0".to_owned())
286 .parse::<u64>()
287 .unwrap();
288 ChaChaRng::seed_from_u64(seed)
289 }
290
291 pub fn background_ddl_rate(&self) -> f64 {
292 self.opts.background_ddl_rate
293 }
294
295 pub fn kill(&self) -> bool {
296 self.opts.kill_opts.kill_compute
297 || self.opts.kill_opts.kill_meta
298 || self.opts.kill_opts.kill_frontend
299 || self.opts.kill_opts.kill_compactor
300 }
301
302 pub fn random_vnode_count(&self) -> bool {
303 self.opts.random_vnode_count
304 }
305
306 pub fn kill_opts(&self) -> KillOpts {
307 self.opts.kill_opts
308 }
309
310 pub fn kill_rate(&self) -> f64 {
311 self.opts.kill_opts.kill_rate as f64
312 }
313 }
314}
315
316mod runner {
317 use std::time::Duration;
318
319 use rand::prelude::IteratorRandom;
320 use rand::rng as thread_rng;
321 use sqllogictest::{Record, StatementExpect, TestError};
322
323 use crate::slt::slt_env::Env;
324 use crate::slt::{MAX_RETRY, SqlCmd};
325 use crate::utils::TimedExt;
326 #[macro_export]
327 macro_rules! evaluate_skip {
328 ($env:expr, $path:expr) => {
329 if let Some(partitioner) = PARTITIONER.as_ref()
330 && !partitioner.matches($path.to_str().unwrap())
331 {
332 println!("[skip partition] {}", $path.display());
333 continue;
334 } else if $env.kill() && KILL_IGNORE_FILES.iter().any(|s| $path.ends_with(s)) {
335 println!("[skip kill] {}", $path.display());
336 continue;
337 } else {
338 println!("[run] {}", $path.display());
339 }
340 };
341 }
342
343 pub(super) fn random_vnode_count<T: sqllogictest::ColumnType>(
344 env: &Env,
345 records: &[Record<T>],
346 ) -> bool {
347 env.random_vnode_count()
348 && records.iter().all(|record| {
349 if let Record::Statement { sql, .. } | Record::Query { sql, .. } = record
350 && sql.to_lowercase().contains("parallelism")
351 {
352 println!("[RANDOM VNODE COUNT] skip: {}", sql);
353 false
354 } else {
355 true
356 }
357 })
358 }
359
360 pub(super) async fn run_no_kill<D, M, T>(
362 tester: &mut sqllogictest::Runner<D, M>,
363 random_vnode_count: bool,
364 cmd: &SqlCmd,
365 record: Record<T>,
366 ) -> Result<(), TestError>
367 where
368 D: sqllogictest::AsyncDB<ColumnType = T>,
369 M: sqllogictest::MakeConnection<Conn = D>,
370 T: sqllogictest::ColumnType,
371 {
372 if random_vnode_count
374 && cmd.is_create()
375 && let Record::Statement {
376 loc,
377 conditions,
378 connection,
379 ..
380 } = &record
381 {
382 let vnode_count = (2..=64) .chain(224..=288) .chain(992..=1056) .choose(&mut thread_rng())
386 .unwrap();
387 let sql = format!("SET STREAMING_MAX_PARALLELISM = {vnode_count};");
388 println!("[RANDOM VNODE COUNT] set: {vnode_count}");
389 let set_random_vnode_count = Record::Statement {
390 loc: loc.clone(),
391 conditions: conditions.clone(),
392 connection: connection.clone(),
393 sql,
394 expected: StatementExpect::Ok,
395 retry: None,
396 };
397 tester.run_async(set_random_vnode_count).await.unwrap();
398 println!("[RANDOM VNODE COUNT] run: {record}");
399 }
400
401 tester.run_async(record).await.map(|_| ())
402 }
403
404 pub(super) async fn run_kill_not_allowed<D, M, T>(
406 tester: &mut sqllogictest::Runner<D, M>,
407 record: Record<T>,
408 ) where
409 D: sqllogictest::AsyncDB<ColumnType = T>,
410 M: sqllogictest::MakeConnection<Conn = D>,
411 T: sqllogictest::ColumnType,
412 {
413 for i in 0usize.. {
414 let delay = Duration::from_secs(1 << i);
415 if let Err(err) = tester
416 .run_async(record.clone())
417 .timed(|_res, elapsed| {
418 tracing::debug!("Record {:?} finished in {:?}", record, elapsed)
419 })
420 .await
421 {
422 let err_string = err.to_string().to_ascii_lowercase();
423 let allowed_errs = [
426 "no reader for dml in table",
427 "error reading a body from connection: broken pipe",
428 "failed to inject barrier",
429 "get error from control stream",
430 "cluster is under recovering",
431 "streaming vnode mapping has not been initialized",
432 ];
433 let should_retry = i < MAX_RETRY
434 && allowed_errs
435 .iter()
436 .any(|allowed_err| err_string.contains(&allowed_err.to_ascii_lowercase()));
437 if !should_retry {
438 panic!("{}", err);
439 }
440 tracing::error!("failed to run test: {err}\nretry after {delay:?}");
441 } else {
442 break;
443 }
444 tokio::time::sleep(delay).await;
445 }
446 }
447}
448
449pub async fn run_slt_task(cluster: Arc<Cluster>, glob: &str, opts: Opts) {
451 let env = slt_env::Env::new(opts);
452 tracing::info!("background_ddl_rate: {}", env.background_ddl_rate());
453 let mut rng = Env::get_rng();
454 let files = glob::glob(glob).expect("failed to read glob pattern");
455 for file in files {
456 let mut tester =
458 sqllogictest::Runner::new(|| RisingWave::connect("frontend".into(), "dev".into()));
459 tester.add_label("madsim");
460 tester.set_var(well_known::DATABASE.to_owned(), "dev".to_owned());
461
462 let file = file.unwrap();
463 let path = file.as_path();
464
465 evaluate_skip!(env, path);
466
467 let tempfile = (path.ends_with("kafka.slt") || path.ends_with("kafka_batch.slt"))
469 .then(|| hack_kafka_test(path));
470 let path = tempfile.as_ref().map(|p| p.path()).unwrap_or(path);
471
472 let mut background_ddl_enabled = false;
474
475 let mut manual_background_ddl_enabled = false;
478
479 let records = sqllogictest::parse_file(path).expect("failed to parse file");
480 let random_vnode_count = random_vnode_count(&env, &records);
481
482 for record in records {
483 if let sqllogictest::Record::Halt { .. } = record {
488 break;
489 }
490
491 let cmd = match &record {
492 sqllogictest::Record::Statement {
493 sql, conditions, ..
494 }
495 | sqllogictest::Record::Query {
496 sql, conditions, ..
497 } if conditions
498 .iter()
499 .all(|c| !matches!(c, Condition::SkipIf{ label } if label == "madsim"))
500 && !conditions
501 .iter()
502 .any(|c| matches!(c, Condition::OnlyIf{ label} if label != "madsim" )) =>
503 {
504 extract_sql_command(sql).unwrap_or(SqlCmd::Others)
505 }
506 _ => SqlCmd::Others,
507 };
508
509 if !env.kill() {
511 match run_no_kill(&mut tester, random_vnode_count, &cmd, record).await {
512 Ok(_) => continue,
513 Err(e) => panic!("{}", e),
514 }
515 }
516
517 tracing::debug!(?cmd, "Running");
519
520 if let SqlCmd::SetBackgroundDdl { enable } = cmd {
521 manual_background_ddl_enabled = enable;
522 background_ddl_enabled = enable;
523 }
524
525 set_background_ddl(
527 &mut tester,
528 &env,
529 &record,
530 &cmd,
531 manual_background_ddl_enabled,
532 &mut rng,
533 &mut background_ddl_enabled,
534 )
535 .await;
536
537 if !cmd.allow_kill() {
538 run_kill_not_allowed(&mut tester, record.clone()).await;
539 continue;
540 }
541
542 let should_kill = thread_rng().random_bool(env.kill_rate());
543 let handle = if should_kill {
545 let cluster = cluster.clone();
546 let opts = env.kill_opts();
547 Some(tokio::spawn(async move {
548 let t = thread_rng().random_range(Duration::default()..Duration::from_secs(1));
549 tokio::time::sleep(t).await;
550 cluster.kill_node(&opts).await;
551 tokio::time::sleep(Duration::from_secs(15)).await;
552 }))
553 } else {
554 None
555 };
556
557 for i in 0usize.. {
558 tracing::debug!(iteration = i, "retry count");
559 let delay = Duration::from_secs(min(1 << i, 10));
560 if i > 0 {
561 tokio::time::sleep(delay).await;
562 }
563 match tester
564 .run_async(record.clone())
565 .timed(|_res, elapsed| {
566 tracing::debug!("Record {:?} finished in {:?}", record, elapsed)
567 })
568 .await
569 {
570 Ok(_) => {
571 if let SqlCmd::CreateMaterializedView { ref name } = cmd
573 && background_ddl_enabled
574 && !matches!(
575 record,
576 Record::Statement {
577 expected: StatementExpect::Error(_),
578 ..
579 } | Record::Query {
580 expected: QueryExpect::Error(_),
581 ..
582 }
583 )
584 {
585 wait_or_retry_background_ddl!(&record, name, i);
586 }
587 break;
588 }
589 Err(e) => {
590 match cmd {
591 SqlCmd::Create {
593 is_create_table_as: false,
594 }
595 | SqlCmd::CreateMaterializedView { .. }
596 if i != 0
597 && let e = e.to_string()
598 && !e.contains("gRPC request to meta service failed")
601 && e.contains("exists")
602 && !e.contains("under creation")
603 && e.contains("Catalog error") =>
604 {
605 break;
606 }
607 SqlCmd::Drop
609 if i != 0
610 && e.to_string().contains("not found")
611 && e.to_string().contains("Catalog error") =>
612 {
613 break;
614 }
615
616 _ if i >= MAX_RETRY => {
618 panic!("failed to run test after retry {i} times: {e}")
619 }
620 SqlCmd::CreateMaterializedView { ref name }
621 if i != 0
622 && e.to_string().contains("table is in creating procedure")
623 && background_ddl_enabled =>
624 {
625 wait_or_retry_background_ddl!(&record, name, i);
626 }
627 _ => tracing::error!(
628 iteration = i,
629 "failed to run test: {e}\nretry after {delay:?}"
630 ),
631 }
632 }
633 }
634 }
635 if let SqlCmd::SetBackgroundDdl { enable } = cmd {
636 background_ddl_enabled = enable;
637 };
638 if let Some(handle) = handle {
639 handle.await.unwrap();
640 }
641 }
642 }
643}
644
645pub async fn run_parallel_slt_task(glob: &str, jobs: usize) -> Result<(), ParallelTestError> {
646 let mut tester =
647 sqllogictest::Runner::new(|| RisingWave::connect("frontend".into(), "dev".into()));
648 tester.add_label("madsim");
649
650 if let Some(partitioner) = PARTITIONER.as_ref() {
651 tester.with_partitioner(partitioner.clone());
652 }
653
654 tester
655 .run_parallel_async(
656 glob,
657 vec!["frontend".into()],
658 |host, dbname| async move { RisingWave::connect(host, dbname).await.unwrap() },
659 jobs,
660 )
661 .await
662 .map_err(|e| panic!("{e}"))
663}
664
665fn hack_kafka_test(path: &Path) -> tempfile::NamedTempFile {
667 let content = std::fs::read_to_string(path).expect("failed to read file");
668 let simple_avsc_full_path =
669 std::fs::canonicalize("src/connector/src/test_data/simple-schema.avsc")
670 .expect("failed to get schema path");
671 let complex_avsc_full_path =
672 std::fs::canonicalize("src/connector/src/test_data/complex-schema.avsc")
673 .expect("failed to get schema path");
674 let json_schema_full_path =
675 std::fs::canonicalize("src/connector/src/test_data/complex-schema.json")
676 .expect("failed to get schema path");
677 let content = content
678 .replace("127.0.0.1:29092", "192.168.11.1:29092")
679 .replace("localhost:29092", "192.168.11.1:29092")
680 .replace(
681 "/risingwave/avro-simple-schema.avsc",
682 simple_avsc_full_path.to_str().unwrap(),
683 )
684 .replace(
685 "/risingwave/avro-complex-schema.avsc",
686 complex_avsc_full_path.to_str().unwrap(),
687 )
688 .replace(
689 "/risingwave/json-complex-schema",
690 json_schema_full_path.to_str().unwrap(),
691 );
692 let file = tempfile::NamedTempFile::new().expect("failed to create temp file");
693 std::fs::write(file.path(), content).expect("failed to write file");
694 println!("created a temp file for kafka test: {:?}", file.path());
695 file
696}
697
#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use expect_test::{Expect, expect};

    use super::*;

    /// Renders `actual` with `{:#?}` and compares it against the inline `expect!` snapshot.
    fn check(actual: impl Debug, expect: Expect) {
        let rendered = format!("{actual:#?}");
        expect.assert_eq(&rendered);
    }

    /// `extract_sql_command` maps representative SQL to the expected `SqlCmd` variants.
    #[test]
    fn test_extract_sql_command() {
        let cases = [
            (
                "create table t as select 1;",
                expect![[r#"
                    Ok(
                        Create {
                            is_create_table_as: true,
                        },
                    )"#]],
            ),
            (
                " create table t (a int);",
                expect![[r#"
                    Ok(
                        Create {
                            is_create_table_as: false,
                        },
                    )"#]],
            ),
            (
                " create materialized view m_1 as select 1;",
                expect![[r#"
                    Ok(
                        CreateMaterializedView {
                            name: "m_1",
                        },
                    )"#]],
            ),
            (
                "set background_ddl= true;",
                expect![[r#"
                    Ok(
                        SetBackgroundDdl {
                            enable: true,
                        },
                    )"#]],
            ),
            (
                "SET BACKGROUND_DDL=true;",
                expect![[r#"
                    Ok(
                        SetBackgroundDdl {
                            enable: true,
                        },
                    )"#]],
            ),
            (
                "CREATE MATERIALIZED VIEW if not exists m_1 as select 1;",
                expect![[r#"
                    Ok(
                        CreateMaterializedView {
                            name: "m_1",
                        },
                    )"#]],
            ),
        ];

        for (sql, snapshot) in cases {
            check(extract_sql_command(sql), snapshot);
        }
    }
}