1use std::cmp::min;
16use std::env;
17use std::hash::{DefaultHasher, Hash as _, Hasher as _};
18use std::path::Path;
19use std::sync::{Arc, LazyLock};
20use std::time::Duration;
21
22use anyhow::{Result, bail};
23use itertools::Itertools;
24use rand::seq::IteratorRandom;
25use rand::{Rng, SeedableRng, rng as thread_rng};
26use rand_chacha::ChaChaRng;
27use sqllogictest::{
28 Condition, ParallelTestError, Partitioner, QueryExpect, Record, StatementExpect,
29};
30
31use crate::client::RisingWave;
32use crate::cluster::{Cluster, KillOpts};
33use crate::utils::TimedExt;
34
/// Retry budget shared by the retry loops in [`run_slt_task`]: a failing record is retried only
/// while the zero-based attempt index is below this value, and the kill-mode loop panics once the
/// index reaches it.
const MAX_RETRY: usize = 10;
37
/// Returns `true` if `sql` has the shape `CREATE TABLE [IF NOT EXISTS] <name> AS ...`.
///
/// Matching is case-insensitive and works on whitespace-separated tokens, so any amount of
/// leading or interior whitespace is accepted. Only the token positions are inspected; the
/// statement is not otherwise parsed.
///
/// The optional `IF NOT EXISTS` clause is recognized so that such statements are classified the
/// same way as the plain `CREATE TABLE <name> AS` form (the materialized-view branch of
/// `extract_sql_command` handles that clause as well).
fn is_create_table_as(sql: &str) -> bool {
    let lowered = sql.to_lowercase();
    let parts: Vec<&str> = lowered.split_whitespace().collect();

    matches!(
        parts.as_slice(),
        // `create table <name> as ...`
        ["create", "table", _, "as", ..]
            // `create table if not exists <name> as ...`
            | ["create", "table", "if", "not", "exists", _, "as", ..]
    )
}
43
/// Returns `true` if `sql` has the shape `CREATE SINK [IF NOT EXISTS] <name> INTO ...`.
///
/// Matching is case-insensitive and works on whitespace-separated tokens, so any amount of
/// leading or interior whitespace is accepted. Only the token positions are inspected; the
/// statement is not otherwise parsed.
///
/// The optional `IF NOT EXISTS` clause is recognized so that such statements are classified the
/// same way as the plain `CREATE SINK <name> INTO` form.
fn is_sink_into_table(sql: &str) -> bool {
    let lowered = sql.to_lowercase();
    let parts: Vec<&str> = lowered.split_whitespace().collect();

    matches!(
        parts.as_slice(),
        // `create sink <name> into ...`
        ["create", "sink", _, "into", ..]
            // `create sink if not exists <name> into ...`
            | ["create", "sink", "if", "not", "exists", _, "into", ..]
    )
}
49
/// Coarse classification of a SQL statement, as produced by [`extract_sql_command`] from the
/// statement's leading tokens. [`run_slt_task`] uses it to decide how a record is executed.
#[derive(Debug, PartialEq, Eq)]
enum SqlCmd {
    /// A `CREATE ...` statement whose second token is neither `materialized` nor `sink`.
    Create {
        /// Result of [`is_create_table_as`] for the statement.
        is_create_table_as: bool,
    },
    /// A `CREATE SINK ...` statement.
    CreateSink {
        /// Result of [`is_sink_into_table`] for the statement.
        is_sink_into_table: bool,
    },
    /// A `CREATE MATERIALIZED <token> ...` statement.
    CreateMaterializedView {
        /// Lower-cased name token of the materialized view.
        name: String,
    },
    /// A `SET ...` statement that mentions `background_ddl`.
    SetBackgroundDdl {
        /// Whether the statement text contains `true`.
        enable: bool,
    },
    /// A statement starting with `drop`.
    Drop,
    /// A statement starting with `insert`, `update` or `delete`.
    Dml,
    /// A statement starting with `flush`.
    Flush,
    /// A statement starting with `alter`.
    Alter,
    /// Anything else, including statements that could not be classified.
    Others,
}
74
75impl SqlCmd {
76 fn allow_kill(&self) -> bool {
77 matches!(
78 self,
79 SqlCmd::Create {
80 is_create_table_as: false,
82 ..
83 } | SqlCmd::CreateSink {
84 is_sink_into_table: false,
85 } | SqlCmd::CreateMaterializedView { .. }
86 | SqlCmd::Drop
87 )
88 }
93
94 fn is_create(&self) -> bool {
95 matches!(
96 self,
97 SqlCmd::Create { .. }
98 | SqlCmd::CreateSink { .. }
99 | SqlCmd::CreateMaterializedView { .. }
100 )
101 }
102}
103
104fn extract_sql_command(sql: &str) -> SqlCmd {
105 let sql = sql.to_lowercase();
106 let tokens = sql.split_whitespace();
107 let mut tokens = tokens.multipeek();
108 let first_token = tokens.next().unwrap_or("");
109
110 match first_token {
111 "create" => {
117 let result: Option<SqlCmd> = try {
118 match tokens.next()? {
119 "materialized" => {
120 tokens.next()?;
122
123 let next = *tokens.peek()?;
125 if "if" == next
126 && let Some("not") = tokens.peek().cloned()
127 && let Some("exists") = tokens.peek().cloned()
128 {
129 tokens.next();
130 tokens.next();
131 tokens.next();
132 let name = tokens.next()?.to_owned();
133 SqlCmd::CreateMaterializedView { name }
134 } else {
135 let name = next.to_owned();
136 SqlCmd::CreateMaterializedView { name }
137 }
138 }
139 "sink" => SqlCmd::CreateSink {
140 is_sink_into_table: is_sink_into_table(&sql),
141 },
142 _ => SqlCmd::Create {
143 is_create_table_as: is_create_table_as(&sql),
144 },
145 }
146 };
147 result.unwrap_or(SqlCmd::Others)
148 }
149 "set" => {
150 if sql.contains("background_ddl") {
151 let enable = sql.contains("true");
152 SqlCmd::SetBackgroundDdl { enable }
153 } else {
154 SqlCmd::Others
155 }
156 }
157 "drop" => SqlCmd::Drop,
158 "insert" | "update" | "delete" => SqlCmd::Dml,
159 "flush" => SqlCmd::Flush,
160 "alter" => SqlCmd::Alter,
161 _ => SqlCmd::Others,
162 }
163}
164
/// Test files that [`run_slt_task`] skips entirely when any kill option is enabled.
///
/// Entries are compared against the end of each globbed path with `Path::ends_with`, so an entry
/// may include a parent directory component.
const KILL_IGNORE_FILES: &[&str] = &[
    "tpch_snapshot.slt",
    "tpch_upstream.slt",
    "search_path.slt",
    "transaction/now.slt",
    "transaction/read_only_multi_conn.slt",
    "transaction/read_only.slt",
    "transaction/tolerance.slt",
    "transaction/cursor.slt",
    "transaction/cursor_multi_conn.slt",
];
179
180async fn wait_background_mv_finished(mview_name: &str) -> Result<()> {
182 let Ok(rw) = RisingWave::connect("frontend".into(), "dev".into()).await else {
183 bail!("failed to connect to frontend for {mview_name}");
184 };
185 let client = rw.pg_client();
186 if client.simple_query("WAIT;").await.is_err() {
187 bail!("failed to wait for background mv to finish creating for {mview_name}");
188 }
189
190 let Ok(result) = client
191 .query(
192 "select count(*) from pg_matviews where matviewname=$1;",
193 &[&mview_name],
194 )
195 .await
196 else {
197 bail!("failed to query pg_matviews for {mview_name}");
198 };
199
200 match result[0].try_get::<_, i64>(0) {
201 Ok(1) => Ok(()),
202 r => bail!("expected 1 row in pg_matviews, got {r:#?} instead for {mview_name}"),
203 }
204}
205
/// A [`Partitioner`] that assigns each test file to one of `count` partitions by hashing the
/// file name, and accepts only the files that fall into partition `id`.
#[derive(Clone)]
struct HashPartitioner {
    /// Total number of partitions; validated to be non-zero by `HashPartitioner::new`.
    count: u64,
    /// Zero-based index of the partition this instance accepts; validated to be below `count`.
    id: u64,
}
212
213impl HashPartitioner {
214 fn new(count: u64, id: u64) -> Result<Self> {
215 if count == 0 {
216 bail!("partition count must be greater than zero");
217 }
218 if id >= count {
219 bail!("partition id (zero-based) must be less than count");
220 }
221 Ok(Self { count, id })
222 }
223}
224
225impl Partitioner for HashPartitioner {
226 fn matches(&self, file_name: &str) -> bool {
227 let mut hasher = DefaultHasher::new();
228 file_name.hash(&mut hasher);
229 hasher.finish() % self.count == self.id
230 }
231}
232
233static PARTITIONER: LazyLock<Option<HashPartitioner>> = LazyLock::new(|| {
234 let count = env::var("BUILDKITE_PARALLEL_JOB_COUNT")
235 .ok()?
236 .parse::<u64>()
237 .unwrap();
238 let id = env::var("BUILDKITE_PARALLEL_JOB")
239 .ok()?
240 .parse::<u64>()
241 .unwrap();
242 Some(HashPartitioner::new(count, id).unwrap())
243});
244
/// Options controlling [`run_slt_task`].
pub struct Opts {
    /// Which kinds of nodes may be killed and at what rate. Kill mode is active when any of
    /// `kill_compute`, `kill_meta`, `kill_frontend` or `kill_compactor` is set.
    pub kill_opts: KillOpts,
    /// In kill mode, the probability that the `SET BACKGROUND_DDL` statement injected before a
    /// `CREATE MATERIALIZED VIEW` turns background DDL on. No statement is injected unless this
    /// is greater than `0.0`.
    pub background_ddl_rate: f64,
    /// When not in kill mode, whether to inject a `SET STREAMING_MAX_PARALLELISM` statement with
    /// a randomly chosen value before each create statement. Ignored for files in which any
    /// statement or query mentions `parallelism`.
    pub random_vnode_count: bool,
}
252
/// Runs every sqllogictest file matched by `glob` against the cluster, record by record.
///
/// Files rejected by the global `PARTITIONER`, and (in kill mode) files listed in
/// `KILL_IGNORE_FILES`, are skipped. Files named `kafka.slt` / `kafka_batch.slt` are first
/// rewritten by `hack_kafka_test`.
///
/// Without any kill option each record is run exactly once, optionally preceded by a random
/// `SET STREAMING_MAX_PARALLELISM` for create statements. In kill mode:
///
/// * records whose command is not `allow_kill()` are retried with exponential backoff on a fixed
///   list of error messages;
/// * other records may have a node killed concurrently (with probability `kill_opts.kill_rate`)
///   and are retried until they succeed or a retry reports that the object already exists / is
///   already gone.
///
/// # Panics
///
/// Panics if the glob pattern or a file cannot be read/parsed, if `MADSIM_TEST_SEED` is set but is
/// not a `u64`, or if a record still fails once retrying is not (or no longer) permitted.
pub async fn run_slt_task(
    cluster: Arc<Cluster>,
    glob: &str,
    Opts {
        kill_opts,
        background_ddl_rate,
        random_vnode_count,
    }: Opts,
) {
    tracing::info!("background_ddl_rate: {}", background_ddl_rate);
    // Seed for the RNG that decides the injected background DDL setting; defaults to 0 when
    // `MADSIM_TEST_SEED` is not set.
    let seed = std::env::var("MADSIM_TEST_SEED")
        .unwrap_or("0".to_owned())
        .parse::<u64>()
        .unwrap();
    let mut rng = ChaChaRng::seed_from_u64(seed);
    // Kill mode is on as soon as any node kind may be killed.
    let kill = kill_opts.kill_compute
        || kill_opts.kill_meta
        || kill_opts.kill_frontend
        || kill_opts.kill_compactor;
    let files = glob::glob(glob).expect("failed to read glob pattern");
    for file in files {
        // A fresh runner (and therefore fresh connections) per file.
        let mut tester =
            sqllogictest::Runner::new(|| RisingWave::connect("frontend".into(), "dev".into()));
        tester.add_label("madsim");

        let file = file.unwrap();
        let path = file.as_path();

        if let Some(partitioner) = PARTITIONER.as_ref()
            && !partitioner.matches(path.to_str().unwrap())
        {
            println!("{} [partition skipped]", path.display());
            continue;
        } else if kill && KILL_IGNORE_FILES.iter().any(|s| path.ends_with(s)) {
            println!("{} [kill ignored]", path.display());
            continue;
        }
        println!("{}", path.display());

        // Kafka tests run from a rewritten temporary copy; `tempfile` is kept alive for the rest
        // of this iteration because `path` may borrow from it.
        let tempfile = (path.ends_with("kafka.slt") || path.ends_with("kafka_batch.slt"))
            .then(|| hack_kafka_test(path));
        let path = tempfile.as_ref().map(|p| p.path()).unwrap_or(path);

        // Background DDL state as last set by either an explicit `SET BACKGROUND_DDL` in the file
        // or by the statement injected below.
        let mut background_ddl_enabled = false;

        // Value of the most recent explicit `SET BACKGROUND_DDL` in the file; while true, no
        // random setting is injected.
        let mut manual_background_ddl_enabled = false;

        let records = sqllogictest::parse_file(path).expect("failed to parse file");
        // Random vnode counts are disabled for the whole file if any statement or query mentions
        // `parallelism`.
        let random_vnode_count = random_vnode_count
            && records.iter().all(|record| {
                if let Record::Statement { sql, .. } | Record::Query { sql, .. } = record
                    && sql.to_lowercase().contains("parallelism")
                {
                    println!("[RANDOM VNODE COUNT] skip: {}", path.display());
                    false
                } else {
                    true
                }
            });

        for record in records {
            // Stop processing this file at a `halt` record.
            if let sqllogictest::Record::Halt { .. } = record {
                break;
            }

            // Classify the SQL only for records that actually apply under the `madsim` label:
            // no `skipif madsim`, and no `onlyif` for a different label.
            let cmd = match &record {
                sqllogictest::Record::Statement {
                    sql, conditions, ..
                }
                | sqllogictest::Record::Query {
                    sql, conditions, ..
                } if conditions
                    .iter()
                    .all(|c| !matches!(c, Condition::SkipIf{ label } if label == "madsim"))
                    && !conditions
                        .iter()
                        .any(|c| matches!(c, Condition::OnlyIf{ label} if label != "madsim" )) =>
                {
                    extract_sql_command(sql)
                }
                _ => SqlCmd::Others,
            };

            // Non-kill mode: run the record once and panic on any failure.
            if !kill {
                if random_vnode_count
                    && cmd.is_create()
                    && let Record::Statement {
                        loc,
                        conditions,
                        connection,
                        ..
                    } = &record
                {
                    // Pick a value uniformly from the three ranges below.
                    let vnode_count = (2..=64)
                        .chain(224..=288)
                        .chain(992..=1056)
                        .choose(&mut thread_rng())
                        .unwrap();
                    let sql = format!("SET STREAMING_MAX_PARALLELISM = {vnode_count};");
                    println!("[RANDOM VNODE COUNT] set: {vnode_count}");
                    // Reuse the location, conditions and connection of the create statement so
                    // the injected `SET` runs in the same context.
                    let set_random_vnode_count = Record::Statement {
                        loc: loc.clone(),
                        conditions: conditions.clone(),
                        connection: connection.clone(),
                        sql,
                        expected: StatementExpect::Ok,
                        retry: None,
                    };
                    tester.run_async(set_random_vnode_count).await.unwrap();
                    println!("[RANDOM VNODE COUNT] run: {record}");
                }

                match tester
                    .run_async(record.clone())
                    .timed(|_res, elapsed| {
                        tracing::debug!("Record {:?} finished in {:?}", record, elapsed)
                    })
                    .await
                {
                    Ok(_) => continue,
                    Err(e) => panic!("{}", e),
                }
            }

            // Everything below only runs in kill mode.
            tracing::debug!(?cmd, "Running");

            // An explicit `SET BACKGROUND_DDL` in the file overrides the random injection.
            if let SqlCmd::SetBackgroundDdl { enable } = cmd {
                manual_background_ddl_enabled = enable;
                background_ddl_enabled = enable;
            }

            // Before a `CREATE MATERIALIZED VIEW` statement (that is not skipped under madsim and
            // not governed by a manual setting), randomly turn background DDL on or off.
            if let Record::Statement {
                loc,
                conditions,
                connection,
                ..
            } = &record
                && matches!(cmd, SqlCmd::CreateMaterializedView { .. })
                && !manual_background_ddl_enabled
                && conditions.iter().all(|c| {
                    *c != Condition::SkipIf {
                        label: "madsim".to_owned(),
                    }
                })
                && background_ddl_rate > 0.0
            {
                let background_ddl_setting = rng.random_bool(background_ddl_rate);
                let set_background_ddl = Record::Statement {
                    loc: loc.clone(),
                    conditions: conditions.clone(),
                    connection: connection.clone(),
                    expected: StatementExpect::Ok,
                    sql: format!("SET BACKGROUND_DDL={background_ddl_setting};"),
                    retry: None,
                };
                tester.run_async(set_background_ddl).await.unwrap();
                background_ddl_enabled = background_ddl_setting;
            };

            // Commands that must not be interrupted by a kill: run them without spawning a kill
            // task, retrying only on the listed error messages.
            if !cmd.allow_kill() {
                for i in 0usize.. {
                    // Exponential backoff: 1s, 2s, 4s, ...
                    let delay = Duration::from_secs(1 << i);
                    if let Err(err) = tester
                        .run_async(record.clone())
                        .timed(|_res, elapsed| {
                            tracing::debug!("Record {:?} finished in {:?}", record, elapsed)
                        })
                        .await
                    {
                        let err_string = err.to_string();
                        // Error message fragments for which the record is retried.
                        let allowed_errs = [
                            "no reader for dml in table",
                            "error reading a body from connection: broken pipe",
                            "failed to inject barrier",
                            "get error from control stream",
                            "cluster is under recovering",
                        ];
                        let should_retry = i < MAX_RETRY
                            && allowed_errs
                                .iter()
                                .any(|allowed_err| err_string.contains(allowed_err));
                        if !should_retry {
                            panic!("{}", err);
                        }
                        tracing::error!("failed to run test: {err}\nretry after {delay:?}");
                    } else {
                        break;
                    }
                    tokio::time::sleep(delay).await;
                }
                continue;
            }

            // Decide per record whether a node is killed while it runs.
            let should_kill = thread_rng().random_bool(kill_opts.kill_rate as f64);
            let handle = if should_kill {
                let cluster = cluster.clone();
                let opts = kill_opts;
                // The kill happens concurrently with the record below: after a random delay of
                // less than one second a node is killed, then the task sleeps for 15 seconds
                // (presumably to give the cluster time to recover before the next record).
                Some(tokio::spawn(async move {
                    let t = thread_rng().random_range(Duration::default()..Duration::from_secs(1));
                    tokio::time::sleep(t).await;
                    cluster.kill_node(&opts).await;
                    tokio::time::sleep(Duration::from_secs(15)).await;
                }))
            } else {
                None
            };

            // Run the record, retrying until it succeeds or a terminal condition is reached.
            for i in 0usize.. {
                tracing::debug!(iteration = i, "retry count");
                // Backoff capped at 10 seconds; no sleep before the first attempt.
                let delay = Duration::from_secs(min(1 << i, 10));
                if i > 0 {
                    tokio::time::sleep(delay).await;
                }
                match tester
                    .run_async(record.clone())
                    .timed(|_res, elapsed| {
                        tracing::debug!("Record {:?} finished in {:?}", record, elapsed)
                    })
                    .await
                {
                    Ok(_) => {
                        // With background DDL on, a successful `CREATE MATERIALIZED VIEW` (one
                        // that is not expected to error) still has to be waited for explicitly.
                        if let SqlCmd::CreateMaterializedView { ref name } = cmd
                            && background_ddl_enabled
                            && !matches!(
                                record,
                                Record::Statement {
                                    expected: StatementExpect::Error(_),
                                    ..
                                } | Record::Query {
                                    expected: QueryExpect::Error(_),
                                    ..
                                }
                            )
                        {
                            tracing::debug!(iteration = i, "Retry for background ddl");
                            match wait_background_mv_finished(name).await {
                                Ok(_) => {
                                    tracing::debug!(
                                        iteration = i,
                                        "Record with background_ddl {:?} finished",
                                        record
                                    );
                                    break;
                                }
                                Err(err) => {
                                    tracing::error!(
                                        iteration = i,
                                        ?err,
                                        "failed to wait for background mv to finish creating"
                                    );
                                    if i >= MAX_RETRY {
                                        panic!(
                                            "failed to run test after retry {i} times, error={err:#?}"
                                        );
                                    }
                                    // Re-run the record on the next iteration.
                                    continue;
                                }
                            }
                        }
                        break;
                    }
                    Err(e) => {
                        match cmd {
                            // On a retry, an "already exists" catalog error for a create
                            // statement ends the loop as if it had succeeded (presumably an
                            // earlier attempt created the object before the kill hit).
                            SqlCmd::Create {
                                is_create_table_as: false,
                            }
                            | SqlCmd::CreateSink {
                                is_sink_into_table: false,
                            }
                            | SqlCmd::CreateMaterializedView { .. }
                                if i != 0
                                    && let e = e.to_string()
                                    && !e.contains("gRPC request to meta service failed")
                                    && e.contains("exists")
                                    && !e.contains("under creation")
                                    && e.contains("Catalog error") =>
                            {
                                break;
                            }
                            // Likewise, on a retry a "not found" catalog error for a drop
                            // statement ends the loop.
                            SqlCmd::Drop
                                if i != 0
                                    && e.to_string().contains("not found")
                                    && e.to_string().contains("Catalog error") =>
                            {
                                break;
                            }

                            // Retry budget exhausted.
                            _ if i >= MAX_RETRY => {
                                panic!("failed to run test after retry {i} times: {e}")
                            }
                            // The view is still being created in the background: wait for it
                            // instead of blindly re-running the statement.
                            SqlCmd::CreateMaterializedView { ref name }
                                if i != 0
                                    && e.to_string().contains("table is in creating procedure")
                                    && background_ddl_enabled =>
                            {
                                tracing::debug!(iteration = i, name, "Retry for background ddl");
                                match wait_background_mv_finished(name).await {
                                    Ok(_) => {
                                        tracing::debug!(
                                            iteration = i,
                                            "Record with background_ddl {:?} finished",
                                            record
                                        );
                                        break;
                                    }
                                    Err(err) => {
                                        tracing::error!(
                                            iteration = i,
                                            ?err,
                                            "failed to wait for background mv to finish creating"
                                        );
                                        // NOTE(review): the `_ if i >= MAX_RETRY` arm above is
                                        // tried first, so this check looks unreachable here.
                                        if i >= MAX_RETRY {
                                            panic!(
                                                "failed to run test after retry {i} times, error={err:#?}"
                                            );
                                        }
                                        continue;
                                    }
                                }
                            }
                            // Any other error: log it and retry after the backoff delay.
                            _ => tracing::error!(
                                iteration = i,
                                "failed to run test: {e}\nretry after {delay:?}"
                            ),
                        }
                    }
                }
            }
            // NOTE(review): `SetBackgroundDdl` is not `allow_kill()`, so such records already
            // `continue` in the non-killable branch above; this assignment looks unreachable.
            if let SqlCmd::SetBackgroundDdl { enable } = cmd {
                background_ddl_enabled = enable;
            };
            // Wait for the kill task (including its trailing sleep) before the next record.
            if let Some(handle) = handle {
                handle.await.unwrap();
            }
        }
    }
}
615
616pub async fn run_parallel_slt_task(glob: &str, jobs: usize) -> Result<(), ParallelTestError> {
617 let mut tester =
618 sqllogictest::Runner::new(|| RisingWave::connect("frontend".into(), "dev".into()));
619 tester.add_label("madsim");
620
621 if let Some(partitioner) = PARTITIONER.as_ref() {
622 tester.with_partitioner(partitioner.clone());
623 }
624
625 tester
626 .run_parallel_async(
627 glob,
628 vec!["frontend".into()],
629 |host, dbname| async move { RisingWave::connect(host, dbname).await.unwrap() },
630 jobs,
631 )
632 .await
633 .map_err(|e| panic!("{e}"))
634}
635
636fn hack_kafka_test(path: &Path) -> tempfile::NamedTempFile {
638 let content = std::fs::read_to_string(path).expect("failed to read file");
639 let simple_avsc_full_path =
640 std::fs::canonicalize("src/connector/src/test_data/simple-schema.avsc")
641 .expect("failed to get schema path");
642 let complex_avsc_full_path =
643 std::fs::canonicalize("src/connector/src/test_data/complex-schema.avsc")
644 .expect("failed to get schema path");
645 let json_schema_full_path =
646 std::fs::canonicalize("src/connector/src/test_data/complex-schema.json")
647 .expect("failed to get schema path");
648 let content = content
649 .replace("127.0.0.1:29092", "192.168.11.1:29092")
650 .replace("localhost:29092", "192.168.11.1:29092")
651 .replace(
652 "/risingwave/avro-simple-schema.avsc",
653 simple_avsc_full_path.to_str().unwrap(),
654 )
655 .replace(
656 "/risingwave/avro-complex-schema.avsc",
657 complex_avsc_full_path.to_str().unwrap(),
658 )
659 .replace(
660 "/risingwave/json-complex-schema",
661 json_schema_full_path.to_str().unwrap(),
662 );
663 let file = tempfile::NamedTempFile::new().expect("failed to create temp file");
664 std::fs::write(file.path(), content).expect("failed to write file");
665 println!("created a temp file for kafka test: {:?}", file.path());
666 file
667}
668
/// Unit tests for the SQL classification helpers.
#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use expect_test::{Expect, expect};

    use super::*;

    /// Asserts that the pretty `Debug` rendering (`{:#?}`) of `actual` equals the snapshot.
    fn check(actual: impl Debug, expect: Expect) {
        let actual = format!("{:#?}", actual);
        expect.assert_eq(&actual);
    }

    #[test]
    fn test_is_create_table_as() {
        assert!(is_create_table_as(" create table xx as select 1;"));
        // `as` must directly follow the table name.
        assert!(!is_create_table_as(
            " create table xx not as select 1;"
        ));
        // Only `create table` qualifies, not `create view`.
        assert!(!is_create_table_as(" create view xx as select 1;"));
    }

    #[test]
    fn test_extract_sql_command() {
        check(
            extract_sql_command("create table t as select 1;"),
            expect![[r#"
                Create {
                    is_create_table_as: true,
                }"#]],
        );
        check(
            extract_sql_command(" create table t (a int);"),
            expect![[r#"
                Create {
                    is_create_table_as: false,
                }"#]],
        );
        check(
            extract_sql_command(" create materialized view m_1 as select 1;"),
            expect![[r#"
                CreateMaterializedView {
                    name: "m_1",
                }"#]],
        );
        check(
            extract_sql_command("set background_ddl= true;"),
            expect![[r#"
                SetBackgroundDdl {
                    enable: true,
                }"#]],
        );
        // Keywords are matched case-insensitively.
        check(
            extract_sql_command("SET BACKGROUND_DDL=true;"),
            expect![[r#"
                SetBackgroundDdl {
                    enable: true,
                }"#]],
        );
        // `if not exists` is skipped when extracting the view name.
        check(
            extract_sql_command("CREATE MATERIALIZED VIEW if not exists m_1 as select 1;"),
            expect![[r#"
                CreateMaterializedView {
                    name: "m_1",
                }"#]],
        )
    }
}