risingwave_simulation/
slt.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::min;
16use std::path::Path;
17use std::sync::Arc;
18use std::time::Duration;
19
20use anyhow::Result;
21use rand::{Rng, rng as thread_rng};
22use sqllogictest::substitution::well_known;
23use sqllogictest::{
24    Condition, ParallelTestError, Partitioner, QueryExpect, Record, StatementExpect,
25};
26
27use crate::client::RisingWave;
28use crate::cluster::Cluster;
29use crate::parse::extract_sql_command;
30use crate::slt::background_ddl_mode::*;
31use crate::slt::runner::{random_vnode_count, run_kill_not_allowed, run_no_kill};
32use crate::slt::slt_env::{Env, Opts};
33use crate::slt::vnode_mode::*;
34use crate::utils::TimedExt;
35use crate::{evaluate_skip, wait_or_retry_background_ddl};
36
37// retry a maximum times until it succeed
38const MAX_RETRY: usize = 10;
39
40#[derive(Debug, PartialEq, Eq)]
41pub enum SqlCmd {
42    /// Other create statements.
43    Create {
44        is_create_table_as: bool,
45    },
46    /// Create Materialized views
47    CreateMaterializedView {
48        name: String,
49    },
50    /// Set background ddl
51    SetBackgroundDdl {
52        enable: bool,
53    },
54    Drop,
55    Dml,
56    Flush,
57    Others,
58}
59
60impl SqlCmd {
61    fn allow_kill(&self) -> bool {
62        matches!(
63            self,
64            SqlCmd::Create {
65                // `create table as` is also not atomic in our system.
66                is_create_table_as: false,
67                ..
68            } | SqlCmd::CreateMaterializedView { .. }
69                | SqlCmd::Drop
70        )
71        // We won't kill during insert/update/delete/alter since the atomicity is not guaranteed.
72        // TODO: For `SqlCmd::Alter`, since table fragment and catalog commit for table schema change
73        // are not transactional, we can't kill during `alter table add/drop columns` for now, will
74        // remove it until transactional commit of table fragment and catalog is supported.
75    }
76
77    fn is_create(&self) -> bool {
78        matches!(
79            self,
80            SqlCmd::Create { .. } | SqlCmd::CreateMaterializedView { .. }
81        )
82    }
83}
84
85const KILL_IGNORE_FILES: &[&str] = &[
86    // TPCH queries are too slow for recovery.
87    "tpch_snapshot.slt",
88    "tpch_upstream.slt",
89    // Drop is not retryable in search path test.
90    "search_path.slt",
91    // Transaction statements are not retryable.
92    "transaction/now.slt",
93    "transaction/read_only_multi_conn.slt",
94    "transaction/read_only.slt",
95    "transaction/tolerance.slt",
96    "transaction/cursor.slt",
97    "transaction/cursor_multi_conn.slt",
98];
99
100/// Randomly set DDL statements to use `background_ddl`
101mod background_ddl_mode {
102    use anyhow::bail;
103    use rand::Rng;
104    use rand_chacha::ChaChaRng;
105    use sqllogictest::{Condition, Record, StatementExpect};
106
107    use crate::client::RisingWave;
108    use crate::slt::SqlCmd;
109    use crate::slt::slt_env::Env;
110
111    /// Wait for background mv to finish creating
112    pub(super) async fn wait_background_mv_finished(mview_name: &str) -> anyhow::Result<()> {
113        let Ok(rw) = RisingWave::connect("frontend".into(), "dev".into()).await else {
114            bail!("failed to connect to frontend for {mview_name}");
115        };
116        let client = rw.pg_client();
117        if client.simple_query("WAIT;").await.is_err() {
118            bail!("failed to wait for background mv to finish creating for {mview_name}");
119        }
120
121        let Ok(result) = client
122            .query(
123                "select count(*) from pg_matviews where matviewname=$1;",
124                &[&mview_name],
125            )
126            .await
127        else {
128            bail!("failed to query pg_matviews for {mview_name}");
129        };
130
131        match result[0].try_get::<_, i64>(0) {
132            Ok(1) => Ok(()),
133            r => bail!("expected 1 row in pg_matviews, got {r:#?} instead for {mview_name}"),
134        }
135    }
136
137    // TODO(kwannoel): move rng, `background_ddl` to `Env` struct
138    pub(super) async fn set_background_ddl<D, M, T>(
139        tester: &mut sqllogictest::Runner<D, M>,
140        env: &Env,
141        record: &Record<T>,
142        cmd: &SqlCmd,
143        manual_background_ddl_enabled: bool,
144        rng: &mut ChaChaRng,
145        background_ddl_enabled: &mut bool,
146    ) where
147        D: sqllogictest::AsyncDB<ColumnType = T>,
148        M: sqllogictest::MakeConnection<Conn = D>,
149        T: sqllogictest::ColumnType,
150    {
151        if let Record::Statement {
152            loc,
153            conditions,
154            connection,
155            ..
156        } = &record
157            && matches!(cmd, SqlCmd::CreateMaterializedView { .. })
158            && !manual_background_ddl_enabled
159            && conditions.iter().all(|c| {
160                *c != Condition::SkipIf {
161                    label: "madsim".to_owned(),
162                }
163            })
164            && env.background_ddl_rate() > 0.0
165        {
166            let background_ddl_setting = rng.random_bool(env.background_ddl_rate());
167            let set_background_ddl = Record::Statement {
168                loc: loc.clone(),
169                conditions: conditions.clone(),
170                connection: connection.clone(),
171                expected: StatementExpect::Ok,
172                sql: format!("SET BACKGROUND_DDL={background_ddl_setting};"),
173                retry: None,
174            };
175            tester.run_async(set_background_ddl).await.unwrap();
176            *background_ddl_enabled = background_ddl_setting;
177        };
178    }
179
180    // NOTE(kwannoel): only applicable to mvs currently
181    #[macro_export]
182    macro_rules! wait_or_retry_background_ddl {
183        ($record:expr, $name:expr, $iteration:expr) => {
184            let record = $record;
185            let i = $iteration;
186            tracing::debug!(iteration = i, "Retry for background ddl");
187            match wait_background_mv_finished($name).await {
188                Ok(_) => {
189                    tracing::debug!(
190                        iteration = i,
191                        "Record with background_ddl {:?} finished",
192                        record
193                    );
194                    break;
195                }
196                Err(err) => {
197                    tracing::error!(
198                        iteration = i,
199                        ?err,
200                        "failed to wait for background mv to finish creating"
201                    );
202                    if i >= MAX_RETRY {
203                        panic!("failed to run test after retry {i} times, error={err:#?}");
204                    }
205                    continue;
206                }
207            }
208        };
209    }
210}
211
212mod vnode_mode {
213    use std::env;
214    use std::hash::{DefaultHasher, Hash, Hasher};
215    use std::sync::LazyLock;
216
217    use anyhow::bail;
218    use sqllogictest::Partitioner;
219
220    // Copied from sqllogictest-bin.
221    #[derive(Clone)]
222    pub(super) struct HashPartitioner {
223        count: u64,
224        id: u64,
225    }
226
227    impl HashPartitioner {
228        pub(super) fn new(count: u64, id: u64) -> anyhow::Result<Self> {
229            if count == 0 {
230                bail!("partition count must be greater than zero");
231            }
232            if id >= count {
233                bail!("partition id (zero-based) must be less than count");
234            }
235            Ok(Self { count, id })
236        }
237    }
238
239    impl Partitioner for HashPartitioner {
240        fn matches(&self, file_name: &str) -> bool {
241            let mut hasher = DefaultHasher::new();
242            file_name.hash(&mut hasher);
243            hasher.finish() % self.count == self.id
244        }
245    }
246
247    pub(super) static PARTITIONER: LazyLock<Option<HashPartitioner>> = LazyLock::new(|| {
248        let count = env::var("BUILDKITE_PARALLEL_JOB_COUNT")
249            .ok()?
250            .parse::<u64>()
251            .unwrap();
252        let id = env::var("BUILDKITE_PARALLEL_JOB")
253            .ok()?
254            .parse::<u64>()
255            .unwrap();
256        Some(HashPartitioner::new(count, id).unwrap())
257    });
258}
259
260pub mod slt_env {
261    use rand::SeedableRng;
262    use rand_chacha::ChaChaRng;
263
264    use crate::cluster::KillOpts;
265
266    pub struct Opts {
267        pub kill_opts: KillOpts,
268        /// Probability of `background_ddl` being set to true per ddl record.
269        pub background_ddl_rate: f64,
270        /// Set vnode count (`STREAMING_MAX_PARALLELISM`) to random value before running DDL.
271        pub random_vnode_count: bool,
272    }
273
274    pub(super) struct Env {
275        opts: Opts,
276    }
277
278    impl Env {
279        pub fn new(opts: Opts) -> Self {
280            Self { opts }
281        }
282
283        pub fn get_rng() -> ChaChaRng {
284            let seed = std::env::var("MADSIM_TEST_SEED")
285                .unwrap_or("0".to_owned())
286                .parse::<u64>()
287                .unwrap();
288            ChaChaRng::seed_from_u64(seed)
289        }
290
291        pub fn background_ddl_rate(&self) -> f64 {
292            self.opts.background_ddl_rate
293        }
294
295        pub fn kill(&self) -> bool {
296            self.opts.kill_opts.kill_compute
297                || self.opts.kill_opts.kill_meta
298                || self.opts.kill_opts.kill_frontend
299                || self.opts.kill_opts.kill_compactor
300        }
301
302        pub fn random_vnode_count(&self) -> bool {
303            self.opts.random_vnode_count
304        }
305
306        pub fn kill_opts(&self) -> KillOpts {
307            self.opts.kill_opts
308        }
309
310        pub fn kill_rate(&self) -> f64 {
311            self.opts.kill_opts.kill_rate as f64
312        }
313    }
314}
315
316mod runner {
317    use std::time::Duration;
318
319    use rand::prelude::IteratorRandom;
320    use rand::rng as thread_rng;
321    use sqllogictest::{Record, StatementExpect, TestError};
322
323    use crate::slt::slt_env::Env;
324    use crate::slt::{MAX_RETRY, SqlCmd};
325    use crate::utils::TimedExt;
326    #[macro_export]
327    macro_rules! evaluate_skip {
328        ($env:expr, $path:expr) => {
329            if let Some(partitioner) = PARTITIONER.as_ref()
330                && !partitioner.matches($path.to_str().unwrap())
331            {
332                println!("[skip partition] {}", $path.display());
333                continue;
334            } else if $env.kill() && KILL_IGNORE_FILES.iter().any(|s| $path.ends_with(s)) {
335                println!("[skip kill] {}", $path.display());
336                continue;
337            } else {
338                println!("[run] {}", $path.display());
339            }
340        };
341    }
342
343    pub(super) fn random_vnode_count<T: sqllogictest::ColumnType>(
344        env: &Env,
345        records: &[Record<T>],
346    ) -> bool {
347        env.random_vnode_count()
348            && records.iter().all(|record| {
349                if let Record::Statement { sql, .. } | Record::Query { sql, .. } = record
350                    && sql.to_lowercase().contains("parallelism")
351                {
352                    println!("[RANDOM VNODE COUNT] skip: {}", sql);
353                    false
354                } else {
355                    true
356                }
357            })
358    }
359
360    /// `kill` is totally disabled.
361    pub(super) async fn run_no_kill<D, M, T>(
362        tester: &mut sqllogictest::Runner<D, M>,
363        random_vnode_count: bool,
364        cmd: &SqlCmd,
365        record: Record<T>,
366    ) -> Result<(), TestError>
367    where
368        D: sqllogictest::AsyncDB<ColumnType = T>,
369        M: sqllogictest::MakeConnection<Conn = D>,
370        T: sqllogictest::ColumnType,
371    {
372        // Set random vnode count if needed.
373        if random_vnode_count
374            && cmd.is_create()
375            && let Record::Statement {
376                loc,
377                conditions,
378                connection,
379                ..
380            } = &record
381        {
382            let vnode_count = (2..=64) // small
383                .chain(224..=288) // normal
384                .chain(992..=1056) // 1024 affects row id gen behavior
385                .choose(&mut thread_rng())
386                .unwrap();
387            let sql = format!("SET STREAMING_MAX_PARALLELISM = {vnode_count};");
388            println!("[RANDOM VNODE COUNT] set: {vnode_count}");
389            let set_random_vnode_count = Record::Statement {
390                loc: loc.clone(),
391                conditions: conditions.clone(),
392                connection: connection.clone(),
393                sql,
394                expected: StatementExpect::Ok,
395                retry: None,
396            };
397            tester.run_async(set_random_vnode_count).await.unwrap();
398            println!("[RANDOM VNODE COUNT] run: {record}");
399        }
400
401        tester.run_async(record).await.map(|_| ())
402    }
403
404    /// Used when `kill` is not allowed for some specific commands
405    pub(super) async fn run_kill_not_allowed<D, M, T>(
406        tester: &mut sqllogictest::Runner<D, M>,
407        record: Record<T>,
408    ) where
409        D: sqllogictest::AsyncDB<ColumnType = T>,
410        M: sqllogictest::MakeConnection<Conn = D>,
411        T: sqllogictest::ColumnType,
412    {
413        for i in 0usize.. {
414            let delay = Duration::from_secs(1 << i);
415            if let Err(err) = tester
416                .run_async(record.clone())
417                .timed(|_res, elapsed| {
418                    tracing::debug!("Record {:?} finished in {:?}", record, elapsed)
419                })
420                .await
421            {
422                let err_string = err.to_string().to_ascii_lowercase();
423                // cluster could be still under recovering if killed before, retry if
424                // meets `no reader for dml in table with id {}`.
425                let allowed_errs = [
426                    "no reader for dml in table",
427                    "error reading a body from connection: broken pipe",
428                    "failed to inject barrier",
429                    "get error from control stream",
430                    "cluster is under recovering",
431                    "streaming vnode mapping has not been initialized",
432                    "streaming vnode mapping not found",
433                ];
434                let should_retry = i < MAX_RETRY
435                    && allowed_errs
436                        .iter()
437                        .any(|allowed_err| err_string.contains(&allowed_err.to_ascii_lowercase()));
438                if !should_retry {
439                    panic!("{}", err);
440                }
441                tracing::error!("failed to run test: {err}\nretry after {delay:?}");
442            } else {
443                break;
444            }
445            tokio::time::sleep(delay).await;
446        }
447    }
448}
449
450/// Run the sqllogictest files in `glob`.
451pub async fn run_slt_task(cluster: Arc<Cluster>, glob: &str, opts: Opts) {
452    let env = slt_env::Env::new(opts);
453    tracing::info!("background_ddl_rate: {}", env.background_ddl_rate());
454    let mut rng = Env::get_rng();
455    let files = glob::glob(glob).expect("failed to read glob pattern");
456    for file in files {
457        // use a session per file
458        let mut tester =
459            sqllogictest::Runner::new(|| RisingWave::connect("frontend".into(), "dev".into()));
460        tester.add_label("madsim");
461        tester.set_var(well_known::DATABASE.to_owned(), "dev".to_owned());
462
463        let file = file.unwrap();
464        let path = file.as_path();
465
466        evaluate_skip!(env, path);
467
468        // XXX: hack for kafka source test
469        let tempfile = (path.ends_with("kafka.slt") || path.ends_with("kafka_batch.slt"))
470            .then(|| hack_kafka_test(path));
471        let path = tempfile.as_ref().map(|p| p.path()).unwrap_or(path);
472
473        // NOTE(kwannoel): For background ddl
474        let mut background_ddl_enabled = false;
475
476        // If background ddl is set to true within the test case, prevent random setting of background_ddl to true.
477        // We can revert it back to false only if we encounter a record that sets background_ddl to false.
478        let mut manual_background_ddl_enabled = false;
479
480        let records = sqllogictest::parse_file(path).expect("failed to parse file");
481        let random_vnode_count = random_vnode_count(&env, &records);
482
483        for record in records {
484            // uncomment to print metrics for task counts
485            // let metrics = madsim::runtime::Handle::current().metrics();
486            // println!("{:#?}", metrics);
487            // println!("{}", metrics.num_tasks_by_node_by_spawn());
488            if let sqllogictest::Record::Halt { .. } = record {
489                break;
490            }
491
492            let cmd = match &record {
493                sqllogictest::Record::Statement {
494                    sql, conditions, ..
495                }
496                | sqllogictest::Record::Query {
497                    sql, conditions, ..
498                } if conditions
499                    .iter()
500                    .all(|c| !matches!(c, Condition::SkipIf{ label } if label == "madsim"))
501                    && !conditions
502                        .iter()
503                        .any(|c| matches!(c, Condition::OnlyIf{ label} if label != "madsim" )) =>
504                {
505                    extract_sql_command(sql).unwrap_or(SqlCmd::Others)
506                }
507                _ => SqlCmd::Others,
508            };
509
510            // For normal records.
511            if !env.kill() {
512                match run_no_kill(&mut tester, random_vnode_count, &cmd, record).await {
513                    Ok(_) => continue,
514                    Err(e) => panic!("{}", e),
515                }
516            }
517
518            // For kill enabled.
519            tracing::debug!(?cmd, "Running");
520
521            if let SqlCmd::SetBackgroundDdl { enable } = cmd {
522                manual_background_ddl_enabled = enable;
523                background_ddl_enabled = enable;
524            }
525
526            // For each background ddl compatible statement, provide a chance for background_ddl=true.
527            set_background_ddl(
528                &mut tester,
529                &env,
530                &record,
531                &cmd,
532                manual_background_ddl_enabled,
533                &mut rng,
534                &mut background_ddl_enabled,
535            )
536            .await;
537
538            if !cmd.allow_kill() {
539                run_kill_not_allowed(&mut tester, record.clone()).await;
540                continue;
541            }
542
543            let should_kill = thread_rng().random_bool(env.kill_rate());
544            // spawn a background task to kill nodes
545            let handle = if should_kill {
546                let cluster = cluster.clone();
547                let opts = env.kill_opts();
548                Some(tokio::spawn(async move {
549                    let t = thread_rng().random_range(Duration::default()..Duration::from_secs(1));
550                    tokio::time::sleep(t).await;
551                    cluster.kill_node(&opts).await;
552                    tokio::time::sleep(Duration::from_secs(15)).await;
553                }))
554            } else {
555                None
556            };
557
558            for i in 0usize.. {
559                tracing::debug!(iteration = i, "retry count");
560                let delay = Duration::from_secs(min(1 << i, 10));
561                if i > 0 {
562                    tokio::time::sleep(delay).await;
563                }
564                match tester
565                    .run_async(record.clone())
566                    .timed(|_res, elapsed| {
567                        tracing::debug!("Record {:?} finished in {:?}", record, elapsed)
568                    })
569                    .await
570                {
571                    Ok(_) => {
572                        // For background ddl
573                        if let SqlCmd::CreateMaterializedView { ref name } = cmd
574                            && background_ddl_enabled
575                            && !matches!(
576                                record,
577                                Record::Statement {
578                                    expected: StatementExpect::Error(_),
579                                    ..
580                                } | Record::Query {
581                                    expected: QueryExpect::Error(_),
582                                    ..
583                                }
584                            )
585                        {
586                            wait_or_retry_background_ddl!(&record, name, i);
587                        }
588                        break;
589                    }
590                    Err(e) => {
591                        match cmd {
592                            // allow 'table exists' error when retry CREATE statement
593                            SqlCmd::Create {
594                                is_create_table_as: false,
595                            }
596                            | SqlCmd::CreateMaterializedView { .. }
597                                if i != 0
598                                    && let e = e.to_string()
599                                    // It should not be a gRPC request to meta error,
600                                    // otherwise it means that the catalog is not yet populated to fe.
601                                    && !e.contains("gRPC request to meta service failed")
602                                    && e.contains("exists")
603                                    && !e.contains("under creation")
604                                    && e.contains("Catalog error") =>
605                            {
606                                break;
607                            }
608                            // allow 'not found' error when retry DROP statement
609                            SqlCmd::Drop
610                                if i != 0
611                                    && e.to_string().contains("not found")
612                                    && e.to_string().contains("Catalog error") =>
613                            {
614                                break;
615                            }
616
617                            // Keep i >= MAX_RETRY for other errors. Since these errors indicate that the MV might not yet be created.
618                            _ if i >= MAX_RETRY => {
619                                panic!("failed to run test after retry {i} times: {e}")
620                            }
621                            SqlCmd::CreateMaterializedView { ref name }
622                                if i != 0
623                                    && e.to_string().contains("table is in creating procedure")
624                                    && background_ddl_enabled =>
625                            {
626                                wait_or_retry_background_ddl!(&record, name, i);
627                            }
628                            _ => tracing::error!(
629                                iteration = i,
630                                "failed to run test: {e}\nretry after {delay:?}"
631                            ),
632                        }
633                    }
634                }
635            }
636            if let SqlCmd::SetBackgroundDdl { enable } = cmd {
637                background_ddl_enabled = enable;
638            };
639            if let Some(handle) = handle {
640                handle.await.unwrap();
641            }
642        }
643    }
644}
645
646pub async fn run_parallel_slt_task(glob: &str, jobs: usize) -> Result<(), ParallelTestError> {
647    let mut tester =
648        sqllogictest::Runner::new(|| RisingWave::connect("frontend".into(), "dev".into()));
649    tester.add_label("madsim");
650
651    if let Some(partitioner) = PARTITIONER.as_ref() {
652        tester.with_partitioner(partitioner.clone());
653    }
654
655    tester
656        .run_parallel_async(
657            glob,
658            vec!["frontend".into()],
659            |host, dbname| async move { RisingWave::connect(host, dbname).await.unwrap() },
660            jobs,
661        )
662        .await
663        .map_err(|e| panic!("{e}"))
664}
665
666/// Replace some strings in kafka.slt and write to a new temp file.
667fn hack_kafka_test(path: &Path) -> tempfile::NamedTempFile {
668    let content = std::fs::read_to_string(path).expect("failed to read file");
669    let simple_avsc_full_path =
670        std::fs::canonicalize("src/connector/src/test_data/simple-schema.avsc")
671            .expect("failed to get schema path");
672    let complex_avsc_full_path =
673        std::fs::canonicalize("src/connector/src/test_data/complex-schema.avsc")
674            .expect("failed to get schema path");
675    let json_schema_full_path =
676        std::fs::canonicalize("src/connector/src/test_data/complex-schema.json")
677            .expect("failed to get schema path");
678    let content = content
679        .replace("127.0.0.1:29092", "192.168.11.1:29092")
680        .replace("localhost:29092", "192.168.11.1:29092")
681        .replace(
682            "/risingwave/avro-simple-schema.avsc",
683            simple_avsc_full_path.to_str().unwrap(),
684        )
685        .replace(
686            "/risingwave/avro-complex-schema.avsc",
687            complex_avsc_full_path.to_str().unwrap(),
688        )
689        .replace(
690            "/risingwave/json-complex-schema",
691            json_schema_full_path.to_str().unwrap(),
692        );
693    let file = tempfile::NamedTempFile::new().expect("failed to create temp file");
694    std::fs::write(file.path(), content).expect("failed to write file");
695    println!("created a temp file for kafka test: {:?}", file.path());
696    file
697}
698
699#[cfg(test)]
700mod tests {
701    use std::fmt::Debug;
702
703    use expect_test::{Expect, expect};
704
705    use super::*;
706
707    fn check(actual: impl Debug, expect: Expect) {
708        let actual = format!("{:#?}", actual);
709        expect.assert_eq(&actual);
710    }
711
712    #[test]
713    fn test_extract_sql_command() {
714        check(
715            extract_sql_command("create  table  t as select 1;"),
716            expect![[r#"
717                Ok(
718                    Create {
719                        is_create_table_as: true,
720                    },
721                )"#]],
722        );
723        check(
724            extract_sql_command("  create table  t (a int);"),
725            expect![[r#"
726                Ok(
727                    Create {
728                        is_create_table_as: false,
729                    },
730                )"#]],
731        );
732        check(
733            extract_sql_command(" create materialized   view  m_1 as select 1;"),
734            expect![[r#"
735                Ok(
736                    CreateMaterializedView {
737                        name: "m_1",
738                    },
739                )"#]],
740        );
741        check(
742            extract_sql_command("set background_ddl= true;"),
743            expect![[r#"
744                Ok(
745                    SetBackgroundDdl {
746                        enable: true,
747                    },
748                )"#]],
749        );
750        check(
751            extract_sql_command("SET BACKGROUND_DDL=true;"),
752            expect![[r#"
753                Ok(
754                    SetBackgroundDdl {
755                        enable: true,
756                    },
757                )"#]],
758        );
759        check(
760            extract_sql_command("CREATE MATERIALIZED VIEW if not exists m_1 as select 1;"),
761            expect![[r#"
762                Ok(
763                    CreateMaterializedView {
764                        name: "m_1",
765                    },
766                )"#]],
767        )
768    }
769}