risingwave_simulation/
slt.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::min;
16use std::path::Path;
17use std::sync::Arc;
18use std::time::Duration;
19
20use anyhow::Result;
21use rand::{Rng, rng as thread_rng};
22use sqllogictest::substitution::well_known;
23use sqllogictest::{
24    Condition, ParallelTestError, Partitioner, QueryExpect, Record, StatementExpect,
25};
26
27use crate::client::RisingWave;
28use crate::cluster::Cluster;
29use crate::parse::extract_sql_command;
30use crate::slt::background_ddl_mode::*;
31use crate::slt::runner::{random_vnode_count, run_kill_not_allowed, run_no_kill};
32use crate::slt::slt_env::{Env, Opts};
33use crate::slt::vnode_mode::*;
34use crate::utils::TimedExt;
35use crate::{evaluate_skip, wait_or_retry_background_ddl};
36
/// Upper bound on the retry iteration index: the retry loops in this file panic instead of
/// retrying once the iteration counter reaches this value.
const MAX_RETRY: usize = 10;
39
/// Coarse classification of a SQL statement.
///
/// The runner uses it to decide whether nodes may be killed while the statement is running
/// and which errors are tolerated when the statement is retried.
#[derive(Debug, PartialEq, Eq)]
pub enum SqlCmd {
    /// Other create statements.
    Create {
        /// Set for `create table ... as ...`.
        is_create_table_as: bool,
    },
    /// Create sink.
    CreateSink {
        /// Set when the sink writes into a table.
        is_sink_into_table: bool,
    },
    /// Create Materialized views
    CreateMaterializedView {
        /// Name of the materialized view being created.
        name: String,
    },
    /// Set background ddl
    SetBackgroundDdl {
        /// The value `background_ddl` is being set to.
        enable: bool,
    },
    Drop,
    Dml,
    Flush,
    Others,
}

impl SqlCmd {
    /// Returns `true` when nodes may be killed while this command is running.
    ///
    /// We won't kill during insert/update/delete/alter since the atomicity is not guaranteed.
    /// TODO: table fragment and catalog commit for table schema change are not transactional,
    /// so we can't kill during `alter table add/drop columns` for now. Revisit once
    /// transactional commit of table fragment and catalog is supported.
    fn allow_kill(&self) -> bool {
        // Exhaustive on purpose: adding a variant forces an explicit decision here.
        match self {
            // `create table as` is also not atomic in our system.
            SqlCmd::Create { is_create_table_as } => !*is_create_table_as,
            SqlCmd::CreateSink { is_sink_into_table } => !*is_sink_into_table,
            SqlCmd::CreateMaterializedView { .. } | SqlCmd::Drop => true,
            SqlCmd::SetBackgroundDdl { .. } | SqlCmd::Dml | SqlCmd::Flush | SqlCmd::Others => {
                false
            }
        }
    }

    /// Returns `true` for every `Create*` variant, regardless of its flags.
    fn is_create(&self) -> bool {
        match self {
            SqlCmd::Create { .. }
            | SqlCmd::CreateSink { .. }
            | SqlCmd::CreateMaterializedView { .. } => true,
            SqlCmd::SetBackgroundDdl { .. }
            | SqlCmd::Drop
            | SqlCmd::Dml
            | SqlCmd::Flush
            | SqlCmd::Others => false,
        }
    }
}
92
/// Test files that are skipped entirely whenever any kill option is enabled.
///
/// `evaluate_skip!` compares each entry against the tail of the globbed file path with
/// `ends_with`, so an entry may be a bare file name or a `dir/file` suffix.
const KILL_IGNORE_FILES: &[&str] = &[
    // TPCH queries are too slow for recovery.
    "tpch_snapshot.slt",
    "tpch_upstream.slt",
    // Drop is not retryable in search path test.
    "search_path.slt",
    // Transaction statements are not retryable.
    "transaction/now.slt",
    "transaction/read_only_multi_conn.slt",
    "transaction/read_only.slt",
    "transaction/tolerance.slt",
    "transaction/cursor.slt",
    "transaction/cursor_multi_conn.slt",
];
107
108/// Randomly set DDL statements to use `background_ddl`
109mod background_ddl_mode {
110    use anyhow::bail;
111    use rand::Rng;
112    use rand_chacha::ChaChaRng;
113    use sqllogictest::{Condition, Record, StatementExpect};
114
115    use crate::client::RisingWave;
116    use crate::slt::SqlCmd;
117    use crate::slt::slt_env::Env;
118
119    /// Wait for background mv to finish creating
120    pub(super) async fn wait_background_mv_finished(mview_name: &str) -> anyhow::Result<()> {
121        let Ok(rw) = RisingWave::connect("frontend".into(), "dev".into()).await else {
122            bail!("failed to connect to frontend for {mview_name}");
123        };
124        let client = rw.pg_client();
125        if client.simple_query("WAIT;").await.is_err() {
126            bail!("failed to wait for background mv to finish creating for {mview_name}");
127        }
128
129        let Ok(result) = client
130            .query(
131                "select count(*) from pg_matviews where matviewname=$1;",
132                &[&mview_name],
133            )
134            .await
135        else {
136            bail!("failed to query pg_matviews for {mview_name}");
137        };
138
139        match result[0].try_get::<_, i64>(0) {
140            Ok(1) => Ok(()),
141            r => bail!("expected 1 row in pg_matviews, got {r:#?} instead for {mview_name}"),
142        }
143    }
144
145    // TODO(kwannoel): move rng, `background_ddl` to `Env` struct
146    pub(super) async fn set_background_ddl<D, M, T>(
147        tester: &mut sqllogictest::Runner<D, M>,
148        env: &Env,
149        record: &Record<T>,
150        cmd: &SqlCmd,
151        manual_background_ddl_enabled: bool,
152        rng: &mut ChaChaRng,
153        background_ddl_enabled: &mut bool,
154    ) where
155        D: sqllogictest::AsyncDB<ColumnType = T>,
156        M: sqllogictest::MakeConnection<Conn = D>,
157        T: sqllogictest::ColumnType,
158    {
159        if let Record::Statement {
160            loc,
161            conditions,
162            connection,
163            ..
164        } = &record
165            && matches!(cmd, SqlCmd::CreateMaterializedView { .. })
166            && !manual_background_ddl_enabled
167            && conditions.iter().all(|c| {
168                *c != Condition::SkipIf {
169                    label: "madsim".to_owned(),
170                }
171            })
172            && env.background_ddl_rate() > 0.0
173        {
174            let background_ddl_setting = rng.random_bool(env.background_ddl_rate());
175            let set_background_ddl = Record::Statement {
176                loc: loc.clone(),
177                conditions: conditions.clone(),
178                connection: connection.clone(),
179                expected: StatementExpect::Ok,
180                sql: format!("SET BACKGROUND_DDL={background_ddl_setting};"),
181                retry: None,
182            };
183            tester.run_async(set_background_ddl).await.unwrap();
184            *background_ddl_enabled = background_ddl_setting;
185        };
186    }
187
    // NOTE(kwannoel): only applicable to mvs currently
    /// Waits for the background creation of the materialized view `$name` and steers the
    /// enclosing retry loop accordingly.
    ///
    /// Arguments: `$record` is only used for logging, `$name` is passed to
    /// `wait_background_mv_finished`, and `$iteration` is the current retry counter.
    ///
    /// The expansion never falls through: it `break`s out of the enclosing loop on success,
    /// `continue`s it on failure, and panics once `$iteration` has reached `MAX_RETRY`.
    /// It therefore has to be invoked inside a loop, in an async context, with
    /// `wait_background_mv_finished` and `MAX_RETRY` in scope at the call site.
    #[macro_export]
    macro_rules! wait_or_retry_background_ddl {
        ($record:expr, $name:expr, $iteration:expr) => {
            // Evaluate the arguments once and bind them to locals.
            let record = $record;
            let i = $iteration;
            tracing::debug!(iteration = i, "Retry for background ddl");
            match wait_background_mv_finished($name).await {
                Ok(_) => {
                    tracing::debug!(
                        iteration = i,
                        "Record with background_ddl {:?} finished",
                        record
                    );
                    // The view exists: leave the caller's retry loop.
                    break;
                }
                Err(err) => {
                    tracing::error!(
                        iteration = i,
                        ?err,
                        "failed to wait for background mv to finish creating"
                    );
                    if i >= MAX_RETRY {
                        panic!("failed to run test after retry {i} times, error={err:#?}");
                    }
                    // Start the next iteration of the caller's retry loop.
                    continue;
                }
            }
        };
    }
218}
219
220mod vnode_mode {
221    use std::env;
222    use std::hash::{DefaultHasher, Hash, Hasher};
223    use std::sync::LazyLock;
224
225    use anyhow::bail;
226    use sqllogictest::Partitioner;
227
228    // Copied from sqllogictest-bin.
229    #[derive(Clone)]
230    pub(super) struct HashPartitioner {
231        count: u64,
232        id: u64,
233    }
234
235    impl HashPartitioner {
236        pub(super) fn new(count: u64, id: u64) -> anyhow::Result<Self> {
237            if count == 0 {
238                bail!("partition count must be greater than zero");
239            }
240            if id >= count {
241                bail!("partition id (zero-based) must be less than count");
242            }
243            Ok(Self { count, id })
244        }
245    }
246
247    impl Partitioner for HashPartitioner {
248        fn matches(&self, file_name: &str) -> bool {
249            let mut hasher = DefaultHasher::new();
250            file_name.hash(&mut hasher);
251            hasher.finish() % self.count == self.id
252        }
253    }
254
255    pub(super) static PARTITIONER: LazyLock<Option<HashPartitioner>> = LazyLock::new(|| {
256        let count = env::var("BUILDKITE_PARALLEL_JOB_COUNT")
257            .ok()?
258            .parse::<u64>()
259            .unwrap();
260        let id = env::var("BUILDKITE_PARALLEL_JOB")
261            .ok()?
262            .parse::<u64>()
263            .unwrap();
264        Some(HashPartitioner::new(count, id).unwrap())
265    });
266}
267
268pub mod slt_env {
269    use rand::SeedableRng;
270    use rand_chacha::ChaChaRng;
271
272    use crate::cluster::KillOpts;
273
274    pub struct Opts {
275        pub kill_opts: KillOpts,
276        /// Probability of `background_ddl` being set to true per ddl record.
277        pub background_ddl_rate: f64,
278        /// Set vnode count (`STREAMING_MAX_PARALLELISM`) to random value before running DDL.
279        pub random_vnode_count: bool,
280    }
281
282    pub(super) struct Env {
283        opts: Opts,
284    }
285
286    impl Env {
287        pub fn new(opts: Opts) -> Self {
288            Self { opts }
289        }
290
291        pub fn get_rng() -> ChaChaRng {
292            let seed = std::env::var("MADSIM_TEST_SEED")
293                .unwrap_or("0".to_owned())
294                .parse::<u64>()
295                .unwrap();
296            ChaChaRng::seed_from_u64(seed)
297        }
298
299        pub fn background_ddl_rate(&self) -> f64 {
300            self.opts.background_ddl_rate
301        }
302
303        pub fn kill(&self) -> bool {
304            self.opts.kill_opts.kill_compute
305                || self.opts.kill_opts.kill_meta
306                || self.opts.kill_opts.kill_frontend
307                || self.opts.kill_opts.kill_compactor
308        }
309
310        pub fn random_vnode_count(&self) -> bool {
311            self.opts.random_vnode_count
312        }
313
314        pub fn kill_opts(&self) -> KillOpts {
315            self.opts.kill_opts
316        }
317
318        pub fn kill_rate(&self) -> f64 {
319            self.opts.kill_opts.kill_rate as f64
320        }
321    }
322}
323
324mod runner {
325    use std::time::Duration;
326
327    use rand::prelude::IteratorRandom;
328    use rand::rng as thread_rng;
329    use sqllogictest::{Record, StatementExpect, TestError};
330
331    use crate::slt::slt_env::Env;
332    use crate::slt::{MAX_RETRY, SqlCmd};
333    use crate::utils::TimedExt;
    /// Decides whether the test file at `$path` is run, printing the decision.
    ///
    /// The file is skipped (the enclosing loop is `continue`d) when a partitioner is
    /// configured and does not match the path, or when `$env.kill()` is true and the path
    /// ends with one of `KILL_IGNORE_FILES`. Otherwise `[run] <path>` is printed and
    /// execution falls through.
    ///
    /// Must be invoked inside a loop with `PARTITIONER` and `KILL_IGNORE_FILES` in scope at
    /// the call site. Panics if `$path` is not valid UTF-8 while a partitioner is configured.
    #[macro_export]
    macro_rules! evaluate_skip {
        ($env:expr, $path:expr) => {
            // The partition check takes precedence over the kill-ignore list.
            if let Some(partitioner) = PARTITIONER.as_ref()
                && !partitioner.matches($path.to_str().unwrap())
            {
                println!("[skip partition] {}", $path.display());
                continue;
            } else if $env.kill() && KILL_IGNORE_FILES.iter().any(|s| $path.ends_with(s)) {
                println!("[skip kill] {}", $path.display());
                continue;
            } else {
                println!("[run] {}", $path.display());
            }
        };
    }
350
351    pub(super) fn random_vnode_count<T: sqllogictest::ColumnType>(
352        env: &Env,
353        records: &[Record<T>],
354    ) -> bool {
355        env.random_vnode_count()
356            && records.iter().all(|record| {
357                if let Record::Statement { sql, .. } | Record::Query { sql, .. } = record
358                    && sql.to_lowercase().contains("parallelism")
359                {
360                    println!("[RANDOM VNODE COUNT] skip: {}", sql);
361                    false
362                } else {
363                    true
364                }
365            })
366    }
367
368    /// `kill` is totally disabled.
369    pub(super) async fn run_no_kill<D, M, T>(
370        tester: &mut sqllogictest::Runner<D, M>,
371        random_vnode_count: bool,
372        cmd: &SqlCmd,
373        record: Record<T>,
374    ) -> Result<(), TestError>
375    where
376        D: sqllogictest::AsyncDB<ColumnType = T>,
377        M: sqllogictest::MakeConnection<Conn = D>,
378        T: sqllogictest::ColumnType,
379    {
380        // Set random vnode count if needed.
381        if random_vnode_count
382            && cmd.is_create()
383            && let Record::Statement {
384                loc,
385                conditions,
386                connection,
387                ..
388            } = &record
389        {
390            let vnode_count = (2..=64) // small
391                .chain(224..=288) // normal
392                .chain(992..=1056) // 1024 affects row id gen behavior
393                .choose(&mut thread_rng())
394                .unwrap();
395            let sql = format!("SET STREAMING_MAX_PARALLELISM = {vnode_count};");
396            println!("[RANDOM VNODE COUNT] set: {vnode_count}");
397            let set_random_vnode_count = Record::Statement {
398                loc: loc.clone(),
399                conditions: conditions.clone(),
400                connection: connection.clone(),
401                sql,
402                expected: StatementExpect::Ok,
403                retry: None,
404            };
405            tester.run_async(set_random_vnode_count).await.unwrap();
406            println!("[RANDOM VNODE COUNT] run: {record}");
407        }
408
409        tester.run_async(record).await.map(|_| ())
410    }
411
412    /// Used when `kill` is not allowed for some specific commands
413    pub(super) async fn run_kill_not_allowed<D, M, T>(
414        tester: &mut sqllogictest::Runner<D, M>,
415        record: Record<T>,
416    ) where
417        D: sqllogictest::AsyncDB<ColumnType = T>,
418        M: sqllogictest::MakeConnection<Conn = D>,
419        T: sqllogictest::ColumnType,
420    {
421        for i in 0usize.. {
422            let delay = Duration::from_secs(1 << i);
423            if let Err(err) = tester
424                .run_async(record.clone())
425                .timed(|_res, elapsed| {
426                    tracing::debug!("Record {:?} finished in {:?}", record, elapsed)
427                })
428                .await
429            {
430                let err_string = err.to_string();
431                // cluster could be still under recovering if killed before, retry if
432                // meets `no reader for dml in table with id {}`.
433                let allowed_errs = [
434                    "no reader for dml in table",
435                    "error reading a body from connection: broken pipe",
436                    "failed to inject barrier",
437                    "get error from control stream",
438                    "cluster is under recovering",
439                ];
440                let should_retry = i < MAX_RETRY
441                    && allowed_errs
442                        .iter()
443                        .any(|allowed_err| err_string.contains(allowed_err));
444                if !should_retry {
445                    panic!("{}", err);
446                }
447                tracing::error!("failed to run test: {err}\nretry after {delay:?}");
448            } else {
449                break;
450            }
451            tokio::time::sleep(delay).await;
452        }
453    }
454}
455
/// Run the sqllogictest files in `glob`.
///
/// Every matched file gets its own runner (and therefore session). Files may be skipped by
/// `evaluate_skip!`. Each record is classified into a [`SqlCmd`] and then executed in one of
/// three ways:
///
/// - kill disabled: the record is run once through `run_no_kill`;
/// - kill enabled but not allowed for this command: the record is run through
///   `run_kill_not_allowed`;
/// - kill enabled and allowed: with probability `env.kill_rate()` a task is spawned that
///   kills nodes of `cluster` shortly after the record starts, and the record is retried
///   until it succeeds or fails in a way that is expected after a kill.
///
/// # Panics
/// Panics when the glob pattern is invalid, a file cannot be read or parsed, or a record
/// still fails after the allowed retries.
pub async fn run_slt_task(cluster: Arc<Cluster>, glob: &str, opts: Opts) {
    let env = slt_env::Env::new(opts);
    tracing::info!("background_ddl_rate: {}", env.background_ddl_rate());
    // One RNG for the whole run; it drives the random `background_ddl` setting.
    let mut rng = Env::get_rng();
    let files = glob::glob(glob).expect("failed to read glob pattern");
    for file in files {
        // use a session per file
        let mut tester =
            sqllogictest::Runner::new(|| RisingWave::connect("frontend".into(), "dev".into()));
        tester.add_label("madsim");
        tester.set_var(well_known::DATABASE.to_owned(), "dev".to_owned());

        let file = file.unwrap();
        let path = file.as_path();

        // May `continue` to the next file.
        evaluate_skip!(env, path);

        // XXX: hack for kafka source test
        // `tempfile` owns the rewritten copy for the rest of this iteration; `path` is
        // re-pointed at it when present.
        let tempfile = (path.ends_with("kafka.slt") || path.ends_with("kafka_batch.slt"))
            .then(|| hack_kafka_test(path));
        let path = tempfile.as_ref().map(|p| p.path()).unwrap_or(path);

        // NOTE(kwannoel): For background ddl
        let mut background_ddl_enabled = false;

        // If background ddl is set to true within the test case, prevent random setting of background_ddl to true.
        // We can revert it back to false only if we encounter a record that sets background_ddl to false.
        let mut manual_background_ddl_enabled = false;

        let records = sqllogictest::parse_file(path).expect("failed to parse file");
        let random_vnode_count = random_vnode_count(&env, &records);

        for record in records {
            // uncomment to print metrics for task counts
            // let metrics = madsim::runtime::Handle::current().metrics();
            // println!("{:#?}", metrics);
            // println!("{}", metrics.num_tasks_by_node_by_spawn());
            if let sqllogictest::Record::Halt { .. } = record {
                break;
            }

            // Classify the record. Records that do not apply under the `madsim` label
            // (`SkipIf madsim`, or `OnlyIf` with another label) and non-SQL records are
            // treated as `Others`, as is SQL that `extract_sql_command` fails on.
            let cmd = match &record {
                sqllogictest::Record::Statement {
                    sql, conditions, ..
                }
                | sqllogictest::Record::Query {
                    sql, conditions, ..
                } if conditions
                    .iter()
                    .all(|c| !matches!(c, Condition::SkipIf{ label } if label == "madsim"))
                    && !conditions
                        .iter()
                        .any(|c| matches!(c, Condition::OnlyIf{ label} if label != "madsim" )) =>
                {
                    extract_sql_command(sql).unwrap_or(SqlCmd::Others)
                }
                _ => SqlCmd::Others,
            };

            // For normal records.
            if !env.kill() {
                match run_no_kill(&mut tester, random_vnode_count, &cmd, record).await {
                    Ok(_) => continue,
                    Err(e) => panic!("{}", e),
                }
            }

            // For kill enabled.
            tracing::debug!(?cmd, "Running");

            // Track an explicit `SET BACKGROUND_DDL` issued by the test file itself.
            if let SqlCmd::SetBackgroundDdl { enable } = cmd {
                manual_background_ddl_enabled = enable;
                background_ddl_enabled = enable;
            }

            // For each background ddl compatible statement, provide a chance for background_ddl=true.
            set_background_ddl(
                &mut tester,
                &env,
                &record,
                &cmd,
                manual_background_ddl_enabled,
                &mut rng,
                &mut background_ddl_enabled,
            )
            .await;

            if !cmd.allow_kill() {
                run_kill_not_allowed(&mut tester, record.clone()).await;
                continue;
            }

            let should_kill = thread_rng().random_bool(env.kill_rate());
            // spawn a background task to kill nodes
            let handle = if should_kill {
                let cluster = cluster.clone();
                let opts = env.kill_opts();
                Some(tokio::spawn(async move {
                    // Kill at a random point within the first second, then wait 15 seconds
                    // before the task completes.
                    let t = thread_rng().random_range(Duration::default()..Duration::from_secs(1));
                    tokio::time::sleep(t).await;
                    cluster.kill_node(&opts).await;
                    tokio::time::sleep(Duration::from_secs(15)).await;
                }))
            } else {
                None
            };

            // Retry loop: `i` is the attempt index, the delay doubles up to a cap of 10s.
            for i in 0usize.. {
                tracing::debug!(iteration = i, "retry count");
                let delay = Duration::from_secs(min(1 << i, 10));
                if i > 0 {
                    tokio::time::sleep(delay).await;
                }
                match tester
                    .run_async(record.clone())
                    .timed(|_res, elapsed| {
                        tracing::debug!("Record {:?} finished in {:?}", record, elapsed)
                    })
                    .await
                {
                    Ok(_) => {
                        // For background ddl
                        // A successful statement only means creation was accepted; unless the
                        // record expects an error, wait until the view actually exists. The
                        // macro always leaves this iteration (break / continue / panic).
                        if let SqlCmd::CreateMaterializedView { ref name } = cmd
                            && background_ddl_enabled
                            && !matches!(
                                record,
                                Record::Statement {
                                    expected: StatementExpect::Error(_),
                                    ..
                                } | Record::Query {
                                    expected: QueryExpect::Error(_),
                                    ..
                                }
                            )
                        {
                            wait_or_retry_background_ddl!(&record, name, i);
                        }
                        break;
                    }
                    Err(e) => {
                        match cmd {
                            // allow 'table exists' error when retry CREATE statement
                            SqlCmd::Create {
                                is_create_table_as: false,
                            }
                            | SqlCmd::CreateSink {
                                is_sink_into_table: false,
                            }
                            | SqlCmd::CreateMaterializedView { .. }
                                if i != 0
                                    && let e = e.to_string()
                                    // It should not be a gRPC request to meta error,
                                    // otherwise it means that the catalog is not yet populated to fe.
                                    && !e.contains("gRPC request to meta service failed")
                                    && e.contains("exists")
                                    && !e.contains("under creation")
                                    && e.contains("Catalog error") =>
                            {
                                break;
                            }
                            // allow 'not found' error when retry DROP statement
                            SqlCmd::Drop
                                if i != 0
                                    && e.to_string().contains("not found")
                                    && e.to_string().contains("Catalog error") =>
                            {
                                break;
                            }

                            // Keep i >= MAX_RETRY for other errors. Since these errors indicate that the MV might not yet be created.
                            _ if i >= MAX_RETRY => {
                                panic!("failed to run test after retry {i} times: {e}")
                            }
                            // The view is still being created in the background: wait for it
                            // instead of re-issuing the statement blindly.
                            SqlCmd::CreateMaterializedView { ref name }
                                if i != 0
                                    && e.to_string().contains("table is in creating procedure")
                                    && background_ddl_enabled =>
                            {
                                wait_or_retry_background_ddl!(&record, name, i);
                            }
                            // Any other error: log it and let the loop retry after `delay`.
                            _ => tracing::error!(
                                iteration = i,
                                "failed to run test: {e}\nretry after {delay:?}"
                            ),
                        }
                    }
                }
            }
            // NOTE(review): `SetBackgroundDdl` is not accepted by `allow_kill`, so such a
            // record has already `continue`d above and this assignment looks redundant.
            if let SqlCmd::SetBackgroundDdl { enable } = cmd {
                background_ddl_enabled = enable;
            };
            // Let the kill task finish before moving on to the next record.
            if let Some(handle) = handle {
                handle.await.unwrap();
            }
        }
    }
}
654
655pub async fn run_parallel_slt_task(glob: &str, jobs: usize) -> Result<(), ParallelTestError> {
656    let mut tester =
657        sqllogictest::Runner::new(|| RisingWave::connect("frontend".into(), "dev".into()));
658    tester.add_label("madsim");
659
660    if let Some(partitioner) = PARTITIONER.as_ref() {
661        tester.with_partitioner(partitioner.clone());
662    }
663
664    tester
665        .run_parallel_async(
666            glob,
667            vec!["frontend".into()],
668            |host, dbname| async move { RisingWave::connect(host, dbname).await.unwrap() },
669            jobs,
670        )
671        .await
672        .map_err(|e| panic!("{e}"))
673}
674
675/// Replace some strings in kafka.slt and write to a new temp file.
676fn hack_kafka_test(path: &Path) -> tempfile::NamedTempFile {
677    let content = std::fs::read_to_string(path).expect("failed to read file");
678    let simple_avsc_full_path =
679        std::fs::canonicalize("src/connector/src/test_data/simple-schema.avsc")
680            .expect("failed to get schema path");
681    let complex_avsc_full_path =
682        std::fs::canonicalize("src/connector/src/test_data/complex-schema.avsc")
683            .expect("failed to get schema path");
684    let json_schema_full_path =
685        std::fs::canonicalize("src/connector/src/test_data/complex-schema.json")
686            .expect("failed to get schema path");
687    let content = content
688        .replace("127.0.0.1:29092", "192.168.11.1:29092")
689        .replace("localhost:29092", "192.168.11.1:29092")
690        .replace(
691            "/risingwave/avro-simple-schema.avsc",
692            simple_avsc_full_path.to_str().unwrap(),
693        )
694        .replace(
695            "/risingwave/avro-complex-schema.avsc",
696            complex_avsc_full_path.to_str().unwrap(),
697        )
698        .replace(
699            "/risingwave/json-complex-schema",
700            json_schema_full_path.to_str().unwrap(),
701        );
702    let file = tempfile::NamedTempFile::new().expect("failed to create temp file");
703    std::fs::write(file.path(), content).expect("failed to write file");
704    println!("created a temp file for kafka test: {:?}", file.path());
705    file
706}
707
/// Snapshot tests for the classification of SQL text into [`SqlCmd`].
#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use expect_test::{Expect, expect};

    use super::*;

    /// Compares the pretty-printed `Debug` output of `actual` with the inline snapshot.
    fn check(actual: impl Debug, expect: Expect) {
        let actual = format!("{:#?}", actual);
        expect.assert_eq(&actual);
    }

    /// Covers irregular whitespace, keyword casing and `if not exists`.
    #[test]
    fn test_extract_sql_command() {
        // `create table ... as` is flagged as such.
        check(
            extract_sql_command("create  table  t as select 1;"),
            expect![[r#"
                Ok(
                    Create {
                        is_create_table_as: true,
                    },
                )"#]],
        );
        // A plain `create table` is not.
        check(
            extract_sql_command("  create table  t (a int);"),
            expect![[r#"
                Ok(
                    Create {
                        is_create_table_as: false,
                    },
                )"#]],
        );
        // The materialized view name is extracted.
        check(
            extract_sql_command(" create materialized   view  m_1 as select 1;"),
            expect![[r#"
                Ok(
                    CreateMaterializedView {
                        name: "m_1",
                    },
                )"#]],
        );
        // `set background_ddl` is recognized in lower case, with a space after `=` ...
        check(
            extract_sql_command("set background_ddl= true;"),
            expect![[r#"
                Ok(
                    SetBackgroundDdl {
                        enable: true,
                    },
                )"#]],
        );
        // ... and in upper case without one.
        check(
            extract_sql_command("SET BACKGROUND_DDL=true;"),
            expect![[r#"
                Ok(
                    SetBackgroundDdl {
                        enable: true,
                    },
                )"#]],
        );
        // `if not exists` is not mistaken for the view name.
        check(
            extract_sql_command("CREATE MATERIALIZED VIEW if not exists m_1 as select 1;"),
            expect![[r#"
                Ok(
                    CreateMaterializedView {
                        name: "m_1",
                    },
                )"#]],
        )
    }
}