risingwave_simulation/
slt.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::min;
16use std::path::Path;
17use std::sync::Arc;
18use std::time::Duration;
19
20use anyhow::Result;
21use rand::{Rng, rng as thread_rng};
22use sqllogictest::{
23    Condition, ParallelTestError, Partitioner, QueryExpect, Record, StatementExpect,
24};
25
26use crate::client::RisingWave;
27use crate::cluster::Cluster;
28use crate::parse::extract_sql_command;
29use crate::slt::background_ddl_mode::*;
30use crate::slt::runner::{random_vnode_count, run_kill_not_allowed, run_no_kill};
31use crate::slt::slt_env::{Env, Opts};
32use crate::slt::vnode_mode::*;
33use crate::utils::TimedExt;
34use crate::{evaluate_skip, wait_or_retry_background_ddl};
35
36// retry a maximum times until it succeed
37const MAX_RETRY: usize = 10;
38
39#[derive(Debug, PartialEq, Eq)]
40pub enum SqlCmd {
41    /// Other create statements.
42    Create {
43        is_create_table_as: bool,
44    },
45    /// Create sink.
46    CreateSink {
47        is_sink_into_table: bool,
48    },
49    /// Create Materialized views
50    CreateMaterializedView {
51        name: String,
52    },
53    /// Set background ddl
54    SetBackgroundDdl {
55        enable: bool,
56    },
57    Drop,
58    Dml,
59    Flush,
60    Others,
61}
62
63impl SqlCmd {
64    fn allow_kill(&self) -> bool {
65        matches!(
66            self,
67            SqlCmd::Create {
68                // `create table as` is also not atomic in our system.
69                is_create_table_as: false,
70                ..
71            } | SqlCmd::CreateSink {
72                is_sink_into_table: false,
73            } | SqlCmd::CreateMaterializedView { .. }
74                | SqlCmd::Drop
75        )
76        // We won't kill during insert/update/delete/alter since the atomicity is not guaranteed.
77        // TODO: For `SqlCmd::Alter`, since table fragment and catalog commit for table schema change
78        // are not transactional, we can't kill during `alter table add/drop columns` for now, will
79        // remove it until transactional commit of table fragment and catalog is supported.
80    }
81
82    fn is_create(&self) -> bool {
83        matches!(
84            self,
85            SqlCmd::Create { .. }
86                | SqlCmd::CreateSink { .. }
87                | SqlCmd::CreateMaterializedView { .. }
88        )
89    }
90}
91
92const KILL_IGNORE_FILES: &[&str] = &[
93    // TPCH queries are too slow for recovery.
94    "tpch_snapshot.slt",
95    "tpch_upstream.slt",
96    // Drop is not retryable in search path test.
97    "search_path.slt",
98    // Transaction statements are not retryable.
99    "transaction/now.slt",
100    "transaction/read_only_multi_conn.slt",
101    "transaction/read_only.slt",
102    "transaction/tolerance.slt",
103    "transaction/cursor.slt",
104    "transaction/cursor_multi_conn.slt",
105];
106
107/// Randomly set DDL statements to use `background_ddl`
108mod background_ddl_mode {
109    use anyhow::bail;
110    use rand::Rng;
111    use rand_chacha::ChaChaRng;
112    use sqllogictest::{Condition, Record, StatementExpect};
113
114    use crate::client::RisingWave;
115    use crate::slt::SqlCmd;
116    use crate::slt::slt_env::Env;
117
118    /// Wait for background mv to finish creating
119    pub(super) async fn wait_background_mv_finished(mview_name: &str) -> anyhow::Result<()> {
120        let Ok(rw) = RisingWave::connect("frontend".into(), "dev".into()).await else {
121            bail!("failed to connect to frontend for {mview_name}");
122        };
123        let client = rw.pg_client();
124        if client.simple_query("WAIT;").await.is_err() {
125            bail!("failed to wait for background mv to finish creating for {mview_name}");
126        }
127
128        let Ok(result) = client
129            .query(
130                "select count(*) from pg_matviews where matviewname=$1;",
131                &[&mview_name],
132            )
133            .await
134        else {
135            bail!("failed to query pg_matviews for {mview_name}");
136        };
137
138        match result[0].try_get::<_, i64>(0) {
139            Ok(1) => Ok(()),
140            r => bail!("expected 1 row in pg_matviews, got {r:#?} instead for {mview_name}"),
141        }
142    }
143
144    // TODO(kwannoel): move rng, `background_ddl` to `Env` struct
145    pub(super) async fn set_background_ddl<D, M, T>(
146        tester: &mut sqllogictest::Runner<D, M>,
147        env: &Env,
148        record: &Record<T>,
149        cmd: &SqlCmd,
150        manual_background_ddl_enabled: bool,
151        rng: &mut ChaChaRng,
152        background_ddl_enabled: &mut bool,
153    ) where
154        D: sqllogictest::AsyncDB<ColumnType = T>,
155        M: sqllogictest::MakeConnection<Conn = D>,
156        T: sqllogictest::ColumnType,
157    {
158        if let Record::Statement {
159            loc,
160            conditions,
161            connection,
162            ..
163        } = &record
164            && matches!(cmd, SqlCmd::CreateMaterializedView { .. })
165            && !manual_background_ddl_enabled
166            && conditions.iter().all(|c| {
167                *c != Condition::SkipIf {
168                    label: "madsim".to_owned(),
169                }
170            })
171            && env.background_ddl_rate() > 0.0
172        {
173            let background_ddl_setting = rng.random_bool(env.background_ddl_rate());
174            let set_background_ddl = Record::Statement {
175                loc: loc.clone(),
176                conditions: conditions.clone(),
177                connection: connection.clone(),
178                expected: StatementExpect::Ok,
179                sql: format!("SET BACKGROUND_DDL={background_ddl_setting};"),
180                retry: None,
181            };
182            tester.run_async(set_background_ddl).await.unwrap();
183            *background_ddl_enabled = background_ddl_setting;
184        };
185    }
186
187    // NOTE(kwannoel): only applicable to mvs currently
188    #[macro_export]
189    macro_rules! wait_or_retry_background_ddl {
190        ($record:expr, $name:expr, $iteration:expr) => {
191            let record = $record;
192            let i = $iteration;
193            tracing::debug!(iteration = i, "Retry for background ddl");
194            match wait_background_mv_finished($name).await {
195                Ok(_) => {
196                    tracing::debug!(
197                        iteration = i,
198                        "Record with background_ddl {:?} finished",
199                        record
200                    );
201                    break;
202                }
203                Err(err) => {
204                    tracing::error!(
205                        iteration = i,
206                        ?err,
207                        "failed to wait for background mv to finish creating"
208                    );
209                    if i >= MAX_RETRY {
210                        panic!("failed to run test after retry {i} times, error={err:#?}");
211                    }
212                    continue;
213                }
214            }
215        };
216    }
217}
218
219mod vnode_mode {
220    use std::env;
221    use std::hash::{DefaultHasher, Hash, Hasher};
222    use std::sync::LazyLock;
223
224    use anyhow::bail;
225    use sqllogictest::Partitioner;
226
227    // Copied from sqllogictest-bin.
228    #[derive(Clone)]
229    pub(super) struct HashPartitioner {
230        count: u64,
231        id: u64,
232    }
233
234    impl HashPartitioner {
235        pub(super) fn new(count: u64, id: u64) -> anyhow::Result<Self> {
236            if count == 0 {
237                bail!("partition count must be greater than zero");
238            }
239            if id >= count {
240                bail!("partition id (zero-based) must be less than count");
241            }
242            Ok(Self { count, id })
243        }
244    }
245
246    impl Partitioner for HashPartitioner {
247        fn matches(&self, file_name: &str) -> bool {
248            let mut hasher = DefaultHasher::new();
249            file_name.hash(&mut hasher);
250            hasher.finish() % self.count == self.id
251        }
252    }
253
254    pub(super) static PARTITIONER: LazyLock<Option<HashPartitioner>> = LazyLock::new(|| {
255        let count = env::var("BUILDKITE_PARALLEL_JOB_COUNT")
256            .ok()?
257            .parse::<u64>()
258            .unwrap();
259        let id = env::var("BUILDKITE_PARALLEL_JOB")
260            .ok()?
261            .parse::<u64>()
262            .unwrap();
263        Some(HashPartitioner::new(count, id).unwrap())
264    });
265}
266
267pub mod slt_env {
268    use rand::SeedableRng;
269    use rand_chacha::ChaChaRng;
270
271    use crate::cluster::KillOpts;
272
273    pub struct Opts {
274        pub kill_opts: KillOpts,
275        /// Probability of `background_ddl` being set to true per ddl record.
276        pub background_ddl_rate: f64,
277        /// Set vnode count (`STREAMING_MAX_PARALLELISM`) to random value before running DDL.
278        pub random_vnode_count: bool,
279    }
280
281    pub(super) struct Env {
282        opts: Opts,
283    }
284
285    impl Env {
286        pub fn new(opts: Opts) -> Self {
287            Self { opts }
288        }
289
290        pub fn get_rng() -> ChaChaRng {
291            let seed = std::env::var("MADSIM_TEST_SEED")
292                .unwrap_or("0".to_owned())
293                .parse::<u64>()
294                .unwrap();
295            ChaChaRng::seed_from_u64(seed)
296        }
297
298        pub fn background_ddl_rate(&self) -> f64 {
299            self.opts.background_ddl_rate
300        }
301
302        pub fn kill(&self) -> bool {
303            self.opts.kill_opts.kill_compute
304                || self.opts.kill_opts.kill_meta
305                || self.opts.kill_opts.kill_frontend
306                || self.opts.kill_opts.kill_compactor
307        }
308
309        pub fn random_vnode_count(&self) -> bool {
310            self.opts.random_vnode_count
311        }
312
313        pub fn kill_opts(&self) -> KillOpts {
314            self.opts.kill_opts
315        }
316
317        pub fn kill_rate(&self) -> f64 {
318            self.opts.kill_opts.kill_rate as f64
319        }
320    }
321}
322
323mod runner {
324    use std::time::Duration;
325
326    use rand::prelude::IteratorRandom;
327    use rand::rng as thread_rng;
328    use sqllogictest::{Record, StatementExpect, TestError};
329
330    use crate::slt::slt_env::Env;
331    use crate::slt::{MAX_RETRY, SqlCmd};
332    use crate::utils::TimedExt;
333    #[macro_export]
334    macro_rules! evaluate_skip {
335        ($env:expr, $path:expr) => {
336            if let Some(partitioner) = PARTITIONER.as_ref()
337                && !partitioner.matches($path.to_str().unwrap())
338            {
339                println!("[skip partition] {}", $path.display());
340                continue;
341            } else if $env.kill() && KILL_IGNORE_FILES.iter().any(|s| $path.ends_with(s)) {
342                println!("[skip kill] {}", $path.display());
343                continue;
344            } else {
345                println!("[run] {}", $path.display());
346            }
347        };
348    }
349
350    pub(super) fn random_vnode_count<T: sqllogictest::ColumnType>(
351        env: &Env,
352        records: &[Record<T>],
353    ) -> bool {
354        env.random_vnode_count()
355            && records.iter().all(|record| {
356                if let Record::Statement { sql, .. } | Record::Query { sql, .. } = record
357                    && sql.to_lowercase().contains("parallelism")
358                {
359                    println!("[RANDOM VNODE COUNT] skip: {}", sql);
360                    false
361                } else {
362                    true
363                }
364            })
365    }
366
367    /// `kill` is totally disabled.
368    pub(super) async fn run_no_kill<D, M, T>(
369        tester: &mut sqllogictest::Runner<D, M>,
370        random_vnode_count: bool,
371        cmd: &SqlCmd,
372        record: Record<T>,
373    ) -> Result<(), TestError>
374    where
375        D: sqllogictest::AsyncDB<ColumnType = T>,
376        M: sqllogictest::MakeConnection<Conn = D>,
377        T: sqllogictest::ColumnType,
378    {
379        // Set random vnode count if needed.
380        if random_vnode_count
381            && cmd.is_create()
382            && let Record::Statement {
383                loc,
384                conditions,
385                connection,
386                ..
387            } = &record
388        {
389            let vnode_count = (2..=64) // small
390                .chain(224..=288) // normal
391                .chain(992..=1056) // 1024 affects row id gen behavior
392                .choose(&mut thread_rng())
393                .unwrap();
394            let sql = format!("SET STREAMING_MAX_PARALLELISM = {vnode_count};");
395            println!("[RANDOM VNODE COUNT] set: {vnode_count}");
396            let set_random_vnode_count = Record::Statement {
397                loc: loc.clone(),
398                conditions: conditions.clone(),
399                connection: connection.clone(),
400                sql,
401                expected: StatementExpect::Ok,
402                retry: None,
403            };
404            tester.run_async(set_random_vnode_count).await.unwrap();
405            println!("[RANDOM VNODE COUNT] run: {record}");
406        }
407
408        tester.run_async(record).await.map(|_| ())
409    }
410
411    /// Used when `kill` is not allowed for some specific commands
412    pub(super) async fn run_kill_not_allowed<D, M, T>(
413        tester: &mut sqllogictest::Runner<D, M>,
414        record: Record<T>,
415    ) where
416        D: sqllogictest::AsyncDB<ColumnType = T>,
417        M: sqllogictest::MakeConnection<Conn = D>,
418        T: sqllogictest::ColumnType,
419    {
420        for i in 0usize.. {
421            let delay = Duration::from_secs(1 << i);
422            if let Err(err) = tester
423                .run_async(record.clone())
424                .timed(|_res, elapsed| {
425                    tracing::debug!("Record {:?} finished in {:?}", record, elapsed)
426                })
427                .await
428            {
429                let err_string = err.to_string();
430                // cluster could be still under recovering if killed before, retry if
431                // meets `no reader for dml in table with id {}`.
432                let allowed_errs = [
433                    "no reader for dml in table",
434                    "error reading a body from connection: broken pipe",
435                    "failed to inject barrier",
436                    "get error from control stream",
437                    "cluster is under recovering",
438                ];
439                let should_retry = i < MAX_RETRY
440                    && allowed_errs
441                        .iter()
442                        .any(|allowed_err| err_string.contains(allowed_err));
443                if !should_retry {
444                    panic!("{}", err);
445                }
446                tracing::error!("failed to run test: {err}\nretry after {delay:?}");
447            } else {
448                break;
449            }
450            tokio::time::sleep(delay).await;
451        }
452    }
453}
454
455/// Run the sqllogictest files in `glob`.
456pub async fn run_slt_task(cluster: Arc<Cluster>, glob: &str, opts: Opts) {
457    let env = slt_env::Env::new(opts);
458    tracing::info!("background_ddl_rate: {}", env.background_ddl_rate());
459    let mut rng = Env::get_rng();
460    let files = glob::glob(glob).expect("failed to read glob pattern");
461    for file in files {
462        // use a session per file
463        let mut tester =
464            sqllogictest::Runner::new(|| RisingWave::connect("frontend".into(), "dev".into()));
465        tester.add_label("madsim");
466
467        let file = file.unwrap();
468        let path = file.as_path();
469
470        evaluate_skip!(env, path);
471
472        // XXX: hack for kafka source test
473        let tempfile = (path.ends_with("kafka.slt") || path.ends_with("kafka_batch.slt"))
474            .then(|| hack_kafka_test(path));
475        let path = tempfile.as_ref().map(|p| p.path()).unwrap_or(path);
476
477        // NOTE(kwannoel): For background ddl
478        let mut background_ddl_enabled = false;
479
480        // If background ddl is set to true within the test case, prevent random setting of background_ddl to true.
481        // We can revert it back to false only if we encounter a record that sets background_ddl to false.
482        let mut manual_background_ddl_enabled = false;
483
484        let records = sqllogictest::parse_file(path).expect("failed to parse file");
485        let random_vnode_count = random_vnode_count(&env, &records);
486
487        for record in records {
488            // uncomment to print metrics for task counts
489            // let metrics = madsim::runtime::Handle::current().metrics();
490            // println!("{:#?}", metrics);
491            // println!("{}", metrics.num_tasks_by_node_by_spawn());
492            if let sqllogictest::Record::Halt { .. } = record {
493                break;
494            }
495
496            let cmd = match &record {
497                sqllogictest::Record::Statement {
498                    sql, conditions, ..
499                }
500                | sqllogictest::Record::Query {
501                    sql, conditions, ..
502                } if conditions
503                    .iter()
504                    .all(|c| !matches!(c, Condition::SkipIf{ label } if label == "madsim"))
505                    && !conditions
506                        .iter()
507                        .any(|c| matches!(c, Condition::OnlyIf{ label} if label != "madsim" )) =>
508                {
509                    extract_sql_command(sql).unwrap_or(SqlCmd::Others)
510                }
511                _ => SqlCmd::Others,
512            };
513
514            // For normal records.
515            if !env.kill() {
516                match run_no_kill(&mut tester, random_vnode_count, &cmd, record).await {
517                    Ok(_) => continue,
518                    Err(e) => panic!("{}", e),
519                }
520            }
521
522            // For kill enabled.
523            tracing::debug!(?cmd, "Running");
524
525            if let SqlCmd::SetBackgroundDdl { enable } = cmd {
526                manual_background_ddl_enabled = enable;
527                background_ddl_enabled = enable;
528            }
529
530            // For each background ddl compatible statement, provide a chance for background_ddl=true.
531            set_background_ddl(
532                &mut tester,
533                &env,
534                &record,
535                &cmd,
536                manual_background_ddl_enabled,
537                &mut rng,
538                &mut background_ddl_enabled,
539            )
540            .await;
541
542            if !cmd.allow_kill() {
543                run_kill_not_allowed(&mut tester, record.clone()).await;
544                continue;
545            }
546
547            let should_kill = thread_rng().random_bool(env.kill_rate());
548            // spawn a background task to kill nodes
549            let handle = if should_kill {
550                let cluster = cluster.clone();
551                let opts = env.kill_opts();
552                Some(tokio::spawn(async move {
553                    let t = thread_rng().random_range(Duration::default()..Duration::from_secs(1));
554                    tokio::time::sleep(t).await;
555                    cluster.kill_node(&opts).await;
556                    tokio::time::sleep(Duration::from_secs(15)).await;
557                }))
558            } else {
559                None
560            };
561
562            for i in 0usize.. {
563                tracing::debug!(iteration = i, "retry count");
564                let delay = Duration::from_secs(min(1 << i, 10));
565                if i > 0 {
566                    tokio::time::sleep(delay).await;
567                }
568                match tester
569                    .run_async(record.clone())
570                    .timed(|_res, elapsed| {
571                        tracing::debug!("Record {:?} finished in {:?}", record, elapsed)
572                    })
573                    .await
574                {
575                    Ok(_) => {
576                        // For background ddl
577                        if let SqlCmd::CreateMaterializedView { ref name } = cmd
578                            && background_ddl_enabled
579                            && !matches!(
580                                record,
581                                Record::Statement {
582                                    expected: StatementExpect::Error(_),
583                                    ..
584                                } | Record::Query {
585                                    expected: QueryExpect::Error(_),
586                                    ..
587                                }
588                            )
589                        {
590                            wait_or_retry_background_ddl!(&record, name, i);
591                        }
592                        break;
593                    }
594                    Err(e) => {
595                        match cmd {
596                            // allow 'table exists' error when retry CREATE statement
597                            SqlCmd::Create {
598                                is_create_table_as: false,
599                            }
600                            | SqlCmd::CreateSink {
601                                is_sink_into_table: false,
602                            }
603                            | SqlCmd::CreateMaterializedView { .. }
604                                if i != 0
605                                    && let e = e.to_string()
606                                    // It should not be a gRPC request to meta error,
607                                    // otherwise it means that the catalog is not yet populated to fe.
608                                    && !e.contains("gRPC request to meta service failed")
609                                    && e.contains("exists")
610                                    && !e.contains("under creation")
611                                    && e.contains("Catalog error") =>
612                            {
613                                break;
614                            }
615                            // allow 'not found' error when retry DROP statement
616                            SqlCmd::Drop
617                                if i != 0
618                                    && e.to_string().contains("not found")
619                                    && e.to_string().contains("Catalog error") =>
620                            {
621                                break;
622                            }
623
624                            // Keep i >= MAX_RETRY for other errors. Since these errors indicate that the MV might not yet be created.
625                            _ if i >= MAX_RETRY => {
626                                panic!("failed to run test after retry {i} times: {e}")
627                            }
628                            SqlCmd::CreateMaterializedView { ref name }
629                                if i != 0
630                                    && e.to_string().contains("table is in creating procedure")
631                                    && background_ddl_enabled =>
632                            {
633                                wait_or_retry_background_ddl!(&record, name, i);
634                            }
635                            _ => tracing::error!(
636                                iteration = i,
637                                "failed to run test: {e}\nretry after {delay:?}"
638                            ),
639                        }
640                    }
641                }
642            }
643            if let SqlCmd::SetBackgroundDdl { enable } = cmd {
644                background_ddl_enabled = enable;
645            };
646            if let Some(handle) = handle {
647                handle.await.unwrap();
648            }
649        }
650    }
651}
652
653pub async fn run_parallel_slt_task(glob: &str, jobs: usize) -> Result<(), ParallelTestError> {
654    let mut tester =
655        sqllogictest::Runner::new(|| RisingWave::connect("frontend".into(), "dev".into()));
656    tester.add_label("madsim");
657
658    if let Some(partitioner) = PARTITIONER.as_ref() {
659        tester.with_partitioner(partitioner.clone());
660    }
661
662    tester
663        .run_parallel_async(
664            glob,
665            vec!["frontend".into()],
666            |host, dbname| async move { RisingWave::connect(host, dbname).await.unwrap() },
667            jobs,
668        )
669        .await
670        .map_err(|e| panic!("{e}"))
671}
672
673/// Replace some strings in kafka.slt and write to a new temp file.
674fn hack_kafka_test(path: &Path) -> tempfile::NamedTempFile {
675    let content = std::fs::read_to_string(path).expect("failed to read file");
676    let simple_avsc_full_path =
677        std::fs::canonicalize("src/connector/src/test_data/simple-schema.avsc")
678            .expect("failed to get schema path");
679    let complex_avsc_full_path =
680        std::fs::canonicalize("src/connector/src/test_data/complex-schema.avsc")
681            .expect("failed to get schema path");
682    let json_schema_full_path =
683        std::fs::canonicalize("src/connector/src/test_data/complex-schema.json")
684            .expect("failed to get schema path");
685    let content = content
686        .replace("127.0.0.1:29092", "192.168.11.1:29092")
687        .replace("localhost:29092", "192.168.11.1:29092")
688        .replace(
689            "/risingwave/avro-simple-schema.avsc",
690            simple_avsc_full_path.to_str().unwrap(),
691        )
692        .replace(
693            "/risingwave/avro-complex-schema.avsc",
694            complex_avsc_full_path.to_str().unwrap(),
695        )
696        .replace(
697            "/risingwave/json-complex-schema",
698            json_schema_full_path.to_str().unwrap(),
699        );
700    let file = tempfile::NamedTempFile::new().expect("failed to create temp file");
701    std::fs::write(file.path(), content).expect("failed to write file");
702    println!("created a temp file for kafka test: {:?}", file.path());
703    file
704}
705
706#[cfg(test)]
707mod tests {
708    use std::fmt::Debug;
709
710    use expect_test::{Expect, expect};
711
712    use super::*;
713
714    fn check(actual: impl Debug, expect: Expect) {
715        let actual = format!("{:#?}", actual);
716        expect.assert_eq(&actual);
717    }
718
719    #[test]
720    fn test_extract_sql_command() {
721        check(
722            extract_sql_command("create  table  t as select 1;"),
723            expect![[r#"
724                Ok(
725                    Create {
726                        is_create_table_as: true,
727                    },
728                )"#]],
729        );
730        check(
731            extract_sql_command("  create table  t (a int);"),
732            expect![[r#"
733                Ok(
734                    Create {
735                        is_create_table_as: false,
736                    },
737                )"#]],
738        );
739        check(
740            extract_sql_command(" create materialized   view  m_1 as select 1;"),
741            expect![[r#"
742                Ok(
743                    CreateMaterializedView {
744                        name: "m_1",
745                    },
746                )"#]],
747        );
748        check(
749            extract_sql_command("set background_ddl= true;"),
750            expect![[r#"
751                Ok(
752                    SetBackgroundDdl {
753                        enable: true,
754                    },
755                )"#]],
756        );
757        check(
758            extract_sql_command("SET BACKGROUND_DDL=true;"),
759            expect![[r#"
760                Ok(
761                    SetBackgroundDdl {
762                        enable: true,
763                    },
764                )"#]],
765        );
766        check(
767            extract_sql_command("CREATE MATERIALIZED VIEW if not exists m_1 as select 1;"),
768            expect![[r#"
769                Ok(
770                    CreateMaterializedView {
771                        name: "m_1",
772                    },
773                )"#]],
774        )
775    }
776}