risingwave_planner_test/lib.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(let_chains)]
#![allow(clippy::derive_partial_eq_without_eq)]

//! Data-driven tests.
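//!
//! Test cases are described in YAML files under `tests/testdata` and deserialized into
//! [`TestCase`]. Each case is run against a fresh [`LocalFrontend`], the requested plans and
//! errors are collected into a [`TestCaseResult`], and the results are compared, via
//! `expect_test`, against the generated files under `tests/testdata/output`.
//!
//! For illustration only (the SQL and the selected outputs below are hypothetical), a case
//! might look like:
//!
//! ```yaml
//! - sql: |
//!     select * from t
//!   expected_outputs:
//!   - logical_plan
//!   - stream_plan
//! ```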

risingwave_expr_impl::enable!();

mod resolve_id;

use std::collections::{BTreeMap, HashSet};
use std::path::{Path, PathBuf};
use std::sync::Arc;

use anyhow::{Result, anyhow, bail};
pub use resolve_id::*;
use risingwave_frontend::handler::util::SourceSchemaCompatExt;
use risingwave_frontend::handler::{
    HandlerArgs, create_index, create_mv, create_schema, create_source, create_table, create_view,
    drop_table, explain, variable,
};
use risingwave_frontend::optimizer::backfill_order_strategy::explain_backfill_order_in_dot_format;
use risingwave_frontend::session::SessionImpl;
use risingwave_frontend::test_utils::{LocalFrontend, create_proto_file, get_explain_output};
use risingwave_frontend::{
    Binder, Explain, FrontendOpts, OptimizerContext, OptimizerContextRef, PlanRef, Planner,
    WithOptionsSecResolved, build_graph, explain_stream_graph,
};
use risingwave_sqlparser::ast::{
    AstOption, BackfillOrderStrategy, DropMode, EmitMode, ExplainOptions, ObjectName, Statement,
};
use risingwave_sqlparser::parser::Parser;
use serde::{Deserialize, Serialize};
use thiserror_ext::AsReport;

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Hash, Eq)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
pub enum TestType {
    /// The result of an `EXPLAIN` statement.
    ///
    /// This field is used when `sql` is an `EXPLAIN` statement.
    /// In this case, all other fields are invalid.
    ExplainOutput,

    /// The original logical plan
    LogicalPlan,
    /// Logical plan with optimization `.gen_optimized_logical_plan_for_batch()`
    OptimizedLogicalPlanForBatch,
    /// Logical plan with optimization `.gen_optimized_logical_plan_for_stream()`
    OptimizedLogicalPlanForStream,

    /// Distributed batch plan `.gen_batch_query_plan()`
    BatchPlan,
    /// Proto JSON of generated batch plan
    BatchPlanProto,
    /// Batch plan for local execution `.gen_batch_local_plan()`
    BatchLocalPlan,
    /// Batch plan for distributed execution `.gen_batch_distributed_plan()`
    BatchDistributedPlan,

    /// Create MV plan `.gen_create_mv_plan()`
    StreamPlan,
    /// Create MV fragments plan
    StreamDistPlan,
    /// Create MV plan with EOWC semantics `.gen_create_mv_plan(.., EmitMode::OnWindowClose)`
    EowcStreamPlan,
    /// Create MV fragments plan with EOWC semantics
    EowcStreamDistPlan,
    /// Create Backfill Order Plan
    BackfillOrderPlan,

    /// Create sink plan (assumes blackhole sink)
    /// TODO: Other sinks
    SinkPlan,

    BinderError,
    PlannerError,
    OptimizerError,
    BatchError,
    BatchLocalError,
    StreamError,
    EowcStreamError,
}

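/// Serializes the actual results to YAML and asserts that they match the expected output file,
/// prepending the "automatically generated" banner.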
pub fn check(actual: Vec<TestCaseResult>, expect: expect_test::ExpectFile) {
    let actual = serde_yaml::to_string(&actual).unwrap();
    expect.assert_eq(&format!("# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.\n{}", actual));
}

#[serde_with::skip_serializing_none]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct TestInput {
    /// Id of the test case, referenced by the `before` field of other test cases.
    pub id: Option<String>,
    /// A brief description of the test case.
    pub name: Option<String>,
    /// Ids of test cases that the test runner executes before running this case's SQL statements.
    pub before: Option<Vec<String>>,
    /// The resolved statements of the `before` ids.
    #[serde(skip_serializing)]
    before_statements: Option<Vec<String>>,
    /// The SQL statements
    pub sql: String,
}

#[serde_with::skip_serializing_none]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct TestCase {
    #[serde(flatten)]
    pub input: TestInput,

    // TODO: these should also be in TestInput, but it affects ordering. So next PR
    /// Support using file content or file location to create source.
    pub create_source: Option<CreateConnector>,
    /// Support using file content or file location to create table with connector.
    pub create_table_with_connector: Option<CreateConnector>,
    /// Provide config map to frontend
    pub with_config_map: Option<BTreeMap<String, String>>,

    /// Specify what output fields to check
    pub expected_outputs: HashSet<TestType>,
}

impl TestCase {
    pub fn id(&self) -> &Option<String> {
        &self.input.id
    }

    pub fn name(&self) -> &Option<String> {
        &self.input.name
    }

    pub fn before(&self) -> &Option<Vec<String>> {
        &self.input.before
    }

    pub fn before_statements(&self) -> &Option<Vec<String>> {
        &self.input.before_statements
    }

    pub fn sql(&self) -> &String {
        &self.input.sql
    }

    pub fn create_source(&self) -> &Option<CreateConnector> {
        &self.create_source
    }

    pub fn create_table_with_connector(&self) -> &Option<CreateConnector> {
        &self.create_table_with_connector
    }

    pub fn with_config_map(&self) -> &Option<BTreeMap<String, String>> {
        &self.with_config_map
    }
}

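/// Connector definition used by the `create_source` / `create_table_with_connector` fields of a
/// test case. Only a `file` carrying the schema content is handled by the test runner; it is
/// written to a temp proto file that the generated `schema.location` points at.
///
/// For illustration only (all field values below are hypothetical):
///
/// ```yaml
/// create_source:
///   format: plain
///   encode: protobuf
///   name: s
///   file: |
///     syntax = "proto3";
///     package test;
///     message TestRecord {
///       int32 id = 1;
///     }
/// ```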
#[serde_with::skip_serializing_none]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct CreateConnector {
    format: String,
    encode: String,
    name: String,
    file: Option<String>,
    is_table: Option<bool>,
}

#[serde_with::skip_serializing_none]
#[derive(Debug, PartialEq, Serialize, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct TestCaseResult {
    #[serde(flatten)]
    pub input: TestInput,

    /// The original logical plan
    pub logical_plan: Option<String>,

    /// Logical plan with optimization `.gen_optimized_logical_plan_for_batch()`
    pub optimized_logical_plan_for_batch: Option<String>,

    /// Logical plan with optimization `.gen_optimized_logical_plan_for_stream()`
    pub optimized_logical_plan_for_stream: Option<String>,

    /// Distributed batch plan `.gen_batch_query_plan()`
    pub batch_plan: Option<String>,

    /// Proto JSON of generated batch plan
    pub batch_plan_proto: Option<String>,

    /// Batch plan for local execution `.gen_batch_local_plan()`
    pub batch_local_plan: Option<String>,

    /// Batch plan for distributed execution `.gen_batch_distributed_plan()`
    pub batch_distributed_plan: Option<String>,

    /// Generate sink plan
    pub sink_plan: Option<String>,

    /// Create MV plan `.gen_create_mv_plan()`
    pub stream_plan: Option<String>,

    /// Create MV fragments plan
    pub stream_dist_plan: Option<String>,

    /// Create MV plan with EOWC semantics `.gen_create_mv_plan(.., EmitMode::OnWindowClose)`
    pub eowc_stream_plan: Option<String>,

    /// Create MV fragments plan with EOWC semantics
    pub eowc_stream_dist_plan: Option<String>,

    /// Create Backfill Order Plan
    pub backfill_order_plan: Option<String>,

    /// Error of binder
    pub binder_error: Option<String>,

    /// Error of planner
    pub planner_error: Option<String>,

    /// Error of optimizer
    pub optimizer_error: Option<String>,

    /// Error of `.gen_batch_query_plan()`
    pub batch_error: Option<String>,

    /// Error of `.gen_batch_local_plan()`
    pub batch_local_error: Option<String>,

    /// Error of `.gen_stream_plan()`
    pub stream_error: Option<String>,

    /// Error of `.gen_stream_plan()` with `emit_on_window_close = true`
    pub eowc_stream_error: Option<String>,

    /// Error of `.gen_sink_plan()`
    pub sink_error: Option<String>,

    /// The result of an `EXPLAIN` statement.
    ///
    /// This field is used when `sql` is an `EXPLAIN` statement.
    /// In this case, all other fields are invalid.
    pub explain_output: Option<String>,

    // TODO: these should also be in TestInput, but it affects ordering. So next PR
    /// Support using file content or file location to create source.
    pub create_source: Option<CreateConnector>,
    /// Support using file content or file location to create table with connector.
    pub create_table_with_connector: Option<CreateConnector>,
    /// Provide config map to frontend
    pub with_config_map: Option<BTreeMap<String, String>>,
}

impl TestCase {
    /// Run the test case, and return the expected output.
    pub async fn run(&self, do_check_result: bool) -> Result<TestCaseResult> {
        let session = {
            let frontend = LocalFrontend::new(FrontendOpts::default()).await;
            frontend.session_ref()
        };

        if let Some(config_map) = self.with_config_map() {
            for (key, val) in config_map {
                session.set_config(key, val.to_owned()).unwrap();
            }
        }

        let placeholder_empty_vec = vec![];

        // Since the temp file will be deleted when it goes out of scope, create the source in advance.
        self.do_create_source(session.clone()).await?;
        self.do_create_table_with_connector(session.clone()).await?;

        let mut result: Option<TestCaseResult> = None;
        for sql in self
            .before_statements()
            .as_ref()
            .unwrap_or(&placeholder_empty_vec)
            .iter()
            .chain(std::iter::once(self.sql()))
        {
            result = self
                .run_sql(
                    Arc::from(sql.to_owned()),
                    session.clone(),
                    do_check_result,
                    result,
                )
                .await?;
        }

        let mut result = result.unwrap_or_default();
        result.input = self.input.clone();
        result.create_source.clone_from(self.create_source());
        result
            .create_table_with_connector
            .clone_from(self.create_table_with_connector());
        result.with_config_map.clone_from(self.with_config_map());

        Ok(result)
    }

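    /// Builds the SQL prefix used by `do_create_source` and `do_create_table_with_connector`.
    /// For illustration only (the name, format and encode values are hypothetical), with
    /// `is_table = false`, name `s`, format `PLAIN` and encode `PROTOBUF` it returns
    ///
    /// ```text
    /// CREATE SOURCE s
    ///     WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
    ///     FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://
    /// ```
    ///
    /// to which the caller appends the temp proto file path and a closing `')`.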
    #[inline(always)]
    fn create_connector_sql(
        is_table: bool,
        connector_name: String,
        connector_format: String,
        connector_encode: String,
    ) -> String {
        let object_to_create = if is_table { "TABLE" } else { "SOURCE" };
        format!(
            r#"CREATE {} {}
    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
    FORMAT {} ENCODE {} (message = '.test.TestRecord', schema.location = 'file://"#,
            object_to_create, connector_name, connector_format, connector_encode
        )
    }

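    // If the test case has create-table-with-connector info, run SQL to create the table.
    // The table is created from the file content carried in the test case (written to a temp proto file).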
    async fn do_create_table_with_connector(
        &self,
        session: Arc<SessionImpl>,
    ) -> Result<Option<TestCaseResult>> {
        match self.create_table_with_connector().clone() {
            Some(connector) => {
                if let Some(content) = connector.file {
                    let sql = Self::create_connector_sql(
                        true,
                        connector.name,
                        connector.format,
                        connector.encode,
                    );
                    let temp_file = create_proto_file(content.as_str());
                    self.run_sql(
                        Arc::from(sql + temp_file.path().to_str().unwrap() + "')"),
                        session.clone(),
                        false,
                        None,
                    )
                    .await
                } else {
                    panic!(
                        "{:?} create table with connector must include `file` for the file content",
                        self.id()
                    );
                }
            }
            None => Ok(None),
        }
    }

    // If the test case has create-source info, run SQL to create the source.
    // The source is created from the file content carried in the test case (written to a temp proto file).
    async fn do_create_source(&self, session: Arc<SessionImpl>) -> Result<Option<TestCaseResult>> {
        match self.create_source().clone() {
            Some(source) => {
                if let Some(content) = source.file {
                    let sql = Self::create_connector_sql(
                        false,
                        source.name,
                        source.format,
                        source.encode,
                    );
                    let temp_file = create_proto_file(content.as_str());
                    self.run_sql(
                        Arc::from(sql + temp_file.path().to_str().unwrap() + "')"),
                        session.clone(),
                        false,
                        None,
                    )
                    .await
                } else {
                    panic!(
                        "{:?} create source must include `file` for the file content",
                        self.id()
                    );
                }
            }
            None => Ok(None),
        }
    }

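    /// Parses `sql` and executes each statement against the test session. Query and `EXPLAIN`
    /// statements produce a [`TestCaseResult`] (as does a failing `CREATE SOURCE`); the other
    /// supported statements are executed only for their side effects on the catalog and session.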
    async fn run_sql(
        &self,
        sql: Arc<str>,
        session: Arc<SessionImpl>,
        do_check_result: bool,
        mut result: Option<TestCaseResult>,
    ) -> Result<Option<TestCaseResult>> {
        let statements = Parser::parse_sql(&sql).unwrap();
        for stmt in statements {
            // TODO: `sql` may contain multiple statements here.
            let handler_args = HandlerArgs::new(session.clone(), &stmt, sql.clone())?;
            let _guard = session.txn_begin_implicit();
            match stmt.clone() {
                Statement::Query(_)
                | Statement::Insert { .. }
                | Statement::Delete { .. }
                | Statement::Update { .. } => {
                    if result.is_some() {
                        panic!("two queries in one test case");
                    }
                    let explain_options = ExplainOptions {
                        verbose: true,
                        ..Default::default()
                    };
                    let context = OptimizerContext::new(
                        HandlerArgs::new(session.clone(), &stmt, sql.clone())?,
                        explain_options,
                    );
                    let ret = self.apply_query(&stmt, context.into())?;
                    if do_check_result {
                        check_result(self, &ret)?;
                    }
                    result = Some(ret);
                }
                Statement::CreateTable {
                    name,
                    columns,
                    constraints,
                    if_not_exists,
                    format_encode,
                    source_watermarks,
                    append_only,
                    on_conflict,
                    with_version_column,
                    cdc_table_info,
                    include_column_options,
                    wildcard_idx,
                    webhook_info,
                    engine,
                    ..
                } => {
                    let format_encode = format_encode.map(|schema| schema.into_v2_with_warning());

                    create_table::handle_create_table(
                        handler_args,
                        name,
                        columns,
                        wildcard_idx,
                        constraints,
                        if_not_exists,
                        format_encode,
                        source_watermarks,
                        append_only,
                        on_conflict,
                        with_version_column.map(|x| x.real_value()),
                        cdc_table_info,
                        include_column_options,
                        webhook_info,
                        engine,
                    )
                    .await?;
                }
                Statement::CreateSource { stmt } => {
                    if let Err(error) =
                        create_source::handle_create_source(handler_args, stmt).await
                    {
                        let actual_result = TestCaseResult {
                            planner_error: Some(error.to_report_string()),
                            ..Default::default()
                        };

                        check_result(self, &actual_result)?;
                        result = Some(actual_result);
                    }
                }
                Statement::CreateIndex {
                    name,
                    table_name,
                    columns,
                    include,
                    distributed_by,
                    if_not_exists,
                    // TODO: support unique and if_not_exist in planner test
                    ..
                } => {
                    create_index::handle_create_index(
                        handler_args,
                        if_not_exists,
                        name,
                        table_name,
                        columns,
                        include,
                        distributed_by,
                    )
                    .await?;
                }
                Statement::CreateView {
                    materialized: true,
                    or_replace: false,
                    if_not_exists,
                    name,
                    query,
                    columns,
                    emit_mode,
                    ..
                } => {
                    create_mv::handle_create_mv(
                        handler_args,
                        if_not_exists,
                        name,
                        *query,
                        columns,
                        emit_mode,
                    )
                    .await?;
                }
                Statement::CreateView {
                    materialized: false,
                    or_replace: false,
                    if_not_exists,
                    name,
                    query,
                    columns,
                    ..
                } => {
                    create_view::handle_create_view(
                        handler_args,
                        if_not_exists,
                        name,
                        columns,
                        *query,
                    )
                    .await?;
                }
                Statement::Drop(drop_statement) => {
                    drop_table::handle_drop_table(
                        handler_args,
                        drop_statement.object_name,
                        drop_statement.if_exists,
                        matches!(drop_statement.drop_mode, AstOption::Some(DropMode::Cascade)),
                    )
                    .await?;
                }
                Statement::SetVariable {
                    local: _,
                    variable,
                    value,
                } => {
                    variable::handle_set(handler_args, variable, value).unwrap();
                }
                Statement::Explain {
                    analyze,
                    statement,
                    options,
                } => {
                    if result.is_some() {
                        panic!("two queries in one test case");
                    }
                    let rsp =
                        explain::handle_explain(handler_args, *statement, options, analyze).await?;

                    let explain_output = get_explain_output(rsp).await;
                    let ret = TestCaseResult {
                        explain_output: Some(explain_output),
                        ..Default::default()
                    };
                    if do_check_result {
                        check_result(self, &ret)?;
                    }
                    result = Some(ret);
                }
                Statement::CreateSchema {
                    schema_name,
                    if_not_exists,
                    owner,
                } => {
                    create_schema::handle_create_schema(
                        handler_args,
                        schema_name,
                        if_not_exists,
                        owner,
                    )
                    .await?;
                }
                _ => return Err(anyhow!("Unsupported statement type")),
            }
        }
        Ok(result)
    }

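    /// Binds and plans a single query statement, generating only the plans (or errors) that
    /// `expected_outputs` asks for: logical, optimized logical, batch (distributed/local),
    /// stream (for both emit modes), backfill order, and sink.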
    fn apply_query(
        &self,
        stmt: &Statement,
        context: OptimizerContextRef,
    ) -> Result<TestCaseResult> {
        let session = context.session_ctx().clone();
        let mut ret = TestCaseResult::default();

        let bound = {
            let mut binder = Binder::new(&session);
            match binder.bind(stmt.clone()) {
                Ok(bound) => bound,
                Err(err) => {
                    ret.binder_error = Some(err.to_report_string_pretty());
                    return Ok(ret);
                }
            }
        };

        let mut planner = Planner::new_for_stream(context.clone());

        let plan_root = match planner.plan(bound) {
            Ok(plan_root) => {
                if self.expected_outputs.contains(&TestType::LogicalPlan) {
                    ret.logical_plan =
                        Some(explain_plan(&plan_root.clone().into_unordered_subplan()));
                }
                plan_root
            }
            Err(err) => {
                ret.planner_error = Some(err.to_report_string_pretty());
                return Ok(ret);
            }
        };

        if self
            .expected_outputs
            .contains(&TestType::OptimizedLogicalPlanForBatch)
            || self.expected_outputs.contains(&TestType::OptimizerError)
        {
            let plan_root = plan_root.clone();
            let optimized_logical_plan_for_batch =
                match plan_root.gen_optimized_logical_plan_for_batch() {
                    Ok(optimized_logical_plan_for_batch) => optimized_logical_plan_for_batch,
                    Err(err) => {
                        ret.optimizer_error = Some(err.to_report_string_pretty());
                        return Ok(ret);
                    }
                };

            // Only generate optimized_logical_plan_for_batch if it is specified in test case
            if self
                .expected_outputs
                .contains(&TestType::OptimizedLogicalPlanForBatch)
            {
                ret.optimized_logical_plan_for_batch =
                    Some(explain_plan(&optimized_logical_plan_for_batch.plan));
            }
        }

        if self
            .expected_outputs
            .contains(&TestType::OptimizedLogicalPlanForStream)
            || self.expected_outputs.contains(&TestType::OptimizerError)
        {
            let plan_root = plan_root.clone();
            let optimized_logical_plan_for_stream =
                match plan_root.gen_optimized_logical_plan_for_stream() {
                    Ok(optimized_logical_plan_for_stream) => optimized_logical_plan_for_stream,
                    Err(err) => {
                        ret.optimizer_error = Some(err.to_report_string_pretty());
                        return Ok(ret);
                    }
                };

            // Only generate optimized_logical_plan_for_stream if it is specified in test case
            if self
                .expected_outputs
                .contains(&TestType::OptimizedLogicalPlanForStream)
            {
                ret.optimized_logical_plan_for_stream =
                    Some(explain_plan(&optimized_logical_plan_for_stream.plan));
            }
        }

        'batch: {
            if self.expected_outputs.contains(&TestType::BatchPlan)
                || self.expected_outputs.contains(&TestType::BatchPlanProto)
                || self.expected_outputs.contains(&TestType::BatchError)
            {
                let plan_root = plan_root.clone();
                let batch_plan = match plan_root.gen_batch_plan() {
                    Ok(batch_plan) => match batch_plan.gen_batch_distributed_plan() {
                        Ok(batch_plan) => batch_plan,
                        Err(err) => {
                            ret.batch_error = Some(err.to_report_string_pretty());
                            break 'batch;
                        }
                    },
                    Err(err) => {
                        ret.batch_error = Some(err.to_report_string_pretty());
                        break 'batch;
                    }
                };

                // Only generate batch_plan if it is specified in test case
                if self.expected_outputs.contains(&TestType::BatchPlan) {
                    ret.batch_plan = Some(explain_plan(&batch_plan));
                }

                // Only generate batch_plan_proto if it is specified in test case
                if self.expected_outputs.contains(&TestType::BatchPlanProto) {
                    ret.batch_plan_proto = Some(serde_yaml::to_string(
                        &batch_plan.to_batch_prost_identity(false)?,
                    )?);
                }
            }
        }

        'local_batch: {
            if self.expected_outputs.contains(&TestType::BatchLocalPlan)
                || self.expected_outputs.contains(&TestType::BatchError)
            {
                let plan_root = plan_root.clone();
                let batch_plan = match plan_root.gen_batch_plan() {
                    Ok(batch_plan) => match batch_plan.gen_batch_local_plan() {
                        Ok(batch_plan) => batch_plan,
                        Err(err) => {
                            ret.batch_error = Some(err.to_report_string_pretty());
                            break 'local_batch;
                        }
                    },
                    Err(err) => {
                        ret.batch_error = Some(err.to_report_string_pretty());
                        break 'local_batch;
                    }
                };

                // Only generate batch_local_plan if it is specified in test case
                if self.expected_outputs.contains(&TestType::BatchLocalPlan) {
                    ret.batch_local_plan = Some(explain_plan(&batch_plan));
                }
            }
        }

        'distributed_batch: {
            if self
                .expected_outputs
                .contains(&TestType::BatchDistributedPlan)
                || self.expected_outputs.contains(&TestType::BatchError)
            {
                let plan_root = plan_root.clone();
                let batch_plan = match plan_root.gen_batch_plan() {
                    Ok(batch_plan) => match batch_plan.gen_batch_distributed_plan() {
                        Ok(batch_plan) => batch_plan,
                        Err(err) => {
                            ret.batch_error = Some(err.to_report_string_pretty());
                            break 'distributed_batch;
                        }
                    },
                    Err(err) => {
                        ret.batch_error = Some(err.to_report_string_pretty());
                        break 'distributed_batch;
                    }
                };

                // Only generate batch_distributed_plan if it is specified in test case
                if self
                    .expected_outputs
                    .contains(&TestType::BatchDistributedPlan)
                {
                    ret.batch_distributed_plan = Some(explain_plan(&batch_plan));
                }
            }
        }

        {
            // stream
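            // For each emit mode, look up in `expected_outputs` whether the plan, the dist
            // (fragment) plan, and/or the error is requested, together with the `ret` fields
            // that each of them should be written to.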
            for (
                emit_mode,
                plan,
                ret_plan_str,
                dist_plan,
                ret_dist_plan_str,
                error,
                ret_error_str,
            ) in [
                (
                    EmitMode::Immediately,
                    self.expected_outputs.contains(&TestType::StreamPlan),
                    &mut ret.stream_plan,
                    self.expected_outputs.contains(&TestType::StreamDistPlan),
                    &mut ret.stream_dist_plan,
                    self.expected_outputs.contains(&TestType::StreamError),
                    &mut ret.stream_error,
                ),
                (
                    EmitMode::OnWindowClose,
                    self.expected_outputs.contains(&TestType::EowcStreamPlan),
                    &mut ret.eowc_stream_plan,
                    self.expected_outputs
                        .contains(&TestType::EowcStreamDistPlan),
                    &mut ret.eowc_stream_dist_plan,
                    self.expected_outputs.contains(&TestType::EowcStreamError),
                    &mut ret.eowc_stream_error,
                ),
            ] {
                if !plan && !dist_plan && !error {
                    continue;
                }

                let q = if let Statement::Query(q) = stmt {
                    q.as_ref().clone()
                } else {
                    return Err(anyhow!("expect a query"));
                };

                let (stream_plan, table) = match create_mv::gen_create_mv_plan(
                    &session,
                    context.clone(),
                    q,
                    ObjectName(vec!["test".into()]),
                    vec![],
                    Some(emit_mode),
                ) {
                    Ok(r) => r,
                    Err(err) => {
                        *ret_error_str = Some(err.to_report_string_pretty());
                        continue;
                    }
                };

                // Only generate stream_plan if it is specified in test case
                if plan {
                    *ret_plan_str = Some(explain_plan(&stream_plan));
                }

                // Only generate stream_dist_plan if it is specified in test case
                if dist_plan {
                    let graph = build_graph(stream_plan.clone(), None)?;
                    *ret_dist_plan_str =
                        Some(explain_stream_graph(&graph, Some(table.to_prost()), false));
                }

                if self.expected_outputs.contains(&TestType::BackfillOrderPlan) {
                    match explain_backfill_order_in_dot_format(
                        &session,
                        BackfillOrderStrategy::Auto,
                        stream_plan,
                    ) {
                        Ok(formatted_order_plan) => {
                            ret.backfill_order_plan = Some(formatted_order_plan);
                        }
                        Err(err) => {
                            *ret_error_str = Some(err.to_report_string_pretty());
                        }
                    }
                }
            }
        }

        'sink: {
            if self.expected_outputs.contains(&TestType::SinkPlan) {
                let plan_root = plan_root.clone();
                let sink_name = "sink_test";
                let mut options = BTreeMap::new();
                options.insert("connector".to_owned(), "blackhole".to_owned());
                options.insert("type".to_owned(), "append-only".to_owned());
                let options = WithOptionsSecResolved::without_secrets(options);
                let format_desc = (&options).try_into().unwrap();
                match plan_root.gen_sink_plan(
                    sink_name.to_owned(),
                    format!("CREATE SINK {sink_name} AS {}", stmt),
                    options,
                    false,
                    "test_db".into(),
                    "test_table".into(),
                    format_desc,
                    false,
                    None,
                    None,
                    false,
                ) {
                    Ok(sink_plan) => {
                        ret.sink_plan = Some(explain_plan(&sink_plan.into()));
                        break 'sink;
                    }
                    Err(err) => {
                        ret.sink_error = Some(err.to_report_string_pretty());
                        break 'sink;
                    }
                }
            }
        }

        Ok(ret)
    }
}

fn explain_plan(plan: &PlanRef) -> String {
    plan.explain_to_string()
}

/// Checks that the set of outputs actually produced matches `test_case.expected_outputs`:
/// every expected field must be present, and no unexpected field may be present.
///
/// The contents of the outputs are not compared here; that is done against the expected
/// output files via [`check`].
fn check_result(test_case: &TestCase, actual: &TestCaseResult) -> Result<()> {
    macro_rules! check {
        ($field:ident) => {
            paste::paste! {
                let case_contains = test_case.expected_outputs.contains(&TestType:: [< $field:camel >]  );
                let actual_contains = &actual.$field;
                match (case_contains, actual_contains) {
                    (false, None) | (true, Some(_)) => {},
                    (false, Some(e)) => return Err(anyhow!("unexpected {}: {}", stringify!($field), e)),
                    (true, None) => return Err(anyhow!(
                        "expected {}, but there's no such result during execution",
                        stringify!($field)
                    )),
                }
            }
        };
    }

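    // For example, `check!(batch_plan)` verifies that `TestType::BatchPlan` in
    // `expected_outputs` and `actual.batch_plan` are either both present or both absent.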
    check!(binder_error);
    check!(planner_error);
    check!(optimizer_error);
    check!(batch_error);
    check!(batch_local_error);
    check!(stream_error);
    check!(eowc_stream_error);

    check!(logical_plan);
    check!(optimized_logical_plan_for_batch);
    check!(optimized_logical_plan_for_stream);
    check!(batch_plan);
    check!(batch_local_plan);
    check!(stream_plan);
    check!(stream_dist_plan);
    check!(eowc_stream_plan);
    check!(eowc_stream_dist_plan);
    check!(batch_plan_proto);
    check!(sink_plan);

    check!(explain_output);

    Ok(())
}

/// `/tests/testdata` directory.
pub fn test_data_dir() -> PathBuf {
    std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
        .join("tests")
        .join("testdata")
}

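/// Runs all test cases parsed from `file_content` (a planner test YAML file) and checks the
/// collected results against the snapshot under `tests/testdata/output/<file_name>`.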
pub async fn run_test_file(file_path: &Path, file_content: &str) -> Result<()> {
    let file_name = file_path.file_name().unwrap().to_str().unwrap();
    println!("-- running {file_name} --");

    let mut failed_num = 0;
    let cases: Vec<TestCase> = serde_yaml::from_str(file_content).map_err(|e| {
        let context = if let Some(loc) = e.location() {
            format!(
                "failed to parse yaml at {}:{}:{}",
                file_path.display(),
                loc.line(),
                loc.column()
            )
        } else {
            "failed to parse yaml".to_owned()
        };
        anyhow::anyhow!(e).context(context)
    })?;
    let cases = resolve_testcase_id(cases).expect("failed to resolve");
    let mut outputs = vec![];

    for (i, c) in cases.into_iter().enumerate() {
        println!(
            "Running test #{i} (id: {}), SQL:\n{}",
            c.id().clone().unwrap_or_else(|| "<none>".to_owned()),
            c.sql()
        );
        match c.run(true).await {
            Ok(case) => {
                outputs.push(case);
            }
            Err(e) => {
                eprintln!(
                    "Test #{i} (id: {}) failed, SQL:\n{}\nError: {}",
                    c.id().clone().unwrap_or_else(|| "<none>".to_owned()),
                    c.sql(),
                    e.as_report()
                );
                failed_num += 1;
            }
        }
    }

    let output_path = test_data_dir().join("output").join(file_name);
    check(outputs, expect_test::expect_file![output_path]);

    if failed_num > 0 {
        println!("\n");
        bail!(format!("{} test cases failed", failed_num));
    }
    Ok(())
}