risingwave_planner_test/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![allow(clippy::derive_partial_eq_without_eq)]
16
17//! Data-driven tests.
18
19risingwave_expr_impl::enable!();
20
21mod resolve_id;
22
23use std::collections::{BTreeMap, HashSet};
24use std::path::{Path, PathBuf};
25use std::sync::Arc;
26
27use anyhow::{Result, anyhow, bail};
28pub use resolve_id::*;
29use risingwave_frontend::handler::util::SourceSchemaCompatExt;
30use risingwave_frontend::handler::{
31    HandlerArgs, create_index, create_mv, create_schema, create_source, create_table, create_view,
32    drop_table, explain, variable,
33};
34use risingwave_frontend::optimizer::backfill_order_strategy::explain_backfill_order_in_dot_format;
35use risingwave_frontend::optimizer::plan_node::ConventionMarker;
36use risingwave_frontend::session::SessionImpl;
37use risingwave_frontend::test_utils::{LocalFrontend, create_proto_file, get_explain_output};
38use risingwave_frontend::{
39    Binder, Explain, FrontendOpts, OptimizerContext, OptimizerContextRef, PlanRef, Planner,
40    WithOptionsSecResolved, build_graph, explain_stream_graph,
41};
42use risingwave_sqlparser::ast::{
43    AstOption, BackfillOrderStrategy, DropMode, EmitMode, ExplainOptions, ObjectName, Statement,
44};
45use risingwave_sqlparser::parser::Parser;
46use serde::{Deserialize, Serialize};
47use thiserror_ext::AsReport;
48
49#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Hash, Eq)]
50#[serde(deny_unknown_fields, rename_all = "snake_case")]
51pub enum TestType {
52    /// The result of an `EXPLAIN` statement.
53    ///
54    /// This field is used when `sql` is an `EXPLAIN` statement.
55    /// In this case, all other fields are invalid.
56    ExplainOutput,
57
58    /// The original logical plan
59    LogicalPlan,
60    /// Logical plan with optimization `.gen_optimized_logical_plan_for_batch()`
61    OptimizedLogicalPlanForBatch,
62    /// Logical plan with optimization `.gen_optimized_logical_plan_for_stream()`
63    OptimizedLogicalPlanForStream,
64
65    /// Distributed batch plan `.gen_batch_query_plan()`
66    BatchPlan,
67    /// Proto JSON of generated batch plan
68    BatchPlanProto,
69    /// Batch plan for local execution `.gen_batch_local_plan()`
70    BatchLocalPlan,
71    /// Batch plan for local execution `.gen_batch_distributed_plan()`
72    BatchDistributedPlan,
73
74    /// Create MV plan `.gen_create_mv_plan()`
75    StreamPlan,
76    /// Create MV fragments plan
77    StreamDistPlan,
78    /// Create MV plan with EOWC semantics `.gen_create_mv_plan(.., EmitMode::OnWindowClose)`
79    EowcStreamPlan,
80    /// Create MV fragments plan with EOWC semantics
81    EowcStreamDistPlan,
82    /// Create Backfill Order Plan
83    BackfillOrderPlan,
84
85    /// Create sink plan (assumes blackhole sink)
86    /// TODO: Other sinks
87    SinkPlan,
88
89    BinderError,
90    PlannerError,
91    OptimizerError,
92    BatchError,
93    BatchLocalError,
94    StreamError,
95    EowcStreamError,
96}
97
98pub fn check(actual: Vec<TestCaseResult>, expect: expect_test::ExpectFile) {
99    let actual = serde_yaml::to_string(&actual).unwrap();
100    expect.assert_eq(&format!("# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.\n{}",actual));
101}
102
/// The input part of a planner test case.
///
/// This struct is `#[serde(flatten)]`-ed into both `TestCase` and `TestCaseResult`, so its keys
/// appear at the top level of each YAML entry.
#[serde_with::skip_serializing_none]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct TestInput {
    /// Id of the test case, which other test cases can reference in their `before` list.
    pub id: Option<String>,
    /// A brief description of the test case.
    pub name: Option<String>,
    /// Ids of test cases whose SQL statements the runner executes before this test case's `sql`.
    pub before: Option<Vec<String>>,
    /// The SQL statements resolved from the `before` ids, run in order before `sql`.
    ///
    /// Skipped when serializing. NOTE(review): presumably populated by the `resolve_id`
    /// module — confirm.
    #[serde(skip_serializing)]
    before_statements: Option<Vec<String>>,
    /// The SQL statements under test.
    pub sql: String,
}
119
/// A single planner test case: its input, optional setup, and the outputs the runner should
/// generate and check.
#[serde_with::skip_serializing_none]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct TestCase {
    /// The id, name, `before` list and SQL of the test case (flattened into this struct).
    #[serde(flatten)]
    pub input: TestInput,

    // TODO: these should also be in TestInput, but moving them would change the serialized field
    // order. Deferred to a follow-up PR.
    /// A source to create before running the SQL. Its `file` content is written to a temporary
    /// proto file that serves as the schema location; `file` is required.
    pub create_source: Option<CreateConnector>,
    /// A table with a connector to create before running the SQL. Its `file` content is written
    /// to a temporary proto file that serves as the schema location; `file` is required.
    pub create_table_with_connector: Option<CreateConnector>,
    /// Session config entries applied to the frontend session before any SQL runs.
    pub with_config_map: Option<BTreeMap<String, String>>,

    /// Which outputs the runner should generate and check.
    pub expected_outputs: HashSet<TestType>,
}
138
139impl TestCase {
140    pub fn id(&self) -> &Option<String> {
141        &self.input.id
142    }
143
144    pub fn name(&self) -> &Option<String> {
145        &self.input.name
146    }
147
148    pub fn before(&self) -> &Option<Vec<String>> {
149        &self.input.before
150    }
151
152    pub fn before_statements(&self) -> &Option<Vec<String>> {
153        &self.input.before_statements
154    }
155
156    pub fn sql(&self) -> &String {
157        &self.input.sql
158    }
159
160    pub fn create_source(&self) -> &Option<CreateConnector> {
161        &self.create_source
162    }
163
164    pub fn create_table_with_connector(&self) -> &Option<CreateConnector> {
165        &self.create_table_with_connector
166    }
167
168    pub fn with_config_map(&self) -> &Option<BTreeMap<String, String>> {
169        &self.with_config_map
170    }
171}
172
/// Describes a Kafka-connector-backed source or table that a test case creates before its SQL
/// runs.
#[serde_with::skip_serializing_none]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct CreateConnector {
    /// Inserted after `FORMAT` in the generated `CREATE` statement.
    format: String,
    /// Inserted after `ENCODE` in the generated `CREATE` statement.
    encode: String,
    /// Name of the source or table to create.
    name: String,
    /// Proto schema content. It is written to a temporary file whose path becomes the
    /// `schema.location`. Required: the runner panics if it is missing.
    file: Option<String>,
    /// NOTE(review): not read by the runner code in this file. Whether a TABLE or a SOURCE is
    /// created depends on which test-case field holds this value (`create_source` vs
    /// `create_table_with_connector`). Confirm before relying on this flag.
    is_table: Option<bool>,
}
183
184#[serde_with::skip_serializing_none]
185#[derive(Debug, PartialEq, Serialize, Deserialize, Default)]
186#[serde(deny_unknown_fields)]
187pub struct TestCaseResult {
188    #[serde(flatten)]
189    pub input: TestInput,
190
191    /// The original logical plan
192    pub logical_plan: Option<String>,
193
194    /// Logical plan with optimization `.gen_optimized_logical_plan_for_batch()`
195    pub optimized_logical_plan_for_batch: Option<String>,
196
197    /// Logical plan with optimization `.gen_optimized_logical_plan_for_stream()`
198    pub optimized_logical_plan_for_stream: Option<String>,
199
200    /// Distributed batch plan `.gen_batch_query_plan()`
201    pub batch_plan: Option<String>,
202
203    /// Proto JSON of generated batch plan
204    pub batch_plan_proto: Option<String>,
205
206    /// Batch plan for local execution `.gen_batch_local_plan()`
207    pub batch_local_plan: Option<String>,
208
209    /// Batch plan for distributed execution `.gen_batch_distributed_plan()`
210    pub batch_distributed_plan: Option<String>,
211
212    /// Generate sink plan
213    pub sink_plan: Option<String>,
214
215    /// Create MV plan `.gen_create_mv_plan()`
216    pub stream_plan: Option<String>,
217
218    /// Create MV fragments plan
219    pub stream_dist_plan: Option<String>,
220
221    /// Create MV plan with EOWC semantics `.gen_create_mv_plan(.., EmitMode::OnWindowClose)`
222    pub eowc_stream_plan: Option<String>,
223
224    /// Create MV fragments plan with EOWC semantics
225    pub eowc_stream_dist_plan: Option<String>,
226
227    /// Create Backfill Order Plan
228    pub backfill_order_plan: Option<String>,
229
230    /// Error of binder
231    pub binder_error: Option<String>,
232
233    /// Error of planner
234    pub planner_error: Option<String>,
235
236    /// Error of optimizer
237    pub optimizer_error: Option<String>,
238
239    /// Error of `.gen_batch_query_plan()`
240    pub batch_error: Option<String>,
241
242    /// Error of `.gen_batch_local_plan()`
243    pub batch_local_error: Option<String>,
244
245    /// Error of `.gen_stream_plan()`
246    pub stream_error: Option<String>,
247
248    /// Error of `.gen_stream_plan()` with `emit_on_window_close = true`
249    pub eowc_stream_error: Option<String>,
250
251    /// Error of `.gen_sink_plan()`
252    pub sink_error: Option<String>,
253
254    /// The result of an `EXPLAIN` statement.
255    ///
256    /// This field is used when `sql` is an `EXPLAIN` statement.
257    /// In this case, all other fields are invalid.
258    pub explain_output: Option<String>,
259
260    // TODO: these should also be in TestInput, but it affects ordering. So next PR
261    /// Support using file content or file location to create source.
262    pub create_source: Option<CreateConnector>,
263    /// Support using file content or file location to create table with connector.
264    pub create_table_with_connector: Option<CreateConnector>,
265    /// Provide config map to frontend
266    pub with_config_map: Option<BTreeMap<String, String>>,
267}
268
269impl TestCase {
270    /// Run the test case, and return the expected output.
271    pub async fn run(&self, do_check_result: bool) -> Result<TestCaseResult> {
272        let session = {
273            let frontend = LocalFrontend::new(FrontendOpts::default()).await;
274            frontend.session_ref()
275        };
276
277        if let Some(config_map) = self.with_config_map() {
278            for (key, val) in config_map {
279                session.set_config(key, val.to_owned()).unwrap();
280            }
281        }
282
283        let placeholder_empty_vec = vec![];
284
285        // Since temp file will be deleted when it goes out of scope, so create source in advance.
286        self.do_create_source(session.clone()).await?;
287        self.do_create_table_with_connector(session.clone()).await?;
288
289        let mut result: Option<TestCaseResult> = None;
290        for sql in self
291            .before_statements()
292            .as_ref()
293            .unwrap_or(&placeholder_empty_vec)
294            .iter()
295            .chain(std::iter::once(self.sql()))
296        {
297            result = self
298                .run_sql(
299                    Arc::from(sql.to_owned()),
300                    session.clone(),
301                    do_check_result,
302                    result,
303                )
304                .await?;
305        }
306
307        let mut result = result.unwrap_or_default();
308        result.input = self.input.clone();
309        result.create_source.clone_from(self.create_source());
310        result
311            .create_table_with_connector
312            .clone_from(self.create_table_with_connector());
313        result.with_config_map.clone_from(self.with_config_map());
314
315        Ok(result)
316    }
317
318    #[inline(always)]
319    fn create_connector_sql(
320        is_table: bool,
321        connector_name: String,
322        connector_format: String,
323        connector_encode: String,
324    ) -> String {
325        let object_to_create = if is_table { "TABLE" } else { "SOURCE" };
326        format!(
327            r#"CREATE {} {}
328    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
329    FORMAT {} ENCODE {} (message = '.test.TestRecord', schema.location = 'file://"#,
330            object_to_create, connector_name, connector_format, connector_encode
331        )
332    }
333
334    async fn do_create_table_with_connector(
335        &self,
336        session: Arc<SessionImpl>,
337    ) -> Result<Option<TestCaseResult>> {
338        match self.create_table_with_connector().clone() {
339            Some(connector) => {
340                if let Some(content) = connector.file {
341                    let sql = Self::create_connector_sql(
342                        true,
343                        connector.name,
344                        connector.format,
345                        connector.encode,
346                    );
347                    let temp_file = create_proto_file(content.as_str());
348                    self.run_sql(
349                        Arc::from(sql + temp_file.path().to_str().unwrap() + "')"),
350                        session.clone(),
351                        false,
352                        None,
353                    )
354                    .await
355                } else {
356                    panic!(
357                        "{:?} create table with connector must include `file` for the file content",
358                        self.id()
359                    );
360                }
361            }
362            None => Ok(None),
363        }
364    }
365
366    // If testcase have create source info, run sql to create source.
367    // Support create source by file content or file location.
368    async fn do_create_source(&self, session: Arc<SessionImpl>) -> Result<Option<TestCaseResult>> {
369        match self.create_source().clone() {
370            Some(source) => {
371                if let Some(content) = source.file {
372                    let sql = Self::create_connector_sql(
373                        false,
374                        source.name,
375                        source.format,
376                        source.encode,
377                    );
378                    let temp_file = create_proto_file(content.as_str());
379                    self.run_sql(
380                        Arc::from(sql + temp_file.path().to_str().unwrap() + "')"),
381                        session.clone(),
382                        false,
383                        None,
384                    )
385                    .await
386                } else {
387                    panic!(
388                        "{:?} create source must include `file` for the file content",
389                        self.id()
390                    );
391                }
392            }
393            None => Ok(None),
394        }
395    }
396
397    async fn run_sql(
398        &self,
399        sql: Arc<str>,
400        session: Arc<SessionImpl>,
401        do_check_result: bool,
402        mut result: Option<TestCaseResult>,
403    ) -> Result<Option<TestCaseResult>> {
404        let statements = Parser::parse_sql(&sql).unwrap();
405        for stmt in statements {
406            // TODO: `sql` may contain multiple statements here.
407            let handler_args = HandlerArgs::new(session.clone(), &stmt, sql.clone())?;
408            let _guard = session.txn_begin_implicit();
409            match stmt.clone() {
410                Statement::Query(_)
411                | Statement::Insert { .. }
412                | Statement::Delete { .. }
413                | Statement::Update { .. } => {
414                    if result.is_some() {
415                        panic!("two queries in one test case");
416                    }
417                    let explain_options = ExplainOptions {
418                        verbose: true,
419                        ..Default::default()
420                    };
421                    let context = OptimizerContext::new(
422                        HandlerArgs::new(session.clone(), &stmt, sql.clone())?,
423                        explain_options,
424                    );
425                    let ret = self.apply_query(&stmt, context.into())?;
426                    if do_check_result {
427                        check_result(self, &ret)?;
428                    }
429                    result = Some(ret);
430                }
431                Statement::CreateTable {
432                    name,
433                    columns,
434                    constraints,
435                    if_not_exists,
436                    format_encode,
437                    source_watermarks,
438                    append_only,
439                    on_conflict,
440                    with_version_columns,
441                    cdc_table_info,
442                    include_column_options,
443                    wildcard_idx,
444                    webhook_info,
445                    engine,
446                    ..
447                } => {
448                    let format_encode = format_encode.map(|schema| schema.into_v2_with_warning());
449
450                    create_table::handle_create_table(
451                        handler_args,
452                        name,
453                        columns,
454                        wildcard_idx,
455                        constraints,
456                        if_not_exists,
457                        format_encode,
458                        source_watermarks,
459                        append_only,
460                        on_conflict,
461                        with_version_columns
462                            .iter()
463                            .map(|x| x.real_value())
464                            .collect(),
465                        cdc_table_info,
466                        include_column_options,
467                        webhook_info,
468                        engine,
469                    )
470                    .await?;
471                }
472                Statement::CreateSource { stmt } => {
473                    if let Err(error) =
474                        create_source::handle_create_source(handler_args, stmt).await
475                    {
476                        let actual_result = TestCaseResult {
477                            planner_error: Some(error.to_report_string()),
478                            ..Default::default()
479                        };
480
481                        check_result(self, &actual_result)?;
482                        result = Some(actual_result);
483                    }
484                }
485                Statement::CreateIndex {
486                    name,
487                    table_name,
488                    method,
489                    columns,
490                    include,
491                    distributed_by,
492                    if_not_exists,
493                    // TODO: support unique and if_not_exist in planner test
494                    ..
495                } => {
496                    create_index::handle_create_index(
497                        handler_args,
498                        if_not_exists,
499                        name,
500                        table_name,
501                        method,
502                        columns,
503                        include,
504                        distributed_by,
505                    )
506                    .await?;
507                }
508                Statement::CreateView {
509                    materialized: true,
510                    or_replace: false,
511                    if_not_exists,
512                    name,
513                    query,
514                    columns,
515                    emit_mode,
516                    ..
517                } => {
518                    create_mv::handle_create_mv(
519                        handler_args,
520                        if_not_exists,
521                        name,
522                        *query,
523                        columns,
524                        emit_mode,
525                    )
526                    .await?;
527                }
528                Statement::CreateView {
529                    materialized: false,
530                    or_replace: false,
531                    if_not_exists,
532                    name,
533                    query,
534                    columns,
535                    ..
536                } => {
537                    create_view::handle_create_view(
538                        handler_args,
539                        if_not_exists,
540                        name,
541                        columns,
542                        *query,
543                    )
544                    .await?;
545                }
546                Statement::Drop(drop_statement) => {
547                    drop_table::handle_drop_table(
548                        handler_args,
549                        drop_statement.object_name,
550                        drop_statement.if_exists,
551                        matches!(drop_statement.drop_mode, AstOption::Some(DropMode::Cascade)),
552                    )
553                    .await?;
554                }
555                Statement::SetVariable {
556                    local: _,
557                    variable,
558                    value,
559                } => {
560                    variable::handle_set(handler_args, variable, value).unwrap();
561                }
562                Statement::Explain {
563                    analyze,
564                    statement,
565                    options,
566                } => {
567                    if result.is_some() {
568                        panic!("two queries in one test case");
569                    }
570                    let rsp =
571                        explain::handle_explain(handler_args, *statement, options, analyze).await?;
572
573                    let explain_output = get_explain_output(rsp).await;
574                    let ret = TestCaseResult {
575                        explain_output: Some(explain_output),
576                        ..Default::default()
577                    };
578                    if do_check_result {
579                        check_result(self, &ret)?;
580                    }
581                    result = Some(ret);
582                }
583                Statement::CreateSchema {
584                    schema_name,
585                    if_not_exists,
586                    owner,
587                } => {
588                    create_schema::handle_create_schema(
589                        handler_args,
590                        schema_name,
591                        if_not_exists,
592                        owner,
593                    )
594                    .await?;
595                }
596                _ => return Err(anyhow!("Unsupported statement type")),
597            }
598        }
599        Ok(result)
600    }
601
602    fn apply_query(
603        &self,
604        stmt: &Statement,
605        context: OptimizerContextRef,
606    ) -> Result<TestCaseResult> {
607        let session = context.session_ctx().clone();
608        let mut ret = TestCaseResult::default();
609
610        let bound = {
611            let mut binder = Binder::new_for_batch(&session);
612            match binder.bind(stmt.clone()) {
613                Ok(bound) => bound,
614                Err(err) => {
615                    ret.binder_error = Some(err.to_report_string_pretty());
616                    return Ok(ret);
617                }
618            }
619        };
620
621        let mut planner = Planner::new_for_stream(context.clone());
622
623        let plan_root = match planner.plan(bound) {
624            Ok(plan_root) => {
625                if self.expected_outputs.contains(&TestType::LogicalPlan) {
626                    ret.logical_plan =
627                        Some(explain_plan(&plan_root.clone().into_unordered_subplan()));
628                }
629                plan_root
630            }
631            Err(err) => {
632                ret.planner_error = Some(err.to_report_string_pretty());
633                return Ok(ret);
634            }
635        };
636
637        if self
638            .expected_outputs
639            .contains(&TestType::OptimizedLogicalPlanForBatch)
640            || self.expected_outputs.contains(&TestType::OptimizerError)
641        {
642            let plan_root = plan_root.clone();
643            let optimized_logical_plan_for_batch =
644                match plan_root.gen_optimized_logical_plan_for_batch() {
645                    Ok(optimized_logical_plan_for_batch) => optimized_logical_plan_for_batch,
646                    Err(err) => {
647                        ret.optimizer_error = Some(err.to_report_string_pretty());
648                        return Ok(ret);
649                    }
650                };
651
652            // Only generate optimized_logical_plan_for_batch if it is specified in test case
653            if self
654                .expected_outputs
655                .contains(&TestType::OptimizedLogicalPlanForBatch)
656            {
657                ret.optimized_logical_plan_for_batch =
658                    Some(explain_plan(&optimized_logical_plan_for_batch.plan));
659            }
660        }
661
662        if self
663            .expected_outputs
664            .contains(&TestType::OptimizedLogicalPlanForStream)
665            || self.expected_outputs.contains(&TestType::OptimizerError)
666        {
667            let plan_root = plan_root.clone();
668            let optimized_logical_plan_for_stream =
669                match plan_root.gen_optimized_logical_plan_for_stream() {
670                    Ok(optimized_logical_plan_for_stream) => optimized_logical_plan_for_stream,
671                    Err(err) => {
672                        ret.optimizer_error = Some(err.to_report_string_pretty());
673                        return Ok(ret);
674                    }
675                };
676
677            // Only generate optimized_logical_plan_for_stream if it is specified in test case
678            if self
679                .expected_outputs
680                .contains(&TestType::OptimizedLogicalPlanForStream)
681            {
682                ret.optimized_logical_plan_for_stream =
683                    Some(explain_plan(&optimized_logical_plan_for_stream.plan));
684            }
685        }
686
687        'batch: {
688            if self.expected_outputs.contains(&TestType::BatchPlan)
689                || self.expected_outputs.contains(&TestType::BatchPlanProto)
690                || self.expected_outputs.contains(&TestType::BatchError)
691            {
692                let plan_root = plan_root.clone();
693                let batch_plan = match plan_root.gen_batch_plan() {
694                    Ok(batch_plan) => match batch_plan.gen_batch_distributed_plan() {
695                        Ok(batch_plan) => batch_plan,
696                        Err(err) => {
697                            ret.batch_error = Some(err.to_report_string_pretty());
698                            break 'batch;
699                        }
700                    },
701                    Err(err) => {
702                        ret.batch_error = Some(err.to_report_string_pretty());
703                        break 'batch;
704                    }
705                };
706
707                // Only generate batch_plan if it is specified in test case
708                if self.expected_outputs.contains(&TestType::BatchPlan) {
709                    ret.batch_plan = Some(explain_plan(&batch_plan));
710                }
711
712                // Only generate batch_plan_proto if it is specified in test case
713                if self.expected_outputs.contains(&TestType::BatchPlanProto) {
714                    ret.batch_plan_proto = Some(serde_yaml::to_string(
715                        &batch_plan.to_batch_prost_identity(false)?,
716                    )?);
717                }
718            }
719        }
720
721        'local_batch: {
722            if self.expected_outputs.contains(&TestType::BatchLocalPlan)
723                || self.expected_outputs.contains(&TestType::BatchError)
724            {
725                let plan_root = plan_root.clone();
726                let batch_plan = match plan_root.gen_batch_plan() {
727                    Ok(batch_plan) => match batch_plan.gen_batch_local_plan() {
728                        Ok(batch_plan) => batch_plan,
729                        Err(err) => {
730                            ret.batch_error = Some(err.to_report_string_pretty());
731                            break 'local_batch;
732                        }
733                    },
734                    Err(err) => {
735                        ret.batch_error = Some(err.to_report_string_pretty());
736                        break 'local_batch;
737                    }
738                };
739
740                // Only generate batch_plan if it is specified in test case
741                if self.expected_outputs.contains(&TestType::BatchLocalPlan) {
742                    ret.batch_local_plan = Some(explain_plan(&batch_plan));
743                }
744            }
745        }
746
747        'distributed_batch: {
748            if self
749                .expected_outputs
750                .contains(&TestType::BatchDistributedPlan)
751                || self.expected_outputs.contains(&TestType::BatchError)
752            {
753                let plan_root = plan_root.clone();
754                let batch_plan = match plan_root.gen_batch_plan() {
755                    Ok(batch_plan) => match batch_plan.gen_batch_distributed_plan() {
756                        Ok(batch_plan) => batch_plan,
757                        Err(err) => {
758                            ret.batch_error = Some(err.to_report_string_pretty());
759                            break 'distributed_batch;
760                        }
761                    },
762                    Err(err) => {
763                        ret.batch_error = Some(err.to_report_string_pretty());
764                        break 'distributed_batch;
765                    }
766                };
767
768                // Only generate batch_plan if it is specified in test case
769                if self
770                    .expected_outputs
771                    .contains(&TestType::BatchDistributedPlan)
772                {
773                    ret.batch_distributed_plan = Some(explain_plan(&batch_plan));
774                }
775            }
776        }
777
778        {
779            // stream
780            for (
781                emit_mode,
782                plan,
783                ret_plan_str,
784                dist_plan,
785                ret_dist_plan_str,
786                error,
787                ret_error_str,
788            ) in [
789                (
790                    EmitMode::Immediately,
791                    self.expected_outputs.contains(&TestType::StreamPlan),
792                    &mut ret.stream_plan,
793                    self.expected_outputs.contains(&TestType::StreamDistPlan),
794                    &mut ret.stream_dist_plan,
795                    self.expected_outputs.contains(&TestType::StreamError),
796                    &mut ret.stream_error,
797                ),
798                (
799                    EmitMode::OnWindowClose,
800                    self.expected_outputs.contains(&TestType::EowcStreamPlan),
801                    &mut ret.eowc_stream_plan,
802                    self.expected_outputs
803                        .contains(&TestType::EowcStreamDistPlan),
804                    &mut ret.eowc_stream_dist_plan,
805                    self.expected_outputs.contains(&TestType::EowcStreamError),
806                    &mut ret.eowc_stream_error,
807                ),
808            ] {
809                if !plan && !dist_plan && !error {
810                    continue;
811                }
812
813                let q = if let Statement::Query(q) = stmt {
814                    q.as_ref().clone()
815                } else {
816                    return Err(anyhow!("expect a query"));
817                };
818
819                let (stream_plan, table) = match create_mv::gen_create_mv_plan(
820                    &session,
821                    context.clone(),
822                    q,
823                    ObjectName(vec!["test".into()]),
824                    vec![],
825                    Some(emit_mode),
826                ) {
827                    Ok(r) => r,
828                    Err(err) => {
829                        *ret_error_str = Some(err.to_report_string_pretty());
830                        continue;
831                    }
832                };
833
834                // Only generate stream_plan if it is specified in test case
835                if plan {
836                    *ret_plan_str = Some(explain_plan(&stream_plan));
837                }
838
839                // Only generate stream_dist_plan if it is specified in test case
840                if dist_plan {
841                    let graph = build_graph(stream_plan.clone(), None)?;
842                    *ret_dist_plan_str =
843                        Some(explain_stream_graph(&graph, Some(table.to_prost()), false));
844                }
845
846                if self.expected_outputs.contains(&TestType::BackfillOrderPlan) {
847                    match explain_backfill_order_in_dot_format(
848                        &session,
849                        BackfillOrderStrategy::Auto,
850                        stream_plan,
851                    ) {
852                        Ok(formatted_order_plan) => {
853                            ret.backfill_order_plan = Some(formatted_order_plan);
854                        }
855                        Err(err) => {
856                            *ret_error_str = Some(err.to_report_string_pretty());
857                        }
858                    }
859                }
860            }
861        }
862
863        'sink: {
864            if self.expected_outputs.contains(&TestType::SinkPlan) {
865                let plan_root = plan_root;
866                let sink_name = "sink_test";
867                let mut options = BTreeMap::new();
868                options.insert("connector".to_owned(), "blackhole".to_owned());
869                options.insert("type".to_owned(), "append-only".to_owned());
870                // let options = WithOptionsSecResolved::without_secrets(options);
871                let options = WithOptionsSecResolved::without_secrets(options);
872                let format_desc = (&options).try_into().unwrap();
873                match plan_root.gen_sink_plan(
874                    sink_name.to_owned(),
875                    format!("CREATE SINK {sink_name} AS {}", stmt),
876                    options,
877                    false,
878                    "test_db".into(),
879                    "test_table".into(),
880                    format_desc,
881                    false,
882                    None,
883                    None,
884                    false,
885                    None,
886                ) {
887                    Ok(sink_plan) => {
888                        ret.sink_plan = Some(explain_plan(&sink_plan.into()));
889                        break 'sink;
890                    }
891                    Err(err) => {
892                        ret.sink_error = Some(err.to_report_string_pretty());
893                        break 'sink;
894                    }
895                }
896            }
897        }
898
899        Ok(ret)
900    }
901}
902
903fn explain_plan(plan: &PlanRef<impl ConventionMarker>) -> String {
904    plan.explain_to_string()
905}
906
907/// Checks that the result matches `test_case.expected_outputs`.
908///
909/// We don't check the result matches here.
910fn check_result(test_case: &TestCase, actual: &TestCaseResult) -> Result<()> {
911    macro_rules! check {
912        ($field:ident) => {
913            paste::paste! {
914                let case_contains = test_case.expected_outputs.contains(&TestType:: [< $field:camel >]  );
915                let actual_contains = &actual.$field;
916                match (case_contains, actual_contains) {
917                    (false, None) | (true, Some(_)) => {},
918                    (false, Some(e)) => return Err(anyhow!("unexpected {}: {}", stringify!($field), e)),
919                    (true, None) => return Err(anyhow!(
920                        "expected {}, but there's no such result during execution",
921                        stringify!($field)
922                    )),
923                }
924            }
925        };
926    }
927
928    check!(binder_error);
929    check!(planner_error);
930    check!(optimizer_error);
931    check!(batch_error);
932    check!(batch_local_error);
933    check!(stream_error);
934    check!(eowc_stream_error);
935
936    check!(logical_plan);
937    check!(optimized_logical_plan_for_batch);
938    check!(optimized_logical_plan_for_stream);
939    check!(batch_plan);
940    check!(batch_local_plan);
941    check!(stream_plan);
942    check!(stream_dist_plan);
943    check!(eowc_stream_plan);
944    check!(eowc_stream_dist_plan);
945    check!(batch_plan_proto);
946    check!(sink_plan);
947
948    check!(explain_output);
949
950    Ok(())
951}
952
953/// `/tests/testdata` directory.
954pub fn test_data_dir() -> PathBuf {
955    std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
956        .join("tests")
957        .join("testdata")
958}
959
960pub async fn run_test_file(file_path: &Path, file_content: &str) -> Result<()> {
961    let file_name = file_path.file_name().unwrap().to_str().unwrap();
962    println!("-- running {file_name} --");
963
964    let mut failed_num = 0;
965    let cases: Vec<TestCase> = serde_yaml::from_str(file_content).map_err(|e| {
966        let context = if let Some(loc) = e.location() {
967            format!(
968                "failed to parse yaml at {}:{}:{}",
969                file_path.display(),
970                loc.line(),
971                loc.column()
972            )
973        } else {
974            "failed to parse yaml".to_owned()
975        };
976        anyhow::anyhow!(e).context(context)
977    })?;
978    let cases = resolve_testcase_id(cases).expect("failed to resolve");
979    let mut outputs = vec![];
980
981    for (i, c) in cases.into_iter().enumerate() {
982        println!(
983            "Running test #{i} (id: {}), SQL:\n{}",
984            c.id().clone().unwrap_or_else(|| "<none>".to_owned()),
985            c.sql()
986        );
987        match c.run(true).await {
988            Ok(case) => {
989                outputs.push(case);
990            }
991            Err(e) => {
992                eprintln!(
993                    "Test #{i} (id: {}) failed, SQL:\n{}\nError: {}",
994                    c.id().clone().unwrap_or_else(|| "<none>".to_owned()),
995                    c.sql(),
996                    e.as_report()
997                );
998                failed_num += 1;
999            }
1000        }
1001    }
1002
1003    let output_path = test_data_dir().join("output").join(file_name);
1004    check(outputs, expect_test::expect_file![output_path]);
1005
1006    if failed_num > 0 {
1007        println!("\n");
1008        bail!(format!("{} test cases failed", failed_num));
1009    }
1010    Ok(())
1011}