risingwave_planner_test/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![feature(let_chains)]
16#![allow(clippy::derive_partial_eq_without_eq)]
17
18//! Data-driven tests.
19
20risingwave_expr_impl::enable!();
21
22mod resolve_id;
23
24use std::collections::{BTreeMap, HashSet};
25use std::path::{Path, PathBuf};
26use std::sync::Arc;
27
28use anyhow::{Result, anyhow, bail};
29pub use resolve_id::*;
30use risingwave_frontend::handler::util::SourceSchemaCompatExt;
31use risingwave_frontend::handler::{
32    HandlerArgs, create_index, create_mv, create_schema, create_source, create_table, create_view,
33    drop_table, explain, variable,
34};
35use risingwave_frontend::session::SessionImpl;
36use risingwave_frontend::test_utils::{LocalFrontend, create_proto_file, get_explain_output};
37use risingwave_frontend::{
38    Binder, Explain, FrontendOpts, OptimizerContext, OptimizerContextRef, PlanRef, Planner,
39    WithOptionsSecResolved, build_graph, explain_stream_graph,
40};
41use risingwave_sqlparser::ast::{
42    AstOption, DropMode, EmitMode, ExplainOptions, ObjectName, Statement,
43};
44use risingwave_sqlparser::parser::Parser;
45use serde::{Deserialize, Serialize};
46use thiserror_ext::AsReport;
47
/// The kinds of output a test case can ask for through `TestCase::expected_outputs`.
///
/// Variant names are (de)serialized in `snake_case`, so they line up with the
/// field names of `TestCaseResult`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Hash, Eq)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
pub enum TestType {
    /// The result of an `EXPLAIN` statement.
    ///
    /// This field is used when `sql` is an `EXPLAIN` statement.
    /// In this case, all other fields are invalid.
    ExplainOutput,

    /// The original logical plan
    LogicalPlan,
    /// Logical plan with optimization `.gen_optimized_logical_plan_for_batch()`
    OptimizedLogicalPlanForBatch,
    /// Logical plan with optimization `.gen_optimized_logical_plan_for_stream()`
    OptimizedLogicalPlanForStream,

    /// Distributed batch plan: `.gen_batch_plan()` followed by
    /// `.gen_batch_distributed_plan()`
    BatchPlan,
    /// Proto JSON of generated batch plan
    BatchPlanProto,
    /// Batch plan for local execution `.gen_batch_local_plan()`
    BatchLocalPlan,
    /// Batch plan for distributed execution `.gen_batch_distributed_plan()`
    BatchDistributedPlan,

    /// Create MV plan `.gen_create_mv_plan()`
    StreamPlan,
    /// Create MV fragments plan
    StreamDistPlan,
    /// Create MV plan with EOWC semantics `.gen_create_mv_plan(.., EmitMode::OnWindowClose)`
    EowcStreamPlan,
    /// Create MV fragments plan with EOWC semantics
    EowcStreamDistPlan,

    /// Create sink plan (assumes blackhole sink)
    /// TODO: Other sinks
    SinkPlan,

    /// Error of binder
    BinderError,
    /// Error of planner
    PlannerError,
    /// Error of optimizer
    OptimizerError,
    /// Error raised while generating a batch plan
    BatchError,
    /// Error of `.gen_batch_local_plan()`
    BatchLocalError,
    /// Error raised while generating the create-MV (stream) plan
    StreamError,
    /// Error raised while generating the create-MV (stream) plan with EOWC semantics
    EowcStreamError,
}
94
95pub fn check(actual: Vec<TestCaseResult>, expect: expect_test::ExpectFile) {
96    let actual = serde_yaml::to_string(&actual).unwrap();
97    expect.assert_eq(&format!("# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.\n{}",actual));
98}
99
/// The user-written part of a test case: its identity, its prerequisites and
/// the SQL under test.
///
/// `None` fields are omitted when serializing, and unknown fields are rejected
/// when deserializing.
#[serde_with::skip_serializing_none]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct TestInput {
    /// Id of the test case; other test cases refer to it from their `before` list.
    pub id: Option<String>,
    /// A brief description of the test case.
    pub name: Option<String>,
    /// Ids of test cases whose SQL must be executed before this test case's own SQL.
    pub before: Option<Vec<String>>,
    /// The resolved statements of the `before` ids.
    ///
    /// Never written back on serialization. Presumably filled in by the
    /// `resolve_id` module -- TODO confirm.
    #[serde(skip_serializing)]
    before_statements: Option<Vec<String>>,
    /// The SQL statements
    pub sql: String,
}
116
/// A single planner test case: the input SQL, optional connector/config setup,
/// and the set of outputs that should be produced and checked.
#[serde_with::skip_serializing_none]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct TestCase {
    /// Id, name, prerequisites and SQL; flattened into the same map as the
    /// fields below when (de)serialized.
    #[serde(flatten)]
    pub input: TestInput,

    // TODO: these should also be in TestInput, but it affects ordering. So next PR
    /// Support using file content or file location to create source.
    pub create_source: Option<CreateConnector>,
    /// Support using file content or file location to create table with connector.
    pub create_table_with_connector: Option<CreateConnector>,
    /// Session config entries (key -> value) applied to the frontend session
    /// before any SQL is run.
    pub with_config_map: Option<BTreeMap<String, String>>,

    /// Specify what output fields to check
    pub expected_outputs: HashSet<TestType>,
}
135
/// Read-only accessors over the test case's input and setup fields.
impl TestCase {
    /// Id of the test case, if one was given.
    pub fn id(&self) -> &Option<String> {
        &self.input.id
    }

    /// Brief description of the test case, if one was given.
    pub fn name(&self) -> &Option<String> {
        &self.input.name
    }

    /// Ids of the test cases that must run before this one.
    pub fn before(&self) -> &Option<Vec<String>> {
        &self.input.before
    }

    /// The resolved SQL statements of the `before` ids.
    pub fn before_statements(&self) -> &Option<Vec<String>> {
        &self.input.before_statements
    }

    /// The SQL statements under test.
    pub fn sql(&self) -> &String {
        &self.input.sql
    }

    /// Description of the source to create before running the SQL, if any.
    pub fn create_source(&self) -> &Option<CreateConnector> {
        &self.create_source
    }

    /// Description of the connector-backed table to create before running the SQL, if any.
    pub fn create_table_with_connector(&self) -> &Option<CreateConnector> {
        &self.create_table_with_connector
    }

    /// Session config entries to apply before running the SQL, if any.
    pub fn with_config_map(&self) -> &Option<BTreeMap<String, String>> {
        &self.with_config_map
    }
}
169
/// Describes a source (or connector-backed table) that the test runner creates
/// with a generated `CREATE SOURCE` / `CREATE TABLE` statement before running
/// the test SQL.
#[serde_with::skip_serializing_none]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct CreateConnector {
    /// Value substituted after `FORMAT` in the generated statement.
    format: String,
    /// Value substituted after `ENCODE` in the generated statement.
    encode: String,
    /// Name of the source/table to create.
    name: String,
    /// Schema file content; it is written to a temporary file whose path is
    /// used as `schema.location`. The runner panics if this is missing.
    file: Option<String>,
    /// NOTE(review): not read by the code visible in this part of the file;
    /// the table/source choice is made by which `TestCase` field holds this value.
    is_table: Option<bool>,
}
180
/// The outcome of running a [`TestCase`]: the original input plus whichever
/// plans and errors were produced.
///
/// `None` fields are omitted when serializing, so a result file only contains
/// the outputs that were actually generated.
#[serde_with::skip_serializing_none]
#[derive(Debug, PartialEq, Serialize, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct TestCaseResult {
    /// The input of the test case this result belongs to (flattened on serialization).
    #[serde(flatten)]
    pub input: TestInput,

    /// The original logical plan
    pub logical_plan: Option<String>,

    /// Logical plan with optimization `.gen_optimized_logical_plan_for_batch()`
    pub optimized_logical_plan_for_batch: Option<String>,

    /// Logical plan with optimization `.gen_optimized_logical_plan_for_stream()`
    pub optimized_logical_plan_for_stream: Option<String>,

    /// Distributed batch plan: `.gen_batch_plan()` followed by
    /// `.gen_batch_distributed_plan()`
    pub batch_plan: Option<String>,

    /// Proto JSON of generated batch plan
    pub batch_plan_proto: Option<String>,

    /// Batch plan for local execution `.gen_batch_local_plan()`
    pub batch_local_plan: Option<String>,

    /// Batch plan for distributed execution `.gen_batch_distributed_plan()`
    pub batch_distributed_plan: Option<String>,

    /// Generate sink plan
    pub sink_plan: Option<String>,

    /// Create MV plan `.gen_create_mv_plan()`
    pub stream_plan: Option<String>,

    /// Create MV fragments plan
    pub stream_dist_plan: Option<String>,

    /// Create MV plan with EOWC semantics `.gen_create_mv_plan(.., EmitMode::OnWindowClose)`
    pub eowc_stream_plan: Option<String>,

    /// Create MV fragments plan with EOWC semantics
    pub eowc_stream_dist_plan: Option<String>,

    /// Error of binder
    pub binder_error: Option<String>,

    /// Error of planner
    pub planner_error: Option<String>,

    /// Error of optimizer
    pub optimizer_error: Option<String>,

    /// Error raised while generating a batch plan
    pub batch_error: Option<String>,

    /// Error of `.gen_batch_local_plan()`
    pub batch_local_error: Option<String>,

    /// Error of `.gen_create_mv_plan()`
    pub stream_error: Option<String>,

    /// Error of `.gen_create_mv_plan(.., EmitMode::OnWindowClose)`
    pub eowc_stream_error: Option<String>,

    /// Error of `.gen_sink_plan()`
    pub sink_error: Option<String>,

    /// The result of an `EXPLAIN` statement.
    ///
    /// This field is used when `sql` is an `EXPLAIN` statement.
    /// In this case, all other fields are invalid.
    pub explain_output: Option<String>,

    // TODO: these should also be in TestInput, but it affects ordering. So next PR
    /// Support using file content or file location to create source.
    pub create_source: Option<CreateConnector>,
    /// Support using file content or file location to create table with connector.
    pub create_table_with_connector: Option<CreateConnector>,
    /// Provide config map to frontend
    pub with_config_map: Option<BTreeMap<String, String>>,
}
262
263impl TestCase {
264    /// Run the test case, and return the expected output.
265    pub async fn run(&self, do_check_result: bool) -> Result<TestCaseResult> {
266        let session = {
267            let frontend = LocalFrontend::new(FrontendOpts::default()).await;
268            frontend.session_ref()
269        };
270
271        if let Some(config_map) = self.with_config_map() {
272            for (key, val) in config_map {
273                session.set_config(key, val.to_owned()).unwrap();
274            }
275        }
276
277        let placeholder_empty_vec = vec![];
278
279        // Since temp file will be deleted when it goes out of scope, so create source in advance.
280        self.do_create_source(session.clone()).await?;
281        self.do_create_table_with_connector(session.clone()).await?;
282
283        let mut result: Option<TestCaseResult> = None;
284        for sql in self
285            .before_statements()
286            .as_ref()
287            .unwrap_or(&placeholder_empty_vec)
288            .iter()
289            .chain(std::iter::once(self.sql()))
290        {
291            result = self
292                .run_sql(
293                    Arc::from(sql.to_owned()),
294                    session.clone(),
295                    do_check_result,
296                    result,
297                )
298                .await?;
299        }
300
301        let mut result = result.unwrap_or_default();
302        result.input = self.input.clone();
303        result.create_source.clone_from(self.create_source());
304        result
305            .create_table_with_connector
306            .clone_from(self.create_table_with_connector());
307        result.with_config_map.clone_from(self.with_config_map());
308
309        Ok(result)
310    }
311
312    #[inline(always)]
313    fn create_connector_sql(
314        is_table: bool,
315        connector_name: String,
316        connector_format: String,
317        connector_encode: String,
318    ) -> String {
319        let object_to_create = if is_table { "TABLE" } else { "SOURCE" };
320        format!(
321            r#"CREATE {} {}
322    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
323    FORMAT {} ENCODE {} (message = '.test.TestRecord', schema.location = 'file://"#,
324            object_to_create, connector_name, connector_format, connector_encode
325        )
326    }
327
328    async fn do_create_table_with_connector(
329        &self,
330        session: Arc<SessionImpl>,
331    ) -> Result<Option<TestCaseResult>> {
332        match self.create_table_with_connector().clone() {
333            Some(connector) => {
334                if let Some(content) = connector.file {
335                    let sql = Self::create_connector_sql(
336                        true,
337                        connector.name,
338                        connector.format,
339                        connector.encode,
340                    );
341                    let temp_file = create_proto_file(content.as_str());
342                    self.run_sql(
343                        Arc::from(sql + temp_file.path().to_str().unwrap() + "')"),
344                        session.clone(),
345                        false,
346                        None,
347                    )
348                    .await
349                } else {
350                    panic!(
351                        "{:?} create table with connector must include `file` for the file content",
352                        self.id()
353                    );
354                }
355            }
356            None => Ok(None),
357        }
358    }
359
360    // If testcase have create source info, run sql to create source.
361    // Support create source by file content or file location.
362    async fn do_create_source(&self, session: Arc<SessionImpl>) -> Result<Option<TestCaseResult>> {
363        match self.create_source().clone() {
364            Some(source) => {
365                if let Some(content) = source.file {
366                    let sql = Self::create_connector_sql(
367                        false,
368                        source.name,
369                        source.format,
370                        source.encode,
371                    );
372                    let temp_file = create_proto_file(content.as_str());
373                    self.run_sql(
374                        Arc::from(sql + temp_file.path().to_str().unwrap() + "')"),
375                        session.clone(),
376                        false,
377                        None,
378                    )
379                    .await
380                } else {
381                    panic!(
382                        "{:?} create source must include `file` for the file content",
383                        self.id()
384                    );
385                }
386            }
387            None => Ok(None),
388        }
389    }
390
    /// Parses `sql` and runs every statement in it against `session`.
    ///
    /// DDL-like statements (`CREATE TABLE/SOURCE/INDEX/VIEW/SCHEMA`, `DROP`,
    /// `SET`) are forwarded to the matching frontend handler. Queries, DML and
    /// `EXPLAIN` produce a [`TestCaseResult`], which replaces `result`.
    ///
    /// * `sql` - the SQL text; it may contain several statements.
    /// * `session` - the session all statements run in.
    /// * `do_check_result` - when `true`, results produced by queries/DML and
    ///   `EXPLAIN` are validated with `check_result`.
    /// * `result` - the result accumulated so far; it must be `None` when a
    ///   query/DML or `EXPLAIN` statement is reached.
    ///
    /// Returns the (possibly updated) accumulated result.
    ///
    /// # Errors
    ///
    /// Returns an error if a handler fails, if a result check fails, or if the
    /// statement type is not supported.
    ///
    /// # Panics
    ///
    /// Panics if `sql` cannot be parsed, if a second result-producing statement
    /// is encountered ("two queries in one test case"), or if a `SET` fails.
    async fn run_sql(
        &self,
        sql: Arc<str>,
        session: Arc<SessionImpl>,
        do_check_result: bool,
        mut result: Option<TestCaseResult>,
    ) -> Result<Option<TestCaseResult>> {
        let statements = Parser::parse_sql(&sql).unwrap();
        for stmt in statements {
            // TODO: `sql` may contain multiple statements here.
            let handler_args = HandlerArgs::new(session.clone(), &stmt, sql.clone())?;
            // Kept alive until the end of this loop iteration.
            // Presumably scopes an implicit transaction to the statement -- TODO confirm.
            let _guard = session.txn_begin_implicit();
            match stmt.clone() {
                Statement::Query(_)
                | Statement::Insert { .. }
                | Statement::Delete { .. }
                | Statement::Update { .. } => {
                    // A test case may contain only one result-producing statement.
                    if result.is_some() {
                        panic!("two queries in one test case");
                    }
                    let explain_options = ExplainOptions {
                        verbose: true,
                        ..Default::default()
                    };
                    // NOTE(review): a second `HandlerArgs` is built here while
                    // `handler_args` above goes unused in this arm.
                    let context = OptimizerContext::new(
                        HandlerArgs::new(session.clone(), &stmt, sql.clone())?,
                        explain_options,
                    );
                    let ret = self.apply_query(&stmt, context.into())?;
                    if do_check_result {
                        check_result(self, &ret)?;
                    }
                    result = Some(ret);
                }
                Statement::CreateTable {
                    name,
                    columns,
                    constraints,
                    if_not_exists,
                    format_encode,
                    source_watermarks,
                    append_only,
                    on_conflict,
                    with_version_column,
                    cdc_table_info,
                    include_column_options,
                    wildcard_idx,
                    webhook_info,
                    engine,
                    ..
                } => {
                    let format_encode = format_encode.map(|schema| schema.into_v2_with_warning());

                    create_table::handle_create_table(
                        handler_args,
                        name,
                        columns,
                        wildcard_idx,
                        constraints,
                        if_not_exists,
                        format_encode,
                        source_watermarks,
                        append_only,
                        on_conflict,
                        with_version_column.map(|x| x.real_value()),
                        cdc_table_info,
                        include_column_options,
                        webhook_info,
                        engine,
                    )
                    .await?;
                }
                Statement::CreateSource { stmt } => {
                    // A failing `CREATE SOURCE` is not propagated as an error:
                    // it is recorded as `planner_error` and becomes the result.
                    if let Err(error) =
                        create_source::handle_create_source(handler_args, stmt).await
                    {
                        let actual_result = TestCaseResult {
                            planner_error: Some(error.to_report_string()),
                            ..Default::default()
                        };

                        // NOTE(review): unlike the other arms, this check runs
                        // even when `do_check_result` is `false`.
                        check_result(self, &actual_result)?;
                        result = Some(actual_result);
                    }
                }
                Statement::CreateIndex {
                    name,
                    table_name,
                    columns,
                    include,
                    distributed_by,
                    if_not_exists,
                    // TODO: support unique and if_not_exist in planner test
                    ..
                } => {
                    create_index::handle_create_index(
                        handler_args,
                        if_not_exists,
                        name,
                        table_name,
                        columns,
                        include,
                        distributed_by,
                    )
                    .await?;
                }
                // `CREATE MATERIALIZED VIEW`; `OR REPLACE` is not matched and
                // falls through to the unsupported arm.
                Statement::CreateView {
                    materialized: true,
                    or_replace: false,
                    if_not_exists,
                    name,
                    query,
                    columns,
                    emit_mode,
                    ..
                } => {
                    create_mv::handle_create_mv(
                        handler_args,
                        if_not_exists,
                        name,
                        *query,
                        columns,
                        emit_mode,
                    )
                    .await?;
                }
                // Plain `CREATE VIEW`; again only without `OR REPLACE`.
                Statement::CreateView {
                    materialized: false,
                    or_replace: false,
                    if_not_exists,
                    name,
                    query,
                    columns,
                    ..
                } => {
                    create_view::handle_create_view(
                        handler_args,
                        if_not_exists,
                        name,
                        columns,
                        *query,
                    )
                    .await?;
                }
                // NOTE(review): every `DROP` is routed to `handle_drop_table`;
                // the kind of object being dropped is not inspected here.
                Statement::Drop(drop_statement) => {
                    drop_table::handle_drop_table(
                        handler_args,
                        drop_statement.object_name,
                        drop_statement.if_exists,
                        matches!(drop_statement.drop_mode, AstOption::Some(DropMode::Cascade)),
                    )
                    .await?;
                }
                Statement::SetVariable {
                    local: _,
                    variable,
                    value,
                } => {
                    variable::handle_set(handler_args, variable, value).unwrap();
                }
                Statement::Explain {
                    analyze,
                    statement,
                    options,
                } => {
                    // `EXPLAIN` counts as the test case's single result-producing statement.
                    if result.is_some() {
                        panic!("two queries in one test case");
                    }
                    let rsp =
                        explain::handle_explain(handler_args, *statement, options, analyze).await?;

                    let explain_output = get_explain_output(rsp).await;
                    let ret = TestCaseResult {
                        explain_output: Some(explain_output),
                        ..Default::default()
                    };
                    if do_check_result {
                        check_result(self, &ret)?;
                    }
                    result = Some(ret);
                }
                Statement::CreateSchema {
                    schema_name,
                    if_not_exists,
                    owner,
                } => {
                    create_schema::handle_create_schema(
                        handler_args,
                        schema_name,
                        if_not_exists,
                        owner,
                    )
                    .await?;
                }
                _ => return Err(anyhow!("Unsupported statement type")),
            }
        }
        Ok(result)
    }
590
591    fn apply_query(
592        &self,
593        stmt: &Statement,
594        context: OptimizerContextRef,
595    ) -> Result<TestCaseResult> {
596        let session = context.session_ctx().clone();
597        let mut ret = TestCaseResult::default();
598
599        let bound = {
600            let mut binder = Binder::new(&session);
601            match binder.bind(stmt.clone()) {
602                Ok(bound) => bound,
603                Err(err) => {
604                    ret.binder_error = Some(err.to_report_string_pretty());
605                    return Ok(ret);
606                }
607            }
608        };
609
610        let mut planner = Planner::new_for_stream(context.clone());
611
612        let plan_root = match planner.plan(bound) {
613            Ok(plan_root) => {
614                if self.expected_outputs.contains(&TestType::LogicalPlan) {
615                    ret.logical_plan =
616                        Some(explain_plan(&plan_root.clone().into_unordered_subplan()));
617                }
618                plan_root
619            }
620            Err(err) => {
621                ret.planner_error = Some(err.to_report_string_pretty());
622                return Ok(ret);
623            }
624        };
625
626        if self
627            .expected_outputs
628            .contains(&TestType::OptimizedLogicalPlanForBatch)
629            || self.expected_outputs.contains(&TestType::OptimizerError)
630        {
631            let mut plan_root = plan_root.clone();
632            let optimized_logical_plan_for_batch =
633                match plan_root.gen_optimized_logical_plan_for_batch() {
634                    Ok(optimized_logical_plan_for_batch) => optimized_logical_plan_for_batch,
635                    Err(err) => {
636                        ret.optimizer_error = Some(err.to_report_string_pretty());
637                        return Ok(ret);
638                    }
639                };
640
641            // Only generate optimized_logical_plan_for_batch if it is specified in test case
642            if self
643                .expected_outputs
644                .contains(&TestType::OptimizedLogicalPlanForBatch)
645            {
646                ret.optimized_logical_plan_for_batch =
647                    Some(explain_plan(&optimized_logical_plan_for_batch));
648            }
649        }
650
651        if self
652            .expected_outputs
653            .contains(&TestType::OptimizedLogicalPlanForStream)
654            || self.expected_outputs.contains(&TestType::OptimizerError)
655        {
656            let mut plan_root = plan_root.clone();
657            let optimized_logical_plan_for_stream =
658                match plan_root.gen_optimized_logical_plan_for_stream() {
659                    Ok(optimized_logical_plan_for_stream) => optimized_logical_plan_for_stream,
660                    Err(err) => {
661                        ret.optimizer_error = Some(err.to_report_string_pretty());
662                        return Ok(ret);
663                    }
664                };
665
666            // Only generate optimized_logical_plan_for_stream if it is specified in test case
667            if self
668                .expected_outputs
669                .contains(&TestType::OptimizedLogicalPlanForStream)
670            {
671                ret.optimized_logical_plan_for_stream =
672                    Some(explain_plan(&optimized_logical_plan_for_stream));
673            }
674        }
675
676        'batch: {
677            if self.expected_outputs.contains(&TestType::BatchPlan)
678                || self.expected_outputs.contains(&TestType::BatchPlanProto)
679                || self.expected_outputs.contains(&TestType::BatchError)
680            {
681                let mut plan_root = plan_root.clone();
682                let batch_plan = match plan_root.gen_batch_plan() {
683                    Ok(_batch_plan) => match plan_root.gen_batch_distributed_plan() {
684                        Ok(batch_plan) => batch_plan,
685                        Err(err) => {
686                            ret.batch_error = Some(err.to_report_string_pretty());
687                            break 'batch;
688                        }
689                    },
690                    Err(err) => {
691                        ret.batch_error = Some(err.to_report_string_pretty());
692                        break 'batch;
693                    }
694                };
695
696                // Only generate batch_plan if it is specified in test case
697                if self.expected_outputs.contains(&TestType::BatchPlan) {
698                    ret.batch_plan = Some(explain_plan(&batch_plan));
699                }
700
701                // Only generate batch_plan_proto if it is specified in test case
702                if self.expected_outputs.contains(&TestType::BatchPlanProto) {
703                    ret.batch_plan_proto = Some(serde_yaml::to_string(
704                        &batch_plan.to_batch_prost_identity(false)?,
705                    )?);
706                }
707            }
708        }
709
710        'local_batch: {
711            if self.expected_outputs.contains(&TestType::BatchLocalPlan)
712                || self.expected_outputs.contains(&TestType::BatchError)
713            {
714                let mut plan_root = plan_root.clone();
715                let batch_plan = match plan_root.gen_batch_plan() {
716                    Ok(_batch_plan) => match plan_root.gen_batch_local_plan() {
717                        Ok(batch_plan) => batch_plan,
718                        Err(err) => {
719                            ret.batch_error = Some(err.to_report_string_pretty());
720                            break 'local_batch;
721                        }
722                    },
723                    Err(err) => {
724                        ret.batch_error = Some(err.to_report_string_pretty());
725                        break 'local_batch;
726                    }
727                };
728
729                // Only generate batch_plan if it is specified in test case
730                if self.expected_outputs.contains(&TestType::BatchLocalPlan) {
731                    ret.batch_local_plan = Some(explain_plan(&batch_plan));
732                }
733            }
734        }
735
736        'distributed_batch: {
737            if self
738                .expected_outputs
739                .contains(&TestType::BatchDistributedPlan)
740                || self.expected_outputs.contains(&TestType::BatchError)
741            {
742                let mut plan_root = plan_root.clone();
743                let batch_plan = match plan_root.gen_batch_plan() {
744                    Ok(_batch_plan) => match plan_root.gen_batch_distributed_plan() {
745                        Ok(batch_plan) => batch_plan,
746                        Err(err) => {
747                            ret.batch_error = Some(err.to_report_string_pretty());
748                            break 'distributed_batch;
749                        }
750                    },
751                    Err(err) => {
752                        ret.batch_error = Some(err.to_report_string_pretty());
753                        break 'distributed_batch;
754                    }
755                };
756
757                // Only generate batch_plan if it is specified in test case
758                if self
759                    .expected_outputs
760                    .contains(&TestType::BatchDistributedPlan)
761                {
762                    ret.batch_distributed_plan = Some(explain_plan(&batch_plan));
763                }
764            }
765        }
766
767        {
768            // stream
769            for (
770                emit_mode,
771                plan,
772                ret_plan_str,
773                dist_plan,
774                ret_dist_plan_str,
775                error,
776                ret_error_str,
777            ) in [
778                (
779                    EmitMode::Immediately,
780                    self.expected_outputs.contains(&TestType::StreamPlan),
781                    &mut ret.stream_plan,
782                    self.expected_outputs.contains(&TestType::StreamDistPlan),
783                    &mut ret.stream_dist_plan,
784                    self.expected_outputs.contains(&TestType::StreamError),
785                    &mut ret.stream_error,
786                ),
787                (
788                    EmitMode::OnWindowClose,
789                    self.expected_outputs.contains(&TestType::EowcStreamPlan),
790                    &mut ret.eowc_stream_plan,
791                    self.expected_outputs
792                        .contains(&TestType::EowcStreamDistPlan),
793                    &mut ret.eowc_stream_dist_plan,
794                    self.expected_outputs.contains(&TestType::EowcStreamError),
795                    &mut ret.eowc_stream_error,
796                ),
797            ] {
798                if !plan && !dist_plan && !error {
799                    continue;
800                }
801
802                let q = if let Statement::Query(q) = stmt {
803                    q.as_ref().clone()
804                } else {
805                    return Err(anyhow!("expect a query"));
806                };
807
808                let stream_plan = match create_mv::gen_create_mv_plan(
809                    &session,
810                    context.clone(),
811                    q,
812                    ObjectName(vec!["test".into()]),
813                    vec![],
814                    Some(emit_mode),
815                ) {
816                    Ok((stream_plan, _)) => stream_plan,
817                    Err(err) => {
818                        *ret_error_str = Some(err.to_report_string_pretty());
819                        continue;
820                    }
821                };
822
823                // Only generate stream_plan if it is specified in test case
824                if plan {
825                    *ret_plan_str = Some(explain_plan(&stream_plan));
826                }
827
828                // Only generate stream_dist_plan if it is specified in test case
829                if dist_plan {
830                    let graph = build_graph(stream_plan)?;
831                    *ret_dist_plan_str = Some(explain_stream_graph(&graph, false));
832                }
833            }
834        }
835
836        'sink: {
837            if self.expected_outputs.contains(&TestType::SinkPlan) {
838                let mut plan_root = plan_root.clone();
839                let sink_name = "sink_test";
840                let mut options = BTreeMap::new();
841                options.insert("connector".to_owned(), "blackhole".to_owned());
842                options.insert("type".to_owned(), "append-only".to_owned());
843                // let options = WithOptionsSecResolved::without_secrets(options);
844                let options = WithOptionsSecResolved::without_secrets(options);
845                let format_desc = (&options).try_into().unwrap();
846                match plan_root.gen_sink_plan(
847                    sink_name.to_owned(),
848                    format!("CREATE SINK {sink_name} AS {}", stmt),
849                    options,
850                    false,
851                    "test_db".into(),
852                    "test_table".into(),
853                    format_desc,
854                    false,
855                    None,
856                    None,
857                    false,
858                ) {
859                    Ok(sink_plan) => {
860                        ret.sink_plan = Some(explain_plan(&sink_plan.into()));
861                        break 'sink;
862                    }
863                    Err(err) => {
864                        ret.sink_error = Some(err.to_report_string_pretty());
865                        break 'sink;
866                    }
867                }
868            }
869        }
870
871        Ok(ret)
872    }
873}
874
/// Renders `plan` as text by delegating to its `explain_to_string` method.
fn explain_plan(plan: &PlanRef) -> String {
    plan.explain_to_string()
}
878
879/// Checks that the result matches `test_case.expected_outputs`.
880///
881/// We don't check the result matches here.
882fn check_result(test_case: &TestCase, actual: &TestCaseResult) -> Result<()> {
883    macro_rules! check {
884        ($field:ident) => {
885            paste::paste! {
886                let case_contains = test_case.expected_outputs.contains(&TestType:: [< $field:camel >]  );
887                let actual_contains = &actual.$field;
888                match (case_contains, actual_contains) {
889                    (false, None) | (true, Some(_)) => {},
890                    (false, Some(e)) => return Err(anyhow!("unexpected {}: {}", stringify!($field), e)),
891                    (true, None) => return Err(anyhow!(
892                        "expected {}, but there's no such result during execution",
893                        stringify!($field)
894                    )),
895                }
896            }
897        };
898    }
899
900    check!(binder_error);
901    check!(planner_error);
902    check!(optimizer_error);
903    check!(batch_error);
904    check!(batch_local_error);
905    check!(stream_error);
906    check!(eowc_stream_error);
907
908    check!(logical_plan);
909    check!(optimized_logical_plan_for_batch);
910    check!(optimized_logical_plan_for_stream);
911    check!(batch_plan);
912    check!(batch_local_plan);
913    check!(stream_plan);
914    check!(stream_dist_plan);
915    check!(eowc_stream_plan);
916    check!(eowc_stream_dist_plan);
917    check!(batch_plan_proto);
918    check!(sink_plan);
919
920    check!(explain_output);
921
922    Ok(())
923}
924
925/// `/tests/testdata` directory.
926pub fn test_data_dir() -> PathBuf {
927    std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
928        .join("tests")
929        .join("testdata")
930}
931
932pub async fn run_test_file(file_path: &Path, file_content: &str) -> Result<()> {
933    let file_name = file_path.file_name().unwrap().to_str().unwrap();
934    println!("-- running {file_name} --");
935
936    let mut failed_num = 0;
937    let cases: Vec<TestCase> = serde_yaml::from_str(file_content).map_err(|e| {
938        let context = if let Some(loc) = e.location() {
939            format!(
940                "failed to parse yaml at {}:{}:{}",
941                file_path.display(),
942                loc.line(),
943                loc.column()
944            )
945        } else {
946            "failed to parse yaml".to_owned()
947        };
948        anyhow::anyhow!(e).context(context)
949    })?;
950    let cases = resolve_testcase_id(cases).expect("failed to resolve");
951    let mut outputs = vec![];
952
953    for (i, c) in cases.into_iter().enumerate() {
954        println!(
955            "Running test #{i} (id: {}), SQL:\n{}",
956            c.id().clone().unwrap_or_else(|| "<none>".to_owned()),
957            c.sql()
958        );
959        match c.run(true).await {
960            Ok(case) => {
961                outputs.push(case);
962            }
963            Err(e) => {
964                eprintln!(
965                    "Test #{i} (id: {}) failed, SQL:\n{}\nError: {}",
966                    c.id().clone().unwrap_or_else(|| "<none>".to_owned()),
967                    c.sql(),
968                    e.as_report()
969                );
970                failed_num += 1;
971            }
972        }
973    }
974
975    let output_path = test_data_dir().join("output").join(file_name);
976    check(outputs, expect_test::expect_file![output_path]);
977
978    if failed_num > 0 {
979        println!("\n");
980        bail!(format!("{} test cases failed", failed_num));
981    }
982    Ok(())
983}