risingwave_simulation/slt.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::min;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use rand::{Rng, rng as thread_rng};
use sqllogictest::substitution::well_known;
use sqllogictest::{
    Condition, ParallelTestError, Partitioner, QueryExpect, Record, StatementExpect,
};

use crate::client::RisingWave;
use crate::cluster::Cluster;
use crate::parse::extract_sql_command;
use crate::slt::background_ddl_mode::*;
use crate::slt::runner::{random_vnode_count, run_kill_not_allowed, run_no_kill};
use crate::slt::slt_env::{Env, Opts};
use crate::slt::vnode_mode::*;
use crate::utils::TimedExt;
use crate::{evaluate_skip, wait_or_retry_background_ddl};

// Retry a maximum number of times until the record succeeds.
const MAX_RETRY: usize = 10;

#[derive(Debug, PartialEq, Eq)]
pub enum SqlCmd {
    /// Other create statements.
    Create {
        is_create_table_as: bool,
    },
    /// Create Materialized views
    CreateMaterializedView {
        name: String,
    },
    /// Set background ddl
    SetBackgroundDdl {
        enable: bool,
    },
    Drop,
    Dml,
    Flush,
    Others,
}

impl SqlCmd {
    fn allow_kill(&self) -> bool {
        matches!(
            self,
            SqlCmd::Create {
                // `create table as` is also not atomic in our system.
                is_create_table_as: false,
                ..
            } | SqlCmd::CreateMaterializedView { .. }
                | SqlCmd::Drop
        )
        // We won't kill during insert/update/delete/alter since atomicity is not guaranteed.
        // TODO: For `SqlCmd::Alter`, table fragment and catalog commits for a table schema change
        // are not transactional, so we can't kill during `alter table add/drop columns` for now;
        // this restriction will be removed once transactional commit of table fragment and
        // catalog is supported.
    }

    fn is_create(&self) -> bool {
        matches!(
            self,
            SqlCmd::Create { .. } | SqlCmd::CreateMaterializedView { .. }
        )
    }
}
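
// A minimal illustrative test (an addition, not part of the original file) showing how
// `allow_kill` and `is_create` classify commands: kill is allowed for plain CREATE,
// CREATE MATERIALIZED VIEW and DROP, but not for `CREATE TABLE AS` or DML.
#[cfg(test)]
mod sql_cmd_classification_tests {
    use super::*;

    #[test]
    fn test_allow_kill_and_is_create() {
        assert!(
            SqlCmd::Create {
                is_create_table_as: false
            }
            .allow_kill()
        );
        // `create table as` is not atomic, so kill is not allowed while it runs.
        assert!(
            !SqlCmd::Create {
                is_create_table_as: true
            }
            .allow_kill()
        );
        assert!(
            SqlCmd::CreateMaterializedView {
                name: "m".to_owned()
            }
            .allow_kill()
        );
        assert!(SqlCmd::Drop.allow_kill());
        assert!(!SqlCmd::Dml.allow_kill());
        assert!(!SqlCmd::Dml.is_create());
        assert!(
            SqlCmd::Create {
                is_create_table_as: true
            }
            .is_create()
        );
    }
}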

const KILL_IGNORE_FILES: &[&str] = &[
    // TPCH queries are too slow for recovery.
    "tpch_snapshot.slt",
    "tpch_upstream.slt",
    // Drop is not retryable in search path test.
    "search_path.slt",
    // Transaction statements are not retryable.
    "transaction/now.slt",
    "transaction/read_only_multi_conn.slt",
    "transaction/read_only.slt",
    "transaction/tolerance.slt",
    "transaction/cursor.slt",
    "transaction/cursor_multi_conn.slt",
];

/// Randomly set DDL statements to use `background_ddl`
mod background_ddl_mode {
    use anyhow::bail;
    use rand::Rng;
    use rand_chacha::ChaChaRng;
    use sqllogictest::{Condition, Record, StatementExpect};

    use crate::client::RisingWave;
    use crate::slt::SqlCmd;
    use crate::slt::slt_env::Env;

    /// Wait for background mv to finish creating
    pub(super) async fn wait_background_mv_finished(mview_name: &str) -> anyhow::Result<()> {
        let Ok(rw) = RisingWave::connect("frontend".into(), "dev".into()).await else {
            bail!("failed to connect to frontend for {mview_name}");
        };
        let client = rw.pg_client();
        if client.simple_query("WAIT;").await.is_err() {
            bail!("failed to wait for background mv to finish creating for {mview_name}");
        }

        let Ok(result) = client
            .query(
                "select count(*) from pg_matviews where matviewname=$1;",
                &[&mview_name],
            )
            .await
        else {
            bail!("failed to query pg_matviews for {mview_name}");
        };

        match result[0].try_get::<_, i64>(0) {
            Ok(1) => Ok(()),
            r => bail!("expected 1 row in pg_matviews, got {r:#?} instead for {mview_name}"),
        }
    }

    // TODO(kwannoel): move rng, `background_ddl` to `Env` struct
    pub(super) async fn set_background_ddl<D, M, T>(
        tester: &mut sqllogictest::Runner<D, M>,
        env: &Env,
        record: &Record<T>,
        cmd: &SqlCmd,
        manual_background_ddl_enabled: bool,
        rng: &mut ChaChaRng,
        background_ddl_enabled: &mut bool,
    ) where
        D: sqllogictest::AsyncDB<ColumnType = T>,
        M: sqllogictest::MakeConnection<Conn = D>,
        T: sqllogictest::ColumnType,
    {
        if let Record::Statement {
            loc,
            conditions,
            connection,
            ..
        } = &record
            && matches!(cmd, SqlCmd::CreateMaterializedView { .. })
            && !manual_background_ddl_enabled
            && conditions.iter().all(|c| {
                *c != Condition::SkipIf {
                    label: "madsim".to_owned(),
                }
            })
            && env.background_ddl_rate() > 0.0
        {
            let background_ddl_setting = rng.random_bool(env.background_ddl_rate());
            let set_background_ddl = Record::Statement {
                loc: loc.clone(),
                conditions: conditions.clone(),
                connection: connection.clone(),
                expected: StatementExpect::Ok,
                sql: format!("SET BACKGROUND_DDL={background_ddl_setting};"),
                retry: None,
            };
            tester.run_async(set_background_ddl).await.unwrap();
            *background_ddl_enabled = background_ddl_setting;
        };
    }

    // NOTE(kwannoel): only applicable to mvs currently
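    // (Added note:) this macro expands to `break`/`continue` and refers to `MAX_RETRY` and
    // `wait_background_mv_finished` unhygienically, so it must be invoked inside the
    // caller's retry loop with those names in scope.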
    #[macro_export]
    macro_rules! wait_or_retry_background_ddl {
        ($record:expr, $name:expr, $iteration:expr) => {
            let record = $record;
            let i = $iteration;
            tracing::debug!(iteration = i, "Retry for background ddl");
            match wait_background_mv_finished($name).await {
                Ok(_) => {
                    tracing::debug!(
                        iteration = i,
                        "Record with background_ddl {:?} finished",
                        record
                    );
                    break;
                }
                Err(err) => {
                    tracing::error!(
                        iteration = i,
                        ?err,
                        "failed to wait for background mv to finish creating"
                    );
                    if i >= MAX_RETRY {
                        panic!("failed to run test after retry {i} times, error={err:#?}");
                    }
                    continue;
                }
            }
        };
    }
}

mod vnode_mode {
    use std::env;
    use std::hash::{DefaultHasher, Hash, Hasher};
    use std::sync::LazyLock;

    use anyhow::bail;
    use sqllogictest::Partitioner;

    // Copied from sqllogictest-bin.
    #[derive(Clone)]
    pub(super) struct HashPartitioner {
        count: u64,
        id: u64,
    }

    impl HashPartitioner {
        pub(super) fn new(count: u64, id: u64) -> anyhow::Result<Self> {
            if count == 0 {
                bail!("partition count must be greater than zero");
            }
            if id >= count {
                bail!("partition id (zero-based) must be less than count");
            }
            Ok(Self { count, id })
        }
    }

    impl Partitioner for HashPartitioner {
        fn matches(&self, file_name: &str) -> bool {
            let mut hasher = DefaultHasher::new();
            file_name.hash(&mut hasher);
            hasher.finish() % self.count == self.id
        }
    }
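
    // A minimal illustrative check (an addition, not from the original file): hash
    // partitioning is deterministic, so for any file name exactly one of `count`
    // partitions should claim it, and invalid configurations are rejected by `new`.
    #[cfg(test)]
    mod tests {
        use sqllogictest::Partitioner as _;

        use super::HashPartitioner;

        #[test]
        fn exactly_one_partition_matches_a_file() {
            // An arbitrary (hypothetical) file name; any string works, since partitioning
            // only hashes the name.
            let file = "e2e_test/ddl/some_test.slt";
            let count = 4;
            let matched = (0..count)
                .filter(|&id| HashPartitioner::new(count, id).unwrap().matches(file))
                .count();
            assert_eq!(matched, 1);
            // Invalid configurations are rejected.
            assert!(HashPartitioner::new(0, 0).is_err());
            assert!(HashPartitioner::new(4, 4).is_err());
        }
    }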

    pub(super) static PARTITIONER: LazyLock<Option<HashPartitioner>> = LazyLock::new(|| {
        let count = env::var("BUILDKITE_PARALLEL_JOB_COUNT")
            .ok()?
            .parse::<u64>()
            .unwrap();
        let id = env::var("BUILDKITE_PARALLEL_JOB")
            .ok()?
            .parse::<u64>()
            .unwrap();
        Some(HashPartitioner::new(count, id).unwrap())
    });
}

pub mod slt_env {
    use rand::SeedableRng;
    use rand_chacha::ChaChaRng;

    use crate::cluster::KillOpts;

    pub struct Opts {
        pub kill_opts: KillOpts,
        /// Probability of `background_ddl` being set to true per ddl record.
        pub background_ddl_rate: f64,
        /// Set vnode count (`STREAMING_MAX_PARALLELISM`) to a random value before running DDL.
        pub random_vnode_count: bool,
    }

    pub(super) struct Env {
        opts: Opts,
    }

    impl Env {
        pub fn new(opts: Opts) -> Self {
            Self { opts }
        }

        pub fn get_rng() -> ChaChaRng {
            let seed = std::env::var("MADSIM_TEST_SEED")
                .unwrap_or("0".to_owned())
                .parse::<u64>()
                .unwrap();
            ChaChaRng::seed_from_u64(seed)
        }

        pub fn background_ddl_rate(&self) -> f64 {
            self.opts.background_ddl_rate
        }

        pub fn kill(&self) -> bool {
            self.opts.kill_opts.kill_compute
                || self.opts.kill_opts.kill_meta
                || self.opts.kill_opts.kill_frontend
                || self.opts.kill_opts.kill_compactor
        }

        pub fn random_vnode_count(&self) -> bool {
            self.opts.random_vnode_count
        }

        pub fn kill_opts(&self) -> KillOpts {
            self.opts.kill_opts
        }

        pub fn kill_rate(&self) -> f64 {
            self.opts.kill_opts.kill_rate as f64
        }
    }
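
    // An illustrative determinism check (added; not part of the original file): `get_rng`
    // derives its seed from `MADSIM_TEST_SEED` (defaulting to 0), so two generators built
    // in the same environment should produce identical streams.
    #[cfg(test)]
    mod tests {
        use rand::Rng;

        use super::Env;

        #[test]
        fn get_rng_is_deterministic_for_a_fixed_seed() {
            // Both generators read the same `MADSIM_TEST_SEED` (or default to 0), so they
            // should produce identical values.
            let a: u64 = Env::get_rng().random();
            let b: u64 = Env::get_rng().random();
            assert_eq!(a, b);
        }
    }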
}

mod runner {
    use std::time::Duration;

    use rand::prelude::IteratorRandom;
    use rand::rng as thread_rng;
    use sqllogictest::{Record, StatementExpect, TestError};

    use crate::slt::slt_env::Env;
    use crate::slt::{MAX_RETRY, SqlCmd};
    use crate::utils::TimedExt;

    #[macro_export]
    macro_rules! evaluate_skip {
        ($env:expr, $path:expr) => {
            if let Some(partitioner) = PARTITIONER.as_ref()
                && !partitioner.matches($path.to_str().unwrap())
            {
                println!("[skip partition] {}", $path.display());
                continue;
            } else if $env.kill() && KILL_IGNORE_FILES.iter().any(|s| $path.ends_with(s)) {
                println!("[skip kill] {}", $path.display());
                continue;
            } else {
                println!("[run] {}", $path.display());
            }
        };
    }

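    // (Added note:) `random_vnode_count` decides, per file, whether to randomize the vnode
    // count: only when the option is enabled and no record in the file already touches
    // "parallelism" settings, which presumably an injected `STREAMING_MAX_PARALLELISM`
    // could conflict with.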
    pub(super) fn random_vnode_count<T: sqllogictest::ColumnType>(
        env: &Env,
        records: &[Record<T>],
    ) -> bool {
        env.random_vnode_count()
            && records.iter().all(|record| {
                if let Record::Statement { sql, .. } | Record::Query { sql, .. } = record
                    && sql.to_lowercase().contains("parallelism")
                {
                    println!("[RANDOM VNODE COUNT] skip: {}", sql);
                    false
                } else {
                    true
                }
            })
    }

    /// Run a record when `kill` is totally disabled.
    pub(super) async fn run_no_kill<D, M, T>(
        tester: &mut sqllogictest::Runner<D, M>,
        random_vnode_count: bool,
        cmd: &SqlCmd,
        record: Record<T>,
    ) -> Result<(), TestError>
    where
        D: sqllogictest::AsyncDB<ColumnType = T>,
        M: sqllogictest::MakeConnection<Conn = D>,
        T: sqllogictest::ColumnType,
    {
        // Set random vnode count if needed.
        if random_vnode_count
            && cmd.is_create()
            && let Record::Statement {
                loc,
                conditions,
                connection,
                ..
            } = &record
        {
            let vnode_count = (2..=64) // small
                .chain(224..=288) // normal
                .chain(992..=1056) // 1024 affects row id gen behavior
                .choose(&mut thread_rng())
                .unwrap();
            let sql = format!("SET STREAMING_MAX_PARALLELISM = {vnode_count};");
            println!("[RANDOM VNODE COUNT] set: {vnode_count}");
            let set_random_vnode_count = Record::Statement {
                loc: loc.clone(),
                conditions: conditions.clone(),
                connection: connection.clone(),
                sql,
                expected: StatementExpect::Ok,
                retry: None,
            };
            tester.run_async(set_random_vnode_count).await.unwrap();
            println!("[RANDOM VNODE COUNT] run: {record}");
        }

        tester.run_async(record).await.map(|_| ())
    }

    /// Used when `kill` is not allowed for certain commands.
    pub(super) async fn run_kill_not_allowed<D, M, T>(
        tester: &mut sqllogictest::Runner<D, M>,
        record: Record<T>,
    ) where
        D: sqllogictest::AsyncDB<ColumnType = T>,
        M: sqllogictest::MakeConnection<Conn = D>,
        T: sqllogictest::ColumnType,
    {
        for i in 0usize.. {
            let delay = Duration::from_secs(1 << i);
            if let Err(err) = tester
                .run_async(record.clone())
                .timed(|_res, elapsed| {
                    tracing::debug!("Record {:?} finished in {:?}", record, elapsed)
                })
                .await
            {
                let err_string = err.to_string().to_ascii_lowercase();
                // The cluster could still be recovering if it was killed before; retry if we
                // hit errors like `no reader for dml in table with id {}`.
                let allowed_errs = [
                    "no reader for dml in table",
                    "error reading a body from connection: broken pipe",
                    "failed to inject barrier",
                    "get error from control stream",
                    "cluster is under recovering",
                    "streaming vnode mapping has not been initialized",
                ];
                let should_retry = i < MAX_RETRY
                    && allowed_errs
                        .iter()
                        .any(|allowed_err| err_string.contains(&allowed_err.to_ascii_lowercase()));
                if !should_retry {
                    panic!("{}", err);
                }
                tracing::error!("failed to run test: {err}\nretry after {delay:?}");
            } else {
                break;
            }
            tokio::time::sleep(delay).await;
        }
    }
}

/// Run the sqllogictest files in `glob`.
pub async fn run_slt_task(cluster: Arc<Cluster>, glob: &str, opts: Opts) {
    let env = slt_env::Env::new(opts);
    tracing::info!("background_ddl_rate: {}", env.background_ddl_rate());
    let mut rng = Env::get_rng();
    let files = glob::glob(glob).expect("failed to read glob pattern");
    for file in files {
        // Use a session per file.
        let mut tester =
            sqllogictest::Runner::new(|| RisingWave::connect("frontend".into(), "dev".into()));
        tester.add_label("madsim");
        tester.set_var(well_known::DATABASE.to_owned(), "dev".to_owned());

        let file = file.unwrap();
        let path = file.as_path();

        evaluate_skip!(env, path);

        // XXX: hack for kafka source test
        let tempfile = (path.ends_with("kafka.slt") || path.ends_with("kafka_batch.slt"))
            .then(|| hack_kafka_test(path));
        let path = tempfile.as_ref().map(|p| p.path()).unwrap_or(path);

        // NOTE(kwannoel): For background ddl
        let mut background_ddl_enabled = false;

        // If background ddl is set to true within the test case, don't randomly toggle
        // background_ddl. It is reverted to false only when a record explicitly sets
        // background_ddl to false.
        let mut manual_background_ddl_enabled = false;

        let records = sqllogictest::parse_file(path).expect("failed to parse file");
        let random_vnode_count = random_vnode_count(&env, &records);

        for record in records {
            // uncomment to print metrics for task counts
            // let metrics = madsim::runtime::Handle::current().metrics();
            // println!("{:#?}", metrics);
            // println!("{}", metrics.num_tasks_by_node_by_spawn());
            if let sqllogictest::Record::Halt { .. } = record {
                break;
            }

            let cmd = match &record {
                sqllogictest::Record::Statement {
                    sql, conditions, ..
                }
                | sqllogictest::Record::Query {
                    sql, conditions, ..
                } if conditions
                    .iter()
                    .all(|c| !matches!(c, Condition::SkipIf { label } if label == "madsim"))
                    && !conditions
                        .iter()
                        .any(|c| matches!(c, Condition::OnlyIf { label } if label != "madsim")) =>
                {
                    extract_sql_command(sql).unwrap_or(SqlCmd::Others)
                }
                _ => SqlCmd::Others,
            };

            // For normal records.
            if !env.kill() {
                match run_no_kill(&mut tester, random_vnode_count, &cmd, record).await {
                    Ok(_) => continue,
                    Err(e) => panic!("{}", e),
                }
            }

            // For kill enabled.
            tracing::debug!(?cmd, "Running");

            if let SqlCmd::SetBackgroundDdl { enable } = cmd {
                manual_background_ddl_enabled = enable;
                background_ddl_enabled = enable;
            }

            // For each background ddl compatible statement, provide a chance for background_ddl=true.
            set_background_ddl(
                &mut tester,
                &env,
                &record,
                &cmd,
                manual_background_ddl_enabled,
                &mut rng,
                &mut background_ddl_enabled,
            )
            .await;

            if !cmd.allow_kill() {
                run_kill_not_allowed(&mut tester, record.clone()).await;
                continue;
            }

            let should_kill = thread_rng().random_bool(env.kill_rate());
            // Spawn a background task to kill nodes.
            let handle = if should_kill {
                let cluster = cluster.clone();
                let opts = env.kill_opts();
                Some(tokio::spawn(async move {
                    let t = thread_rng().random_range(Duration::default()..Duration::from_secs(1));
                    tokio::time::sleep(t).await;
                    cluster.kill_node(&opts).await;
                    tokio::time::sleep(Duration::from_secs(15)).await;
                }))
            } else {
                None
            };

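            // (Added note:) retry loop: exponential backoff capped at 10s, up to `MAX_RETRY`
            // attempts; background-DDL materialized views are additionally awaited via
            // `wait_or_retry_background_ddl!`.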
            for i in 0usize.. {
                tracing::debug!(iteration = i, "retry count");
                let delay = Duration::from_secs(min(1 << i, 10));
                if i > 0 {
                    tokio::time::sleep(delay).await;
                }
                match tester
                    .run_async(record.clone())
                    .timed(|_res, elapsed| {
                        tracing::debug!("Record {:?} finished in {:?}", record, elapsed)
                    })
                    .await
                {
                    Ok(_) => {
                        // For background ddl
                        if let SqlCmd::CreateMaterializedView { ref name } = cmd
                            && background_ddl_enabled
                            && !matches!(
                                record,
                                Record::Statement {
                                    expected: StatementExpect::Error(_),
                                    ..
                                } | Record::Query {
                                    expected: QueryExpect::Error(_),
                                    ..
                                }
                            )
                        {
                            wait_or_retry_background_ddl!(&record, name, i);
                        }
                        break;
                    }
                    Err(e) => {
                        match cmd {
                            // Allow 'table exists' errors when retrying a CREATE statement.
                            SqlCmd::Create {
                                is_create_table_as: false,
                            }
                            | SqlCmd::CreateMaterializedView { .. }
                                if i != 0
                                    && let e = e.to_string()
                                    // It should not be a 'gRPC request to meta' error;
                                    // otherwise it means the catalog has not yet been
                                    // populated to the frontend.
                                    && !e.contains("gRPC request to meta service failed")
                                    && e.contains("exists")
                                    && !e.contains("under creation")
                                    && e.contains("Catalog error") =>
                            {
                                break;
                            }
                            // Allow 'not found' errors when retrying a DROP statement.
                            SqlCmd::Drop
                                if i != 0
                                    && e.to_string().contains("not found")
                                    && e.to_string().contains("Catalog error") =>
                            {
                                break;
                            }

                            // Panic after `MAX_RETRY` attempts for other errors, since these
                            // errors indicate that the MV might not yet be created.
                            _ if i >= MAX_RETRY => {
                                panic!("failed to run test after retry {i} times: {e}")
                            }
                            SqlCmd::CreateMaterializedView { ref name }
                                if i != 0
                                    && e.to_string().contains("table is in creating procedure")
                                    && background_ddl_enabled =>
                            {
                                wait_or_retry_background_ddl!(&record, name, i);
                            }
                            _ => tracing::error!(
                                iteration = i,
                                "failed to run test: {e}\nretry after {delay:?}"
                            ),
                        }
                    }
                }
            }
            if let SqlCmd::SetBackgroundDdl { enable } = cmd {
                background_ddl_enabled = enable;
            };
            if let Some(handle) = handle {
                handle.await.unwrap();
            }
        }
    }
}

pub async fn run_parallel_slt_task(glob: &str, jobs: usize) -> Result<(), ParallelTestError> {
    let mut tester =
        sqllogictest::Runner::new(|| RisingWave::connect("frontend".into(), "dev".into()));
    tester.add_label("madsim");

    if let Some(partitioner) = PARTITIONER.as_ref() {
        tester.with_partitioner(partitioner.clone());
    }

    tester
        .run_parallel_async(
            glob,
            vec!["frontend".into()],
            |host, dbname| async move { RisingWave::connect(host, dbname).await.unwrap() },
            jobs,
        )
        .await
        .map_err(|e| panic!("{e}"))
}

/// Replace some strings in kafka.slt and write to a new temp file.
fn hack_kafka_test(path: &Path) -> tempfile::NamedTempFile {
    let content = std::fs::read_to_string(path).expect("failed to read file");
    let simple_avsc_full_path =
        std::fs::canonicalize("src/connector/src/test_data/simple-schema.avsc")
            .expect("failed to get schema path");
    let complex_avsc_full_path =
        std::fs::canonicalize("src/connector/src/test_data/complex-schema.avsc")
            .expect("failed to get schema path");
    let json_schema_full_path =
        std::fs::canonicalize("src/connector/src/test_data/complex-schema.json")
            .expect("failed to get schema path");
    let content = content
        .replace("127.0.0.1:29092", "192.168.11.1:29092")
        .replace("localhost:29092", "192.168.11.1:29092")
        .replace(
            "/risingwave/avro-simple-schema.avsc",
            simple_avsc_full_path.to_str().unwrap(),
        )
        .replace(
            "/risingwave/avro-complex-schema.avsc",
            complex_avsc_full_path.to_str().unwrap(),
        )
        .replace(
            "/risingwave/json-complex-schema",
            json_schema_full_path.to_str().unwrap(),
        );
    let file = tempfile::NamedTempFile::new().expect("failed to create temp file");
    std::fs::write(file.path(), content).expect("failed to write file");
    println!("created a temp file for kafka test: {:?}", file.path());
    file
}

#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use expect_test::{Expect, expect};

    use super::*;

    fn check(actual: impl Debug, expect: Expect) {
        let actual = format!("{:#?}", actual);
        expect.assert_eq(&actual);
    }

    #[test]
    fn test_extract_sql_command() {
        check(
            extract_sql_command("create  table  t as select 1;"),
            expect![[r#"
                Ok(
                    Create {
                        is_create_table_as: true,
                    },
                )"#]],
        );
        check(
            extract_sql_command("  create table  t (a int);"),
            expect![[r#"
                Ok(
                    Create {
                        is_create_table_as: false,
                    },
                )"#]],
        );
        check(
            extract_sql_command(" create materialized   view  m_1 as select 1;"),
            expect![[r#"
                Ok(
                    CreateMaterializedView {
                        name: "m_1",
                    },
                )"#]],
        );
        check(
            extract_sql_command("set background_ddl= true;"),
            expect![[r#"
                Ok(
                    SetBackgroundDdl {
                        enable: true,
                    },
                )"#]],
        );
        check(
            extract_sql_command("SET BACKGROUND_DDL=true;"),
            expect![[r#"
                Ok(
                    SetBackgroundDdl {
                        enable: true,
                    },
                )"#]],
        );
        check(
            extract_sql_command("CREATE MATERIALIZED VIEW if not exists m_1 as select 1;"),
            expect![[r#"
                Ok(
                    CreateMaterializedView {
                        name: "m_1",
                    },
                )"#]],
        )
    }
}