risingwave_simulation/
slt.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::min;
16use std::env;
17use std::hash::{DefaultHasher, Hash as _, Hasher as _};
18use std::path::Path;
19use std::sync::{Arc, LazyLock};
20use std::time::Duration;
21
22use anyhow::{Result, bail};
23use itertools::Itertools;
24use rand::seq::IteratorRandom;
25use rand::{Rng, SeedableRng, rng as thread_rng};
26use rand_chacha::ChaChaRng;
27use sqllogictest::{
28    Condition, ParallelTestError, Partitioner, QueryExpect, Record, StatementExpect,
29};
30
31use crate::client::RisingWave;
32use crate::cluster::{Cluster, KillOpts};
33use crate::utils::TimedExt;
34
/// Upper bound on how many times a single record is retried before the test panics.
const MAX_RETRY: usize = 10;
37
/// Returns whether `sql` is a `CREATE TABLE [IF NOT EXISTS] <name> AS ...` statement.
///
/// Keywords are matched case-insensitively on whitespace-separated tokens. This is a
/// heuristic: a column list or other clause between the name and `AS` is not recognized.
fn is_create_table_as(sql: &str) -> bool {
    // At most `create table if not exists <name> as` (7 tokens) is ever inspected.
    let tokens: Vec<&str> = sql.split_whitespace().take(7).collect();
    let token_is = |i: usize, keyword: &str| {
        tokens
            .get(i)
            .is_some_and(|token| token.eq_ignore_ascii_case(keyword))
    };
    if !(token_is(0, "create") && token_is(1, "table")) {
        return false;
    }
    // Skip an optional `IF NOT EXISTS` so that it is not mistaken for the table name.
    let name_idx = if token_is(2, "if") && token_is(3, "not") && token_is(4, "exists") {
        5
    } else {
        2
    };
    token_is(name_idx + 1, "as")
}
43
/// Returns whether `sql` is a `CREATE SINK [IF NOT EXISTS] <name> INTO ...` statement,
/// i.e. a sink that writes into a table.
///
/// Keywords are matched case-insensitively on whitespace-separated tokens.
fn is_sink_into_table(sql: &str) -> bool {
    // At most `create sink if not exists <name> into` (7 tokens) is ever inspected.
    let tokens: Vec<&str> = sql.split_whitespace().take(7).collect();
    let token_is = |i: usize, keyword: &str| {
        tokens
            .get(i)
            .is_some_and(|token| token.eq_ignore_ascii_case(keyword))
    };
    if !(token_is(0, "create") && token_is(1, "sink")) {
        return false;
    }
    // Skip an optional `IF NOT EXISTS` so that it is not mistaken for the sink name.
    let name_idx = if token_is(2, "if") && token_is(3, "not") && token_is(4, "exists") {
        5
    } else {
        2
    };
    token_is(name_idx + 1, "into")
}
49
/// Coarse classification of a SQL statement, used to decide how a record is run
/// (and whether nodes may be killed while it runs).
#[derive(Debug, PartialEq, Eq)]
enum SqlCmd {
    /// Any `CREATE` statement that is neither a sink nor a materialized view.
    Create {
        /// Whether this is a `CREATE TABLE ... AS` statement.
        is_create_table_as: bool,
    },
    /// A `CREATE SINK` statement.
    CreateSink {
        /// Whether the sink writes into a table (`CREATE SINK ... INTO`).
        is_sink_into_table: bool,
    },
    /// A `CREATE MATERIALIZED VIEW` statement.
    CreateMaterializedView {
        /// The view name as extracted from the (lower-cased) statement.
        name: String,
    },
    /// A `SET` statement touching `background_ddl`.
    SetBackgroundDdl {
        /// Whether background DDL is being turned on.
        enable: bool,
    },
    /// A `DROP` statement.
    Drop,
    /// An `INSERT`, `UPDATE` or `DELETE` statement.
    Dml,
    /// A `FLUSH` statement.
    Flush,
    /// An `ALTER` statement.
    Alter,
    /// Anything not covered by the variants above.
    Others,
}
74
75impl SqlCmd {
76    fn allow_kill(&self) -> bool {
77        matches!(
78            self,
79            SqlCmd::Create {
80                // `create table as` is also not atomic in our system.
81                is_create_table_as: false,
82                ..
83            } | SqlCmd::CreateSink {
84                is_sink_into_table: false,
85            } | SqlCmd::CreateMaterializedView { .. }
86                | SqlCmd::Drop
87        )
88        // We won't kill during insert/update/delete/alter since the atomicity is not guaranteed.
89        // TODO: For `SqlCmd::Alter`, since table fragment and catalog commit for table schema change
90        // are not transactional, we can't kill during `alter table add/drop columns` for now, will
91        // remove it until transactional commit of table fragment and catalog is supported.
92    }
93
94    fn is_create(&self) -> bool {
95        matches!(
96            self,
97            SqlCmd::Create { .. }
98                | SqlCmd::CreateSink { .. }
99                | SqlCmd::CreateMaterializedView { .. }
100        )
101    }
102}
103
104fn extract_sql_command(sql: &str) -> SqlCmd {
105    let sql = sql.to_lowercase();
106    let tokens = sql.split_whitespace();
107    let mut tokens = tokens.multipeek();
108    let first_token = tokens.next().unwrap_or("");
109
110    match first_token {
111        // NOTE(kwannoel):
112        // It's entirely possible for a malformed command to be parsed as `SqlCmd::Create`.
113        // BUT an error should be expected for such a test.
114        // So we don't need to handle this case.
115        // Eventually if there are too many edge cases, we can opt to use our parser.
116        "create" => {
117            let result: Option<SqlCmd> = try {
118                match tokens.next()? {
119                    "materialized" => {
120                        // view
121                        tokens.next()?;
122
123                        // if not exists | name
124                        let next = *tokens.peek()?;
125                        if "if" == next
126                            && let Some("not") = tokens.peek().cloned()
127                            && let Some("exists") = tokens.peek().cloned()
128                        {
129                            tokens.next();
130                            tokens.next();
131                            tokens.next();
132                            let name = tokens.next()?.to_owned();
133                            SqlCmd::CreateMaterializedView { name }
134                        } else {
135                            let name = next.to_owned();
136                            SqlCmd::CreateMaterializedView { name }
137                        }
138                    }
139                    "sink" => SqlCmd::CreateSink {
140                        is_sink_into_table: is_sink_into_table(&sql),
141                    },
142                    _ => SqlCmd::Create {
143                        is_create_table_as: is_create_table_as(&sql),
144                    },
145                }
146            };
147            result.unwrap_or(SqlCmd::Others)
148        }
149        "set" => {
150            if sql.contains("background_ddl") {
151                let enable = sql.contains("true");
152                SqlCmd::SetBackgroundDdl { enable }
153            } else {
154                SqlCmd::Others
155            }
156        }
157        "drop" => SqlCmd::Drop,
158        "insert" | "update" | "delete" => SqlCmd::Dml,
159        "flush" => SqlCmd::Flush,
160        "alter" => SqlCmd::Alter,
161        _ => SqlCmd::Others,
162    }
163}
164
/// Test files (matched as path suffixes) that are skipped whenever node killing is enabled.
const KILL_IGNORE_FILES: &[&str] = &[
    // Recovering during the TPC-H queries takes too long.
    "tpch_snapshot.slt",
    "tpch_upstream.slt",
    // The `DROP`s in the search path test cannot be retried.
    "search_path.slt",
    // Statements inside transactions cannot be retried.
    "transaction/now.slt",
    "transaction/read_only_multi_conn.slt",
    "transaction/read_only.slt",
    "transaction/tolerance.slt",
    "transaction/cursor.slt",
    "transaction/cursor_multi_conn.slt",
];
179
180/// Wait for background mv to finish creating
181async fn wait_background_mv_finished(mview_name: &str) -> Result<()> {
182    let Ok(rw) = RisingWave::connect("frontend".into(), "dev".into()).await else {
183        bail!("failed to connect to frontend for {mview_name}");
184    };
185    let client = rw.pg_client();
186    if client.simple_query("WAIT;").await.is_err() {
187        bail!("failed to wait for background mv to finish creating for {mview_name}");
188    }
189
190    let Ok(result) = client
191        .query(
192            "select count(*) from pg_matviews where matviewname=$1;",
193            &[&mview_name],
194        )
195        .await
196    else {
197        bail!("failed to query pg_matviews for {mview_name}");
198    };
199
200    match result[0].try_get::<_, i64>(0) {
201        Ok(1) => Ok(()),
202        r => bail!("expected 1 row in pg_matviews, got {r:#?} instead for {mview_name}"),
203    }
204}
205
/// Splits test files across parallel CI jobs by hashing their names.
///
/// Copied from sqllogictest-bin.
#[derive(Clone)]
struct HashPartitioner {
    /// Total number of partitions.
    count: u64,
    /// Zero-based index of the partition handled by this process.
    id: u64,
}
212
213impl HashPartitioner {
214    fn new(count: u64, id: u64) -> Result<Self> {
215        if count == 0 {
216            bail!("partition count must be greater than zero");
217        }
218        if id >= count {
219            bail!("partition id (zero-based) must be less than count");
220        }
221        Ok(Self { count, id })
222    }
223}
224
225impl Partitioner for HashPartitioner {
226    fn matches(&self, file_name: &str) -> bool {
227        let mut hasher = DefaultHasher::new();
228        file_name.hash(&mut hasher);
229        hasher.finish() % self.count == self.id
230    }
231}
232
233static PARTITIONER: LazyLock<Option<HashPartitioner>> = LazyLock::new(|| {
234    let count = env::var("BUILDKITE_PARALLEL_JOB_COUNT")
235        .ok()?
236        .parse::<u64>()
237        .unwrap();
238    let id = env::var("BUILDKITE_PARALLEL_JOB")
239        .ok()?
240        .parse::<u64>()
241        .unwrap();
242    Some(HashPartitioner::new(count, id).unwrap())
243});
244
245pub struct Opts {
246    pub kill_opts: KillOpts,
247    /// Probability of `background_ddl` being set to true per ddl record.
248    pub background_ddl_rate: f64,
249    /// Set vnode count (`STREAMING_MAX_PARALLELISM`) to random value before running DDL.
250    pub random_vnode_count: bool,
251}
252
/// Run the sqllogictest files in `glob`.
///
/// Each file runs in its own session: a fresh `sqllogictest::Runner` connected to the
/// `frontend` node (database `dev`) with the `madsim` label. A file is skipped when it
/// belongs to another Buildkite partition, or when killing is enabled and it matches an
/// entry of `KILL_IGNORE_FILES`.
///
/// Without kill options, every record runs once and any failure panics. A random
/// `SET STREAMING_MAX_PARALLELISM` may precede `CREATE` statements. With kill options,
/// each record is classified with `extract_sql_command`:
/// - If the command does not allow killing, it runs without a killer. It is retried with
///   exponential backoff only on errors that indicate an earlier kill's recovery.
/// - Otherwise a background task may kill nodes while it runs. The record is retried,
///   and on retries "exists"/"not found" catalog errors are accepted as success.
///
/// # Panics
/// Panics on unexpected record failures, when retries are exhausted, on an invalid glob
/// pattern, or if `MADSIM_TEST_SEED` is set but is not a valid `u64`.
pub async fn run_slt_task(
    cluster: Arc<Cluster>,
    glob: &str,
    Opts {
        kill_opts,
        background_ddl_rate,
        random_vnode_count,
    }: Opts,
) {
    tracing::info!("background_ddl_rate: {}", background_ddl_rate);
    // Seeded RNG: the random `background_ddl` choices below depend only on the test seed.
    let seed = std::env::var("MADSIM_TEST_SEED")
        .unwrap_or("0".to_owned())
        .parse::<u64>()
        .unwrap();
    let mut rng = ChaChaRng::seed_from_u64(seed);
    // Killing is enabled if any node kind may be killed.
    let kill = kill_opts.kill_compute
        || kill_opts.kill_meta
        || kill_opts.kill_frontend
        || kill_opts.kill_compactor;
    let files = glob::glob(glob).expect("failed to read glob pattern");
    for file in files {
        // use a session per file
        let mut tester =
            sqllogictest::Runner::new(|| RisingWave::connect("frontend".into(), "dev".into()));
        tester.add_label("madsim");

        let file = file.unwrap();
        let path = file.as_path();

        if let Some(partitioner) = PARTITIONER.as_ref()
            && !partitioner.matches(path.to_str().unwrap())
        {
            println!("{} [partition skipped]", path.display());
            continue;
        } else if kill && KILL_IGNORE_FILES.iter().any(|s| path.ends_with(s)) {
            println!("{} [kill ignored]", path.display());
            continue;
        }
        println!("{}", path.display());

        // XXX: hack for kafka source test
        // `tempfile` must stay alive while `path` (which may borrow from it) is in use.
        let tempfile = (path.ends_with("kafka.slt") || path.ends_with("kafka_batch.slt"))
            .then(|| hack_kafka_test(path));
        let path = tempfile.as_ref().map(|p| p.path()).unwrap_or(path);

        // NOTE(kwannoel): For background ddl
        // Tracks whether the session currently has `background_ddl` turned on.
        let mut background_ddl_enabled = false;

        // If background ddl is set to true within the test case, prevent random setting of background_ddl to true.
        // We can revert it back to false only if we encounter a record that sets background_ddl to false.
        let mut manual_background_ddl_enabled = false;

        let records = sqllogictest::parse_file(path).expect("failed to parse file");
        let random_vnode_count = random_vnode_count
            // Skip using random vnode count if the test case cares about parallelism, including
            // setting parallelism manually or checking the parallelism with system tables.
            && records.iter().all(|record| {
                if let Record::Statement { sql, .. } | Record::Query { sql, .. } = record
                    && sql.to_lowercase().contains("parallelism")
                {
                    println!("[RANDOM VNODE COUNT] skip: {}", path.display());
                    false
                } else {
                    true
                }
            });

        for record in records {
            // uncomment to print metrics for task counts
            // let metrics = madsim::runtime::Handle::current().metrics();
            // println!("{:#?}", metrics);
            // println!("{}", metrics.num_tasks_by_node_by_spawn());
            if let sqllogictest::Record::Halt { .. } = record {
                break;
            }

            // Classify only statements/queries that actually run under `madsim`: not
            // excluded by `skipif madsim` and not restricted by `onlyif` to another label.
            // Everything else is treated as `SqlCmd::Others`.
            let cmd = match &record {
                sqllogictest::Record::Statement {
                    sql, conditions, ..
                }
                | sqllogictest::Record::Query {
                    sql, conditions, ..
                } if conditions
                    .iter()
                    .all(|c| !matches!(c, Condition::SkipIf{ label } if label == "madsim"))
                    && !conditions
                        .iter()
                        .any(|c| matches!(c, Condition::OnlyIf{ label} if label != "madsim" )) =>
                {
                    extract_sql_command(sql)
                }
                _ => SqlCmd::Others,
            };

            // For normal records.
            if !kill {
                // Set random vnode count if needed.
                if random_vnode_count
                    && cmd.is_create()
                    && let Record::Statement {
                        loc,
                        conditions,
                        connection,
                        ..
                    } = &record
                {
                    let vnode_count = (2..=64) // small
                        .chain(224..=288) // normal
                        .chain(992..=1056) // 1024 affects row id gen behavior
                        .choose(&mut thread_rng())
                        .unwrap();
                    let sql = format!("SET STREAMING_MAX_PARALLELISM = {vnode_count};");
                    println!("[RANDOM VNODE COUNT] set: {vnode_count}");
                    // Reuse the record's location, conditions and connection so the `SET`
                    // runs on the same connection as the statement it precedes.
                    let set_random_vnode_count = Record::Statement {
                        loc: loc.clone(),
                        conditions: conditions.clone(),
                        connection: connection.clone(),
                        sql,
                        expected: StatementExpect::Ok,
                        retry: None,
                    };
                    tester.run_async(set_random_vnode_count).await.unwrap();
                    println!("[RANDOM VNODE COUNT] run: {record}");
                }

                match tester
                    .run_async(record.clone())
                    .timed(|_res, elapsed| {
                        tracing::debug!("Record {:?} finished in {:?}", record, elapsed)
                    })
                    .await
                {
                    Ok(_) => continue,
                    Err(e) => panic!("{}", e),
                }
            }

            // For kill enabled.
            tracing::debug!(?cmd, "Running");

            // An explicit `SET background_ddl` in the test overrides the random setting below.
            if let SqlCmd::SetBackgroundDdl { enable } = cmd {
                manual_background_ddl_enabled = enable;
                background_ddl_enabled = enable;
            }

            // For each background ddl compatible statement, provide a chance for background_ddl=true.
            if let Record::Statement {
                loc,
                conditions,
                connection,
                ..
            } = &record
                && matches!(cmd, SqlCmd::CreateMaterializedView { .. })
                && !manual_background_ddl_enabled
                && conditions.iter().all(|c| {
                    *c != Condition::SkipIf {
                        label: "madsim".to_owned(),
                    }
                })
                && background_ddl_rate > 0.0
            {
                let background_ddl_setting = rng.random_bool(background_ddl_rate);
                let set_background_ddl = Record::Statement {
                    loc: loc.clone(),
                    conditions: conditions.clone(),
                    connection: connection.clone(),
                    expected: StatementExpect::Ok,
                    sql: format!("SET BACKGROUND_DDL={background_ddl_setting};"),
                    retry: None,
                };
                tester.run_async(set_background_ddl).await.unwrap();
                background_ddl_enabled = background_ddl_setting;
            };

            // Commands that must not race with a kill run without a killer. They are
            // retried only on errors indicating that the cluster is still recovering
            // from an earlier kill.
            if !cmd.allow_kill() {
                for i in 0usize.. {
                    // Exponential backoff: 1s, 2s, 4s, ...
                    let delay = Duration::from_secs(1 << i);
                    if let Err(err) = tester
                        .run_async(record.clone())
                        .timed(|_res, elapsed| {
                            tracing::debug!("Record {:?} finished in {:?}", record, elapsed)
                        })
                        .await
                    {
                        let err_string = err.to_string();
                        // cluster could be still under recovering if killed before, retry if
                        // meets `no reader for dml in table with id {}`.
                        let allowed_errs = [
                            "no reader for dml in table",
                            "error reading a body from connection: broken pipe",
                            "failed to inject barrier",
                            "get error from control stream",
                            "cluster is under recovering",
                        ];
                        let should_retry = i < MAX_RETRY
                            && allowed_errs
                                .iter()
                                .any(|allowed_err| err_string.contains(allowed_err));
                        if !should_retry {
                            panic!("{}", err);
                        }
                        tracing::error!("failed to run test: {err}\nretry after {delay:?}");
                    } else {
                        break;
                    }
                    tokio::time::sleep(delay).await;
                }
                continue;
            }

            // With probability `kill_rate`, kill nodes at a random moment within the
            // first second of running this record.
            let should_kill = thread_rng().random_bool(kill_opts.kill_rate as f64);
            // spawn a background task to kill nodes
            let handle = if should_kill {
                let cluster = cluster.clone();
                let opts = kill_opts;
                Some(tokio::spawn(async move {
                    let t = thread_rng().random_range(Duration::default()..Duration::from_secs(1));
                    tokio::time::sleep(t).await;
                    cluster.kill_node(&opts).await;
                    // The handle is awaited after this record, so the next record starts
                    // no earlier than 15s after the kill (presumably to allow recovery).
                    tokio::time::sleep(Duration::from_secs(15)).await;
                }))
            } else {
                None
            };

            for i in 0usize.. {
                tracing::debug!(iteration = i, "retry count");
                // Backoff capped at 10s; the first attempt runs immediately.
                let delay = Duration::from_secs(min(1 << i, 10));
                if i > 0 {
                    tokio::time::sleep(delay).await;
                }
                match tester
                    .run_async(record.clone())
                    .timed(|_res, elapsed| {
                        tracing::debug!("Record {:?} finished in {:?}", record, elapsed)
                    })
                    .await
                {
                    Ok(_) => {
                        // For background ddl
                        // With background DDL on, a successful (non-error-expecting) CREATE
                        // MATERIALIZED VIEW is only considered done once the view exists.
                        if let SqlCmd::CreateMaterializedView { ref name } = cmd
                            && background_ddl_enabled
                            && !matches!(
                                record,
                                Record::Statement {
                                    expected: StatementExpect::Error(_),
                                    ..
                                } | Record::Query {
                                    expected: QueryExpect::Error(_),
                                    ..
                                }
                            )
                        {
                            tracing::debug!(iteration = i, "Retry for background ddl");
                            match wait_background_mv_finished(name).await {
                                Ok(_) => {
                                    tracing::debug!(
                                        iteration = i,
                                        "Record with background_ddl {:?} finished",
                                        record
                                    );
                                    break;
                                }
                                Err(err) => {
                                    tracing::error!(
                                        iteration = i,
                                        ?err,
                                        "failed to wait for background mv to finish creating"
                                    );
                                    if i >= MAX_RETRY {
                                        panic!(
                                            "failed to run test after retry {i} times, error={err:#?}"
                                        );
                                    }
                                    // Re-runs the CREATE; an "exists" catalog error on the
                                    // retry is accepted by the `Err` arm below.
                                    continue;
                                }
                            }
                        }
                        break;
                    }
                    Err(e) => {
                        // NOTE: arm order matters; the `i >= MAX_RETRY` panic is checked
                        // after the "already exists"/"not found" arms and before the
                        // background-ddl arm.
                        match cmd {
                            // allow 'table exists' error when retry CREATE statement
                            SqlCmd::Create {
                                is_create_table_as: false,
                            }
                            | SqlCmd::CreateSink {
                                is_sink_into_table: false,
                            }
                            | SqlCmd::CreateMaterializedView { .. }
                                if i != 0
                                    && let e = e.to_string()
                                    // It should not be a gRPC request to meta error,
                                    // otherwise it means that the catalog is not yet populated to fe.
                                    && !e.contains("gRPC request to meta service failed")
                                    && e.contains("exists")
                                    && !e.contains("under creation")
                                    && e.contains("Catalog error") =>
                            {
                                break;
                            }
                            // allow 'not found' error when retry DROP statement
                            SqlCmd::Drop
                                if i != 0
                                    && e.to_string().contains("not found")
                                    && e.to_string().contains("Catalog error") =>
                            {
                                break;
                            }

                            // Keep i >= MAX_RETRY for other errors. Since these errors indicate that the MV might not yet be created.
                            _ if i >= MAX_RETRY => {
                                panic!("failed to run test after retry {i} times: {e}")
                            }
                            // The view is still being built in the background: wait for it
                            // instead of re-running the CREATE.
                            SqlCmd::CreateMaterializedView { ref name }
                                if i != 0
                                    && e.to_string().contains("table is in creating procedure")
                                    && background_ddl_enabled =>
                            {
                                tracing::debug!(iteration = i, name, "Retry for background ddl");
                                match wait_background_mv_finished(name).await {
                                    Ok(_) => {
                                        tracing::debug!(
                                            iteration = i,
                                            "Record with background_ddl {:?} finished",
                                            record
                                        );
                                        break;
                                    }
                                    Err(err) => {
                                        tracing::error!(
                                            iteration = i,
                                            ?err,
                                            "failed to wait for background mv to finish creating"
                                        );
                                        if i >= MAX_RETRY {
                                            panic!(
                                                "failed to run test after retry {i} times, error={err:#?}"
                                            );
                                        }
                                        continue;
                                    }
                                }
                            }
                            _ => tracing::error!(
                                iteration = i,
                                "failed to run test: {e}\nretry after {delay:?}"
                            ),
                        }
                    }
                }
            }
            // NOTE(review): `SetBackgroundDdl` never allows kill, so such records hit the
            // `continue` in the `!cmd.allow_kill()` branch above; this appears unreachable.
            if let SqlCmd::SetBackgroundDdl { enable } = cmd {
                background_ddl_enabled = enable;
            };
            // Wait for the kill task (including its post-kill sleep) before the next record.
            if let Some(handle) = handle {
                handle.await.unwrap();
            }
        }
    }
}
615
616pub async fn run_parallel_slt_task(glob: &str, jobs: usize) -> Result<(), ParallelTestError> {
617    let mut tester =
618        sqllogictest::Runner::new(|| RisingWave::connect("frontend".into(), "dev".into()));
619    tester.add_label("madsim");
620
621    if let Some(partitioner) = PARTITIONER.as_ref() {
622        tester.with_partitioner(partitioner.clone());
623    }
624
625    tester
626        .run_parallel_async(
627            glob,
628            vec!["frontend".into()],
629            |host, dbname| async move { RisingWave::connect(host, dbname).await.unwrap() },
630            jobs,
631        )
632        .await
633        .map_err(|e| panic!("{e}"))
634}
635
636/// Replace some strings in kafka.slt and write to a new temp file.
637fn hack_kafka_test(path: &Path) -> tempfile::NamedTempFile {
638    let content = std::fs::read_to_string(path).expect("failed to read file");
639    let simple_avsc_full_path =
640        std::fs::canonicalize("src/connector/src/test_data/simple-schema.avsc")
641            .expect("failed to get schema path");
642    let complex_avsc_full_path =
643        std::fs::canonicalize("src/connector/src/test_data/complex-schema.avsc")
644            .expect("failed to get schema path");
645    let json_schema_full_path =
646        std::fs::canonicalize("src/connector/src/test_data/complex-schema.json")
647            .expect("failed to get schema path");
648    let content = content
649        .replace("127.0.0.1:29092", "192.168.11.1:29092")
650        .replace("localhost:29092", "192.168.11.1:29092")
651        .replace(
652            "/risingwave/avro-simple-schema.avsc",
653            simple_avsc_full_path.to_str().unwrap(),
654        )
655        .replace(
656            "/risingwave/avro-complex-schema.avsc",
657            complex_avsc_full_path.to_str().unwrap(),
658        )
659        .replace(
660            "/risingwave/json-complex-schema",
661            json_schema_full_path.to_str().unwrap(),
662        );
663    let file = tempfile::NamedTempFile::new().expect("failed to create temp file");
664    std::fs::write(file.path(), content).expect("failed to write file");
665    println!("created a temp file for kafka test: {:?}", file.path());
666    file
667}
668
#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use expect_test::{Expect, expect};

    use super::*;

    /// Asserts that the pretty-printed `Debug` form of `actual` matches `expect`.
    fn check(actual: impl Debug, expect: Expect) {
        expect.assert_eq(&format!("{actual:#?}"));
    }

    #[test]
    fn test_is_create_table_as() {
        let cases = [
            ("     create     table xx  as select 1;", true),
            ("     create table xx not  as select 1;", false),
            ("     create view xx as select 1;", false),
        ];
        for (sql, expected) in cases {
            assert_eq!(is_create_table_as(sql), expected);
        }
    }

    #[test]
    fn test_extract_sql_command() {
        let cases = [
            (
                "create  table  t as select 1;",
                expect![[r#"
                    Create {
                        is_create_table_as: true,
                    }"#]],
            ),
            (
                "  create table  t (a int);",
                expect![[r#"
                    Create {
                        is_create_table_as: false,
                    }"#]],
            ),
            (
                " create materialized   view  m_1 as select 1;",
                expect![[r#"
                    CreateMaterializedView {
                        name: "m_1",
                    }"#]],
            ),
            (
                "set background_ddl= true;",
                expect![[r#"
                    SetBackgroundDdl {
                        enable: true,
                    }"#]],
            ),
            (
                "SET BACKGROUND_DDL=true;",
                expect![[r#"
                    SetBackgroundDdl {
                        enable: true,
                    }"#]],
            ),
            (
                "CREATE MATERIALIZED VIEW if not exists m_1 as select 1;",
                expect![[r#"
                    CreateMaterializedView {
                        name: "m_1",
                    }"#]],
            ),
        ];
        for (sql, expected) in cases {
            check(extract_sql_command(sql), expected);
        }
    }
}