risingwave_simulation/slt.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::min;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use rand::{Rng, rng as thread_rng};
use sqllogictest::substitution::well_known;
use sqllogictest::{
    Condition, ParallelTestError, Partitioner, QueryExpect, Record, StatementExpect,
};

use crate::client::RisingWave;
use crate::cluster::Cluster;
use crate::parse::extract_sql_command;
use crate::slt::background_ddl_mode::*;
use crate::slt::runner::{random_vnode_count, run_kill_not_allowed, run_no_kill};
use crate::slt::slt_env::{Env, Opts};
use crate::slt::vnode_mode::*;
use crate::utils::TimedExt;
use crate::{evaluate_skip, wait_or_retry_background_ddl};

// Retry a maximum number of times until the command succeeds.
const MAX_RETRY: usize = 10;

#[derive(Debug, PartialEq, Eq)]
pub enum SqlCmd {
    /// Create statements other than materialized views.
    Create {
        is_create_table_as: bool,
    },
    /// Create materialized view.
    CreateMaterializedView {
        name: String,
    },
    /// Set background ddl.
    SetBackgroundDdl {
        enable: bool,
    },
    Drop,
    Dml,
    Flush,
    Others,
}

impl SqlCmd {
    fn allow_kill(&self) -> bool {
        matches!(
            self,
            SqlCmd::Create {
                // `create table as` is also not atomic in our system.
                is_create_table_as: false,
                ..
            } | SqlCmd::CreateMaterializedView { .. }
                | SqlCmd::Drop
        )
        // We won't kill during insert/update/delete/alter since atomicity is not guaranteed.
        // TODO: For `SqlCmd::Alter`, the table fragment and catalog commits for a table schema
        // change are not transactional, so we can't kill during `alter table add/drop columns`
        // for now; this restriction can be lifted once transactional commit of table fragment
        // and catalog is supported.
    }

    fn is_create(&self) -> bool {
        matches!(
            self,
            SqlCmd::Create { .. } | SqlCmd::CreateMaterializedView { .. }
        )
    }
}

const KILL_IGNORE_FILES: &[&str] = &[
    // TPCH queries are too slow for recovery.
    "tpch_snapshot.slt",
    "tpch_upstream.slt",
    // Drop is not retryable in search path test.
    "search_path.slt",
    // Transaction statements are not retryable.
    "transaction/now.slt",
    "transaction/read_only_multi_conn.slt",
    "transaction/read_only.slt",
    "transaction/tolerance.slt",
    "transaction/cursor.slt",
    "transaction/cursor_multi_conn.slt",
];

/// Randomly set DDL statements to use `background_ddl`
mod background_ddl_mode {
    use anyhow::bail;
    use rand::Rng;
    use rand_chacha::ChaChaRng;
    use sqllogictest::{Condition, Record, StatementExpect};

    use crate::client::RisingWave;
    use crate::slt::SqlCmd;
    use crate::slt::slt_env::Env;

    /// Wait for background mv to finish creating
    pub(super) async fn wait_background_mv_finished(mview_name: &str) -> anyhow::Result<()> {
        let Ok(rw) = RisingWave::connect("frontend".into(), "dev".into()).await else {
            bail!("failed to connect to frontend for {mview_name}");
        };
        let client = rw.pg_client();
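        // `WAIT;` blocks until pending (background) DDL jobs have finished; afterwards we
        // double-check that the mview is actually visible in `pg_matviews`.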
        if client.simple_query("WAIT;").await.is_err() {
            bail!("failed to wait for background mv to finish creating for {mview_name}");
        }

        let Ok(result) = client
            .query(
                "select count(*) from pg_matviews where matviewname=$1;",
                &[&mview_name],
            )
            .await
        else {
            bail!("failed to query pg_matviews for {mview_name}");
        };

        match result[0].try_get::<_, i64>(0) {
            Ok(1) => Ok(()),
            r => bail!("expected 1 row in pg_matviews, got {r:#?} instead for {mview_name}"),
        }
    }

    // TODO(kwannoel): move rng, `background_ddl` to `Env` struct
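    /// For an eligible `CREATE MATERIALIZED VIEW` record (no manual `SET BACKGROUND_DDL`
    /// override, not marked `skipif madsim`, and `background_ddl_rate > 0`), randomly toggle
    /// `BACKGROUND_DDL` by injecting an extra `SET BACKGROUND_DDL=...;` statement first.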
    pub(super) async fn set_background_ddl<D, M, T>(
        tester: &mut sqllogictest::Runner<D, M>,
        env: &Env,
        record: &Record<T>,
        cmd: &SqlCmd,
        manual_background_ddl_enabled: bool,
        rng: &mut ChaChaRng,
        background_ddl_enabled: &mut bool,
    ) where
        D: sqllogictest::AsyncDB<ColumnType = T>,
        M: sqllogictest::MakeConnection<Conn = D>,
        T: sqllogictest::ColumnType,
    {
        if let Record::Statement {
            loc,
            conditions,
            connection,
            ..
        } = &record
            && matches!(cmd, SqlCmd::CreateMaterializedView { .. })
            && !manual_background_ddl_enabled
            && conditions.iter().all(|c| {
                *c != Condition::SkipIf {
                    label: "madsim".to_owned(),
                }
            })
            && env.background_ddl_rate() > 0.0
        {
            let background_ddl_setting = rng.random_bool(env.background_ddl_rate());
            let set_background_ddl = Record::Statement {
                loc: loc.clone(),
                conditions: conditions.clone(),
                connection: connection.clone(),
                expected: StatementExpect::Ok,
                sql: format!("SET BACKGROUND_DDL={background_ddl_setting};"),
                retry: None,
            };
            tester.run_async(set_background_ddl).await.unwrap();
            *background_ddl_enabled = background_ddl_setting;
        };
    }

    // NOTE(kwannoel): only applicable to mvs currently
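    // This macro must be expanded inside a retry loop: it uses `break` on success and
    // `continue` on failure, and resolves `MAX_RETRY` from the expansion site.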
    #[macro_export]
    macro_rules! wait_or_retry_background_ddl {
        ($record:expr, $name:expr, $iteration:expr) => {
            let record = $record;
            let i = $iteration;
            tracing::debug!(iteration = i, "Retry for background ddl");
            match wait_background_mv_finished($name).await {
                Ok(_) => {
                    tracing::debug!(
                        iteration = i,
                        "Record with background_ddl {:?} finished",
                        record
                    );
                    break;
                }
                Err(err) => {
                    tracing::error!(
                        iteration = i,
                        ?err,
                        "failed to wait for background mv to finish creating"
                    );
                    if i >= MAX_RETRY {
                        panic!("failed to run test after retry {i} times, error={err:#?}");
                    }
                    continue;
                }
            }
        };
    }
}

mod vnode_mode {
    use std::env;
    use std::hash::{DefaultHasher, Hash, Hasher};
    use std::sync::LazyLock;

    use anyhow::bail;
    use sqllogictest::Partitioner;

    // Copied from sqllogictest-bin.
    #[derive(Clone)]
    pub(super) struct HashPartitioner {
        count: u64,
        id: u64,
    }

    impl HashPartitioner {
        pub(super) fn new(count: u64, id: u64) -> anyhow::Result<Self> {
            if count == 0 {
                bail!("partition count must be greater than zero");
            }
            if id >= count {
                bail!("partition id (zero-based) must be less than count");
            }
            Ok(Self { count, id })
        }
    }

    impl Partitioner for HashPartitioner {
        fn matches(&self, file_name: &str) -> bool {
            let mut hasher = DefaultHasher::new();
            file_name.hash(&mut hasher);
            hasher.finish() % self.count == self.id
        }
    }

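    // Partition id and count come from the Buildkite parallel-job environment variables:
    // `None` (run everything) when they are unset, panic if they are malformed.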
    pub(super) static PARTITIONER: LazyLock<Option<HashPartitioner>> = LazyLock::new(|| {
        let count = env::var("BUILDKITE_PARALLEL_JOB_COUNT")
            .ok()?
            .parse::<u64>()
            .unwrap();
        let id = env::var("BUILDKITE_PARALLEL_JOB")
            .ok()?
            .parse::<u64>()
            .unwrap();
        Some(HashPartitioner::new(count, id).unwrap())
    });
}

pub mod slt_env {
    use rand::SeedableRng;
    use rand_chacha::ChaChaRng;

    use crate::cluster::KillOpts;

    pub struct Opts {
        pub kill_opts: KillOpts,
        /// Probability of `background_ddl` being set to true per ddl record.
        pub background_ddl_rate: f64,
        /// Set vnode count (`STREAMING_MAX_PARALLELISM`) to random value before running DDL.
        pub random_vnode_count: bool,
    }

    pub(super) struct Env {
        opts: Opts,
    }

    impl Env {
        pub fn new(opts: Opts) -> Self {
            Self { opts }
        }

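        /// Deterministic RNG seeded from the `MADSIM_TEST_SEED` environment variable
        /// (defaulting to 0), so a failing run can be reproduced with the same seed.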
        pub fn get_rng() -> ChaChaRng {
            let seed = std::env::var("MADSIM_TEST_SEED")
                .unwrap_or("0".to_owned())
                .parse::<u64>()
                .unwrap();
            ChaChaRng::seed_from_u64(seed)
        }

        pub fn background_ddl_rate(&self) -> f64 {
            self.opts.background_ddl_rate
        }

        pub fn kill(&self) -> bool {
            self.opts.kill_opts.kill_compute
                || self.opts.kill_opts.kill_meta
                || self.opts.kill_opts.kill_frontend
                || self.opts.kill_opts.kill_compactor
        }

        pub fn random_vnode_count(&self) -> bool {
            self.opts.random_vnode_count
        }

        pub fn kill_opts(&self) -> KillOpts {
            self.opts.kill_opts
        }

        pub fn kill_rate(&self) -> f64 {
            self.opts.kill_opts.kill_rate as f64
        }
    }
}

mod runner {
    use std::time::Duration;

    use rand::prelude::IteratorRandom;
    use rand::rng as thread_rng;
    use sqllogictest::{Record, StatementExpect, TestError};

    use crate::slt::slt_env::Env;
    use crate::slt::{MAX_RETRY, SqlCmd};
    use crate::utils::TimedExt;
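    // Expanded inside the per-file loop of `run_slt_task`: it `continue`s to the next file
    // when the partitioner excludes it, or when kill mode is on and the file is listed in
    // `KILL_IGNORE_FILES`.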
    #[macro_export]
    macro_rules! evaluate_skip {
        ($env:expr, $path:expr) => {
            if let Some(partitioner) = PARTITIONER.as_ref()
                && !partitioner.matches($path.to_str().unwrap())
            {
                println!("[skip partition] {}", $path.display());
                continue;
            } else if $env.kill() && KILL_IGNORE_FILES.iter().any(|s| $path.ends_with(s)) {
                println!("[skip kill] {}", $path.display());
                continue;
            } else {
                println!("[run] {}", $path.display());
            }
        };
    }

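    /// Whether to use a random vnode count for this file: enabled by the options, and
    /// disabled if any record in the file already touches parallelism settings.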
    pub(super) fn random_vnode_count<T: sqllogictest::ColumnType>(
        env: &Env,
        records: &[Record<T>],
    ) -> bool {
        env.random_vnode_count()
            && records.iter().all(|record| {
                if let Record::Statement { sql, .. } | Record::Query { sql, .. } = record
                    && sql.to_lowercase().contains("parallelism")
                {
                    println!("[RANDOM VNODE COUNT] skip: {}", sql);
                    false
                } else {
                    true
                }
            })
    }

    /// Run a record when `kill` is disabled entirely.
    pub(super) async fn run_no_kill<D, M, T>(
        tester: &mut sqllogictest::Runner<D, M>,
        random_vnode_count: bool,
        cmd: &SqlCmd,
        record: Record<T>,
    ) -> Result<(), TestError>
    where
        D: sqllogictest::AsyncDB<ColumnType = T>,
        M: sqllogictest::MakeConnection<Conn = D>,
        T: sqllogictest::ColumnType,
    {
        // Set random vnode count if needed.
        if random_vnode_count
            && cmd.is_create()
            && let Record::Statement {
                loc,
                conditions,
                connection,
                ..
            } = &record
        {
            let vnode_count = (2..=64) // small
                .chain(224..=288) // normal
                .chain(992..=1056) // 1024 affects row id gen behavior
                .choose(&mut thread_rng())
                .unwrap();
            let sql = format!("SET STREAMING_MAX_PARALLELISM = {vnode_count};");
            println!("[RANDOM VNODE COUNT] set: {vnode_count}");
            let set_random_vnode_count = Record::Statement {
                loc: loc.clone(),
                conditions: conditions.clone(),
                connection: connection.clone(),
                sql,
                expected: StatementExpect::Ok,
                retry: None,
            };
            tester.run_async(set_random_vnode_count).await.unwrap();
            println!("[RANDOM VNODE COUNT] run: {record}");
        }

        tester.run_async(record).await.map(|_| ())
    }

    /// Run a record when `kill` is enabled but not allowed for this specific command.
    pub(super) async fn run_kill_not_allowed<D, M, T>(
        tester: &mut sqllogictest::Runner<D, M>,
        record: Record<T>,
    ) where
        D: sqllogictest::AsyncDB<ColumnType = T>,
        M: sqllogictest::MakeConnection<Conn = D>,
        T: sqllogictest::ColumnType,
    {
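        // Retry with exponential backoff (1s, 2s, 4s, ...): only a small whitelist of
        // recovery-related errors is retried; any other error panics immediately.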
        for i in 0usize.. {
            let delay = Duration::from_secs(1 << i);
            if let Err(err) = tester
                .run_async(record.clone())
                .timed(|_res, elapsed| {
                    tracing::debug!("Record {:?} finished in {:?}", record, elapsed)
                })
                .await
            {
                let err_string = err.to_string();
                // The cluster could still be recovering if it was killed earlier, so retry on
                // errors like `no reader for dml in table with id {}`.
                let allowed_errs = [
                    "no reader for dml in table",
                    "error reading a body from connection: broken pipe",
                    "failed to inject barrier",
                    "get error from control stream",
                    "cluster is under recovering",
                ];
                let should_retry = i < MAX_RETRY
                    && allowed_errs
                        .iter()
                        .any(|allowed_err| err_string.contains(allowed_err));
                if !should_retry {
                    panic!("{}", err);
                }
                tracing::error!("failed to run test: {err}\nretry after {delay:?}");
            } else {
                break;
            }
            tokio::time::sleep(delay).await;
        }
    }
}

/// Run the sqllogictest files in `glob`.
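///
/// # Example (sketch)
///
/// A minimal, hypothetical invocation; the glob and the `KillOpts` construction depend on the
/// caller's test setup, so this is not compiled (`ignore`).
///
/// ```ignore
/// // Run a set of slt files without chaos and without random vnode counts.
/// run_slt_task(
///     cluster.clone(),
///     "e2e_test/streaming/**/*.slt", // hypothetical glob
///     Opts {
///         kill_opts: KillOpts::ALL_DISABLED, // hypothetical constant; build KillOpts as appropriate
///         background_ddl_rate: 0.0,
///         random_vnode_count: false,
///     },
/// )
/// .await;
/// ```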
pub async fn run_slt_task(cluster: Arc<Cluster>, glob: &str, opts: Opts) {
    let env = slt_env::Env::new(opts);
    tracing::info!("background_ddl_rate: {}", env.background_ddl_rate());
    let mut rng = Env::get_rng();
    let files = glob::glob(glob).expect("failed to read glob pattern");
    for file in files {
        // Use one session per file.
        let mut tester =
            sqllogictest::Runner::new(|| RisingWave::connect("frontend".into(), "dev".into()));
        tester.add_label("madsim");
        tester.set_var(well_known::DATABASE.to_owned(), "dev".to_owned());

        let file = file.unwrap();
        let path = file.as_path();

        evaluate_skip!(env, path);

        // XXX: hack for kafka source test
        let tempfile = (path.ends_with("kafka.slt") || path.ends_with("kafka_batch.slt"))
            .then(|| hack_kafka_test(path));
        let path = tempfile.as_ref().map(|p| p.path()).unwrap_or(path);

        // NOTE(kwannoel): for background ddl.
        let mut background_ddl_enabled = false;

        // If background ddl is set to true within the test case, stop randomly overriding it.
        // Random overriding resumes only once a record sets background_ddl back to false.
        let mut manual_background_ddl_enabled = false;

        let records = sqllogictest::parse_file(path).expect("failed to parse file");
        let random_vnode_count = random_vnode_count(&env, &records);

        for record in records {
            // Uncomment to print metrics for task counts:
            // let metrics = madsim::runtime::Handle::current().metrics();
            // println!("{:#?}", metrics);
            // println!("{}", metrics.num_tasks_by_node_by_spawn());
            if let sqllogictest::Record::Halt { .. } = record {
                break;
            }

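            // Parse the record's SQL into a `SqlCmd`, but only for records that actually run
            // under madsim (i.e. not `skipif madsim` and not `onlyif` some other label).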
            let cmd = match &record {
                sqllogictest::Record::Statement {
                    sql, conditions, ..
                }
                | sqllogictest::Record::Query {
                    sql, conditions, ..
                } if conditions
                    .iter()
                    .all(|c| !matches!(c, Condition::SkipIf { label } if label == "madsim"))
                    && !conditions
                        .iter()
                        .any(|c| matches!(c, Condition::OnlyIf { label } if label != "madsim")) =>
                {
                    extract_sql_command(sql).unwrap_or(SqlCmd::Others)
                }
                _ => SqlCmd::Others,
            };

            // When kill is disabled, just run the record normally.
            if !env.kill() {
                match run_no_kill(&mut tester, random_vnode_count, &cmd, record).await {
                    Ok(_) => continue,
                    Err(e) => panic!("{}", e),
                }
            }

            // From here on, kill is enabled.
            tracing::debug!(?cmd, "Running");

            if let SqlCmd::SetBackgroundDdl { enable } = cmd {
                manual_background_ddl_enabled = enable;
                background_ddl_enabled = enable;
            }

            // For each background-ddl-compatible statement, give background_ddl=true a chance.
            set_background_ddl(
                &mut tester,
                &env,
                &record,
                &cmd,
                manual_background_ddl_enabled,
                &mut rng,
                &mut background_ddl_enabled,
            )
            .await;

            if !cmd.allow_kill() {
                run_kill_not_allowed(&mut tester, record.clone()).await;
                continue;
            }

            let should_kill = thread_rng().random_bool(env.kill_rate());
            // Spawn a background task to kill nodes.
            let handle = if should_kill {
                let cluster = cluster.clone();
                let opts = env.kill_opts();
                Some(tokio::spawn(async move {
                    let t = thread_rng().random_range(Duration::default()..Duration::from_secs(1));
                    tokio::time::sleep(t).await;
                    cluster.kill_node(&opts).await;
                    tokio::time::sleep(Duration::from_secs(15)).await;
                }))
            } else {
                None
            };

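            // Retry the record with capped exponential backoff (at most 10s between attempts).
            // On retries, specific "already exists" / "not found" errors are tolerated, since
            // the previous attempt may already have taken effect before the node was killed.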
            for i in 0usize.. {
                tracing::debug!(iteration = i, "retry count");
                let delay = Duration::from_secs(min(1 << i, 10));
                if i > 0 {
                    tokio::time::sleep(delay).await;
                }
                match tester
                    .run_async(record.clone())
                    .timed(|_res, elapsed| {
                        tracing::debug!("Record {:?} finished in {:?}", record, elapsed)
                    })
                    .await
                {
                    Ok(_) => {
                        // For background ddl
                        if let SqlCmd::CreateMaterializedView { ref name } = cmd
                            && background_ddl_enabled
                            && !matches!(
                                record,
                                Record::Statement {
                                    expected: StatementExpect::Error(_),
                                    ..
                                } | Record::Query {
                                    expected: QueryExpect::Error(_),
                                    ..
                                }
                            )
                        {
                            wait_or_retry_background_ddl!(&record, name, i);
                        }
                        break;
                    }
                    Err(e) => {
                        match cmd {
                            // Allow a 'table exists' error when retrying a CREATE statement.
                            SqlCmd::Create {
                                is_create_table_as: false,
                            }
                            | SqlCmd::CreateMaterializedView { .. }
                                if i != 0
                                    && let e = e.to_string()
                                    // It should not be a gRPC-request-to-meta error, otherwise
                                    // the catalog may not have been populated to the frontend yet.
                                    && !e.contains("gRPC request to meta service failed")
                                    && e.contains("exists")
                                    && !e.contains("under creation")
                                    && e.contains("Catalog error") =>
                            {
                                break;
                            }
                            // Allow a 'not found' error when retrying a DROP statement.
                            SqlCmd::Drop
                                if i != 0
                                    && e.to_string().contains("not found")
                                    && e.to_string().contains("Catalog error") =>
                            {
                                break;
                            }

                            // Give up once `i >= MAX_RETRY` for other errors, since these
                            // errors may indicate that the MV has not been created yet.
                            _ if i >= MAX_RETRY => {
                                panic!("failed to run test after retry {i} times: {e}")
                            }
                            SqlCmd::CreateMaterializedView { ref name }
                                if i != 0
                                    && e.to_string().contains("table is in creating procedure")
                                    && background_ddl_enabled =>
                            {
                                wait_or_retry_background_ddl!(&record, name, i);
                            }
                            _ => tracing::error!(
                                iteration = i,
                                "failed to run test: {e}\nretry after {delay:?}"
                            ),
                        }
                    }
                }
            }
            if let SqlCmd::SetBackgroundDdl { enable } = cmd {
                background_ddl_enabled = enable;
            };
            if let Some(handle) = handle {
                handle.await.unwrap();
            }
        }
    }
}

pub async fn run_parallel_slt_task(glob: &str, jobs: usize) -> Result<(), ParallelTestError> {
    let mut tester =
        sqllogictest::Runner::new(|| RisingWave::connect("frontend".into(), "dev".into()));
    tester.add_label("madsim");

    if let Some(partitioner) = PARTITIONER.as_ref() {
        tester.with_partitioner(partitioner.clone());
    }

    tester
        .run_parallel_async(
            glob,
            vec!["frontend".into()],
            |host, dbname| async move { RisingWave::connect(host, dbname).await.unwrap() },
            jobs,
        )
        .await
        .map_err(|e| panic!("{e}"))
}

/// Replace some strings in kafka.slt and write to a new temp file.
fn hack_kafka_test(path: &Path) -> tempfile::NamedTempFile {
    let content = std::fs::read_to_string(path).expect("failed to read file");
    let simple_avsc_full_path =
        std::fs::canonicalize("src/connector/src/test_data/simple-schema.avsc")
            .expect("failed to get schema path");
    let complex_avsc_full_path =
        std::fs::canonicalize("src/connector/src/test_data/complex-schema.avsc")
            .expect("failed to get schema path");
    let json_schema_full_path =
        std::fs::canonicalize("src/connector/src/test_data/complex-schema.json")
            .expect("failed to get schema path");
    let content = content
        .replace("127.0.0.1:29092", "192.168.11.1:29092")
        .replace("localhost:29092", "192.168.11.1:29092")
        .replace(
            "/risingwave/avro-simple-schema.avsc",
            simple_avsc_full_path.to_str().unwrap(),
        )
        .replace(
            "/risingwave/avro-complex-schema.avsc",
            complex_avsc_full_path.to_str().unwrap(),
        )
        .replace(
            "/risingwave/json-complex-schema",
            json_schema_full_path.to_str().unwrap(),
        );
    let file = tempfile::NamedTempFile::new().expect("failed to create temp file");
    std::fs::write(file.path(), content).expect("failed to write file");
    println!("created a temp file for kafka test: {:?}", file.path());
    file
}

#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use expect_test::{Expect, expect};

    use super::*;

    fn check(actual: impl Debug, expect: Expect) {
        let actual = format!("{:#?}", actual);
        expect.assert_eq(&actual);
    }

    #[test]
    fn test_extract_sql_command() {
        check(
            extract_sql_command("create  table  t as select 1;"),
            expect![[r#"
                Ok(
                    Create {
                        is_create_table_as: true,
                    },
                )"#]],
        );
        check(
            extract_sql_command("  create table  t (a int);"),
            expect![[r#"
                Ok(
                    Create {
                        is_create_table_as: false,
                    },
                )"#]],
        );
        check(
            extract_sql_command(" create materialized   view  m_1 as select 1;"),
            expect![[r#"
                Ok(
                    CreateMaterializedView {
                        name: "m_1",
                    },
                )"#]],
        );
        check(
            extract_sql_command("set background_ddl= true;"),
            expect![[r#"
                Ok(
                    SetBackgroundDdl {
                        enable: true,
                    },
                )"#]],
        );
        check(
            extract_sql_command("SET BACKGROUND_DDL=true;"),
            expect![[r#"
                Ok(
                    SetBackgroundDdl {
                        enable: true,
                    },
                )"#]],
        );
        check(
            extract_sql_command("CREATE MATERIALIZED VIEW if not exists m_1 as select 1;"),
            expect![[r#"
                Ok(
                    CreateMaterializedView {
                        name: "m_1",
                    },
                )"#]],
        )
    }
767}