risingwave_planner_test/
lib.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![allow(clippy::derive_partial_eq_without_eq)]
16
17//! Data-driven tests.
18
19risingwave_expr_impl::enable!();
20
21mod resolve_id;
22
23use std::collections::{BTreeMap, HashSet};
24use std::path::{Path, PathBuf};
25use std::sync::Arc;
26
27use anyhow::{Result, anyhow, bail};
28pub use resolve_id::*;
29use risingwave_frontend::handler::util::SourceSchemaCompatExt;
30use risingwave_frontend::handler::{
31    HandlerArgs, create_index, create_mv, create_schema, create_source, create_table, create_view,
32    drop_table, explain, variable,
33};
34use risingwave_frontend::optimizer::backfill_order_strategy::explain_backfill_order_in_dot_format;
35use risingwave_frontend::optimizer::plan_node::ConventionMarker;
36use risingwave_frontend::session::SessionImpl;
37use risingwave_frontend::test_utils::{LocalFrontend, create_proto_file, get_explain_output};
38use risingwave_frontend::{
39    Binder, Explain, FrontendOpts, OptimizerContext, OptimizerContextRef, PlanRef, Planner,
40    WithOptionsSecResolved, build_graph, explain_stream_graph,
41};
42use risingwave_sqlparser::ast::{
43    AstOption, BackfillOrderStrategy, DropMode, EmitMode, ExplainOptions, ObjectName, Statement,
44};
45use risingwave_sqlparser::parser::Parser;
46use serde::{Deserialize, Serialize};
47use thiserror_ext::AsReport;
48
/// The kinds of output a test case can ask for in its `expected_outputs` set.
///
/// Variants are (de)serialized in `snake_case`, matching the field names of `TestCaseResult`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Hash, Eq)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
pub enum TestType {
    /// The result of an `EXPLAIN` statement.
    ///
    /// This field is used when `sql` is an `EXPLAIN` statement.
    /// In this case, all other fields are invalid.
    ExplainOutput,

    /// The original logical plan
    LogicalPlan,
    /// Logical plan with optimization `.gen_optimized_logical_plan_for_batch()`
    OptimizedLogicalPlanForBatch,
    /// Logical plan with optimization `.gen_optimized_logical_plan_for_stream()`
    OptimizedLogicalPlanForStream,

    /// Distributed batch plan `.gen_batch_query_plan()`
    BatchPlan,
    /// Proto JSON of generated batch plan
    BatchPlanProto,
    /// Batch plan for local execution `.gen_batch_local_plan()`
    BatchLocalPlan,
    /// Batch plan for distributed execution `.gen_batch_distributed_plan()`
    BatchDistributedPlan,

    /// Create MV plan `.gen_create_mv_plan()`
    StreamPlan,
    /// Create MV fragments plan
    StreamDistPlan,
    /// Create MV plan with EOWC semantics `.gen_create_mv_plan(.., EmitMode::OnWindowClose)`
    EowcStreamPlan,
    /// Create MV fragments plan with EOWC semantics
    EowcStreamDistPlan,
    /// Create Backfill Order Plan
    BackfillOrderPlan,

    /// Create sink plan (assumes blackhole sink)
    /// TODO: Other sinks
    SinkPlan,

    /// Error of binder
    BinderError,
    /// Error of planner
    PlannerError,
    /// Error of optimizer
    OptimizerError,
    /// Error while generating a batch plan
    BatchError,
    /// Error of `.gen_batch_local_plan()`
    BatchLocalError,
    /// Error while generating a stream plan
    StreamError,
    /// Error while generating a stream plan with EOWC semantics
    EowcStreamError,
}
97
98pub fn check(actual: Vec<TestCaseResult>, expect: expect_test::ExpectFile) {
99    let actual = serde_yaml::to_string(&actual).unwrap();
100    expect.assert_eq(&format!("# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.\n{}",actual));
101}
102
/// The user-written part of a test case: its identity, prerequisites and the SQL to run.
///
/// `None` fields are omitted when serializing (`skip_serializing_none`), and unknown keys are
/// rejected when deserializing.
#[serde_with::skip_serializing_none]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct TestInput {
    /// Id of the test case. Other test cases refer to it by listing it in their `before` field.
    pub id: Option<String>,
    /// A brief description of the test case.
    pub name: Option<String>,
    /// Ids of test cases to execute before running the SQL statements of this one.
    pub before: Option<Vec<String>>,
    /// The resolved statements of the `before` ids. Never written out when serializing.
    #[serde(skip_serializing)]
    before_statements: Option<Vec<String>>,
    /// The SQL statements
    pub sql: String,
}
119
/// A single planner test case: the flattened [`TestInput`], optional connector/config setup,
/// and the set of outputs that should be produced and checked.
#[serde_with::skip_serializing_none]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct TestCase {
    /// The input fields, flattened into the same map as the fields below.
    #[serde(flatten)]
    pub input: TestInput,

    // TODO: these should also be in TestInput, but it affects ordering. So next PR
    /// Support using file content or file location to create source.
    pub create_source: Option<CreateConnector>,
    /// Support using file content or file location to create table with connector.
    pub create_table_with_connector: Option<CreateConnector>,
    /// Provide config map to frontend
    pub with_config_map: Option<BTreeMap<String, String>>,

    /// Specify what output fields to check
    pub expected_outputs: HashSet<TestType>,
}
138
/// Borrowing accessors for the fields of a [`TestCase`], including those nested in `input`.
impl TestCase {
    /// Id of the test case (`input.id`).
    pub fn id(&self) -> &Option<String> {
        &self.input.id
    }

    /// Brief description of the test case (`input.name`).
    pub fn name(&self) -> &Option<String> {
        &self.input.name
    }

    /// Ids of the test cases to run before this one (`input.before`).
    pub fn before(&self) -> &Option<Vec<String>> {
        &self.input.before
    }

    /// Resolved statements of the `before` ids (`input.before_statements`).
    pub fn before_statements(&self) -> &Option<Vec<String>> {
        &self.input.before_statements
    }

    /// The SQL statements of this test case (`input.sql`).
    pub fn sql(&self) -> &String {
        &self.input.sql
    }

    /// Description of the source to create before running, if any.
    pub fn create_source(&self) -> &Option<CreateConnector> {
        &self.create_source
    }

    /// Description of the table with connector to create before running, if any.
    pub fn create_table_with_connector(&self) -> &Option<CreateConnector> {
        &self.create_table_with_connector
    }

    /// Session config entries to apply before running, if any.
    pub fn with_config_map(&self) -> &Option<BTreeMap<String, String>> {
        &self.with_config_map
    }
}
172
/// Describes a source or table-with-connector that a test case creates before running its SQL.
#[serde_with::skip_serializing_none]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct CreateConnector {
    /// Value placed after `FORMAT` in the generated `CREATE` statement.
    format: String,
    /// Value placed after `ENCODE` in the generated `CREATE` statement.
    encode: String,
    /// Name of the source/table to create.
    name: String,
    /// Schema file content; it is written to a temporary proto file that the generated
    /// statement references through `schema.location`.
    file: Option<String>,
    // NOTE(review): not read by the connector-creation code in this file, which picks
    // TABLE vs SOURCE from the `TestCase` field used — confirm whether this is still needed.
    is_table: Option<bool>,
}
183
/// The outcome of running a [`TestCase`]: the echoed input plus one optional field per
/// output kind. `None` fields are omitted when serializing.
#[serde_with::skip_serializing_none]
#[derive(Debug, PartialEq, Serialize, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct TestCaseResult {
    /// The input of the test case this result belongs to, flattened into the same map.
    #[serde(flatten)]
    pub input: TestInput,

    /// The original logical plan
    pub logical_plan: Option<String>,

    /// Logical plan with optimization `.gen_optimized_logical_plan_for_batch()`
    pub optimized_logical_plan_for_batch: Option<String>,

    /// Logical plan with optimization `.gen_optimized_logical_plan_for_stream()`
    pub optimized_logical_plan_for_stream: Option<String>,

    /// Distributed batch plan `.gen_batch_query_plan()`
    pub batch_plan: Option<String>,

    /// Proto JSON of generated batch plan
    pub batch_plan_proto: Option<String>,

    /// Batch plan for local execution `.gen_batch_local_plan()`
    pub batch_local_plan: Option<String>,

    /// Batch plan for distributed execution `.gen_batch_distributed_plan()`
    pub batch_distributed_plan: Option<String>,

    /// Generate sink plan
    pub sink_plan: Option<String>,

    /// Create MV plan `.gen_create_mv_plan()`
    pub stream_plan: Option<String>,

    /// Create MV fragments plan
    pub stream_dist_plan: Option<String>,

    /// Create MV plan with EOWC semantics `.gen_create_mv_plan(.., EmitMode::OnWindowClose)`
    pub eowc_stream_plan: Option<String>,

    /// Create MV fragments plan with EOWC semantics
    pub eowc_stream_dist_plan: Option<String>,

    /// Create Backfill Order Plan
    pub backfill_order_plan: Option<String>,

    /// Error of binder
    pub binder_error: Option<String>,

    /// Error of planner
    pub planner_error: Option<String>,

    /// Error of optimizer
    pub optimizer_error: Option<String>,

    /// Error of `.gen_batch_query_plan()`
    pub batch_error: Option<String>,

    /// Error of `.gen_batch_local_plan()`
    pub batch_local_error: Option<String>,

    /// Error of `.gen_stream_plan()`
    pub stream_error: Option<String>,

    /// Error of `.gen_stream_plan()` with `emit_on_window_close = true`
    pub eowc_stream_error: Option<String>,

    /// Error of `.gen_sink_plan()`
    pub sink_error: Option<String>,

    /// The result of an `EXPLAIN` statement.
    ///
    /// This field is used when `sql` is an `EXPLAIN` statement.
    /// In this case, all other fields are invalid.
    pub explain_output: Option<String>,

    // TODO: these should also be in TestInput, but it affects ordering. So next PR
    /// Support using file content or file location to create source.
    pub create_source: Option<CreateConnector>,
    /// Support using file content or file location to create table with connector.
    pub create_table_with_connector: Option<CreateConnector>,
    /// Provide config map to frontend
    pub with_config_map: Option<BTreeMap<String, String>>,
}
268
269impl TestCase {
270    /// Run the test case, and return the expected output.
271    pub async fn run(&self, do_check_result: bool) -> Result<TestCaseResult> {
272        let session = {
273            let frontend = LocalFrontend::new(FrontendOpts::default()).await;
274            frontend.session_ref()
275        };
276
277        if let Some(config_map) = self.with_config_map() {
278            for (key, val) in config_map {
279                session.set_config(key, val.to_owned()).unwrap();
280            }
281        }
282
283        let placeholder_empty_vec = vec![];
284
285        // Since temp file will be deleted when it goes out of scope, so create source in advance.
286        Box::pin(self.do_create_source(session.clone())).await?;
287        Box::pin(self.do_create_table_with_connector(session.clone())).await?;
288
289        let mut result: Option<TestCaseResult> = None;
290        for sql in self
291            .before_statements()
292            .as_ref()
293            .unwrap_or(&placeholder_empty_vec)
294            .iter()
295            .chain(std::iter::once(self.sql()))
296        {
297            result = Box::pin(self.run_sql(
298                Arc::from(sql.to_owned()),
299                session.clone(),
300                do_check_result,
301                result,
302            ))
303            .await?;
304        }
305
306        let mut result = result.unwrap_or_default();
307        result.input = self.input.clone();
308        result.create_source.clone_from(self.create_source());
309        result
310            .create_table_with_connector
311            .clone_from(self.create_table_with_connector());
312        result.with_config_map.clone_from(self.with_config_map());
313
314        Ok(result)
315    }
316
317    #[inline(always)]
318    fn create_connector_sql(
319        is_table: bool,
320        connector_name: String,
321        connector_format: String,
322        connector_encode: String,
323    ) -> String {
324        let object_to_create = if is_table { "TABLE" } else { "SOURCE" };
325        format!(
326            r#"CREATE {} {}
327    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
328    FORMAT {} ENCODE {} (message = '.test.TestRecord', schema.location = 'file://"#,
329            object_to_create, connector_name, connector_format, connector_encode
330        )
331    }
332
333    async fn do_create_table_with_connector(
334        &self,
335        session: Arc<SessionImpl>,
336    ) -> Result<Option<TestCaseResult>> {
337        match self.create_table_with_connector().clone() {
338            Some(connector) => {
339                if let Some(content) = connector.file {
340                    let sql = Self::create_connector_sql(
341                        true,
342                        connector.name,
343                        connector.format,
344                        connector.encode,
345                    );
346                    let temp_file = create_proto_file(content.as_str());
347                    Box::pin(self.run_sql(
348                        Arc::from(sql + temp_file.path().to_str().unwrap() + "')"),
349                        session.clone(),
350                        false,
351                        None,
352                    ))
353                    .await
354                } else {
355                    panic!(
356                        "{:?} create table with connector must include `file` for the file content",
357                        self.id()
358                    );
359                }
360            }
361            None => Ok(None),
362        }
363    }
364
365    // If testcase have create source info, run sql to create source.
366    // Support create source by file content or file location.
367    async fn do_create_source(&self, session: Arc<SessionImpl>) -> Result<Option<TestCaseResult>> {
368        match self.create_source().clone() {
369            Some(source) => {
370                if let Some(content) = source.file {
371                    let sql = Self::create_connector_sql(
372                        false,
373                        source.name,
374                        source.format,
375                        source.encode,
376                    );
377                    let temp_file = create_proto_file(content.as_str());
378                    Box::pin(self.run_sql(
379                        Arc::from(sql + temp_file.path().to_str().unwrap() + "')"),
380                        session.clone(),
381                        false,
382                        None,
383                    ))
384                    .await
385                } else {
386                    panic!(
387                        "{:?} create source must include `file` for the file content",
388                        self.id()
389                    );
390                }
391            }
392            None => Ok(None),
393        }
394    }
395
    /// Parses `sql` and executes each resulting statement against `session`.
    ///
    /// `result` carries the outcome accumulated by earlier calls; it is returned unchanged
    /// unless one of the statements produces a new outcome:
    ///
    /// - queries and DML (`Query`/`Insert`/`Delete`/`Update`) are planned via `apply_query`;
    /// - `EXPLAIN` stores its textual output in `explain_output`;
    /// - a failing `CREATE SOURCE` is recorded as `planner_error`.
    ///
    /// DDL and `SET` statements are forwarded to the matching frontend handler. When
    /// `do_check_result` is true, query and `EXPLAIN` outcomes are validated with `check_result`.
    ///
    /// # Errors
    ///
    /// Returns an error for statement types that are not listed in the `match` below, and
    /// propagates handler and `check_result` failures.
    ///
    /// # Panics
    ///
    /// Panics if `sql` fails to parse, if a second query/`EXPLAIN` is encountered while `result`
    /// is already set, or if handling a `SET` statement fails.
    async fn run_sql(
        &self,
        sql: Arc<str>,
        session: Arc<SessionImpl>,
        do_check_result: bool,
        mut result: Option<TestCaseResult>,
    ) -> Result<Option<TestCaseResult>> {
        let statements = Parser::parse_sql(&sql).unwrap();
        for stmt in statements {
            // TODO: `sql` may contain multiple statements here.
            let handler_args = HandlerArgs::new(session.clone(), &stmt, sql.clone())?;
            // Bound to a name so the value returned by `txn_begin_implicit` lives until the end
            // of this loop iteration.
            let _guard = session.txn_begin_implicit();
            match stmt.clone() {
                Statement::Query(_)
                | Statement::Insert { .. }
                | Statement::Delete { .. }
                | Statement::Update { .. } => {
                    if result.is_some() {
                        panic!("two queries in one test case");
                    }
                    let explain_options = ExplainOptions {
                        verbose: true,
                        ..Default::default()
                    };
                    // A separate `HandlerArgs` is built for the optimizer context; the
                    // `handler_args` created above is not used in this arm.
                    let context = OptimizerContext::new(
                        HandlerArgs::new(session.clone(), &stmt, sql.clone())?,
                        explain_options,
                    );
                    let ret = self.apply_query(&stmt, context.into())?;
                    if do_check_result {
                        check_result(self, &ret)?;
                    }
                    result = Some(ret);
                }
                Statement::CreateTable {
                    name,
                    columns,
                    constraints,
                    if_not_exists,
                    format_encode,
                    source_watermarks,
                    append_only,
                    on_conflict,
                    with_version_columns,
                    cdc_table_info,
                    include_column_options,
                    wildcard_idx,
                    webhook_info,
                    engine,
                    ..
                } => {
                    let format_encode = format_encode.map(|schema| schema.into_v2_with_warning());

                    Box::pin(create_table::handle_create_table(
                        handler_args,
                        name,
                        columns,
                        wildcard_idx,
                        constraints,
                        if_not_exists,
                        format_encode,
                        source_watermarks,
                        append_only,
                        on_conflict,
                        with_version_columns
                            .iter()
                            .map(|x| x.real_value())
                            .collect(),
                        cdc_table_info,
                        include_column_options,
                        webhook_info,
                        engine,
                    ))
                    .await?;
                }
                Statement::CreateSource { stmt } => {
                    if let Err(error) =
                        create_source::handle_create_source(handler_args, stmt).await
                    {
                        let actual_result = TestCaseResult {
                            planner_error: Some(error.to_report_string()),
                            ..Default::default()
                        };

                        // Unlike the other arms, this check runs regardless of
                        // `do_check_result`, so a failed `CREATE SOURCE` is always validated
                        // against the expected outputs.
                        check_result(self, &actual_result)?;
                        result = Some(actual_result);
                    }
                }
                Statement::CreateIndex {
                    name,
                    table_name,
                    method,
                    columns,
                    include,
                    distributed_by,
                    if_not_exists,
                    // TODO: support unique and if_not_exist in planner test
                    ..
                } => {
                    create_index::handle_create_index(
                        handler_args,
                        if_not_exists,
                        name,
                        table_name,
                        method,
                        columns,
                        include,
                        distributed_by,
                    )
                    .await?;
                }
                // Materialized view. `OR REPLACE` is not matched here and falls through to the
                // unsupported-statement arm.
                Statement::CreateView {
                    materialized: true,
                    or_replace: false,
                    if_not_exists,
                    name,
                    query,
                    columns,
                    emit_mode,
                    ..
                } => {
                    create_mv::handle_create_mv(
                        handler_args,
                        if_not_exists,
                        name,
                        *query,
                        columns,
                        emit_mode,
                    )
                    .await?;
                }
                // Plain (non-materialized) view, again without `OR REPLACE`.
                Statement::CreateView {
                    materialized: false,
                    or_replace: false,
                    if_not_exists,
                    name,
                    query,
                    columns,
                    ..
                } => {
                    create_view::handle_create_view(
                        handler_args,
                        if_not_exists,
                        name,
                        columns,
                        *query,
                    )
                    .await?;
                }
                // Every `DROP` statement is routed to the drop-table handler.
                Statement::Drop(drop_statement) => {
                    drop_table::handle_drop_table(
                        handler_args,
                        drop_statement.object_name,
                        drop_statement.if_exists,
                        matches!(drop_statement.drop_mode, AstOption::Some(DropMode::Cascade)),
                    )
                    .await?;
                }
                Statement::SetVariable {
                    local: _,
                    variable,
                    value,
                } => {
                    variable::handle_set(handler_args, variable, value).unwrap();
                }
                Statement::Explain {
                    analyze,
                    statement,
                    options,
                } => {
                    if result.is_some() {
                        panic!("two queries in one test case");
                    }
                    let rsp = Box::pin(explain::handle_explain(
                        handler_args,
                        *statement,
                        options,
                        analyze,
                    ))
                    .await?;

                    let explain_output = get_explain_output(rsp).await;
                    let ret = TestCaseResult {
                        explain_output: Some(explain_output),
                        ..Default::default()
                    };
                    if do_check_result {
                        check_result(self, &ret)?;
                    }
                    result = Some(ret);
                }
                Statement::CreateSchema {
                    schema_name,
                    if_not_exists,
                    owner,
                } => {
                    create_schema::handle_create_schema(
                        handler_args,
                        schema_name,
                        if_not_exists,
                        owner,
                    )
                    .await?;
                }
                _ => return Err(anyhow!("Unsupported statement type")),
            }
        }
        Ok(result)
    }
605
606    fn apply_query(
607        &self,
608        stmt: &Statement,
609        context: OptimizerContextRef,
610    ) -> Result<TestCaseResult> {
611        let session = context.session_ctx().clone();
612        let mut ret = TestCaseResult::default();
613
614        let bound = {
615            let mut binder = Binder::new_for_batch(&session);
616            match binder.bind(stmt.clone()) {
617                Ok(bound) => bound,
618                Err(err) => {
619                    ret.binder_error = Some(err.to_report_string_pretty());
620                    return Ok(ret);
621                }
622            }
623        };
624
625        let mut planner = Planner::new_for_stream(context.clone());
626
627        let plan_root = match planner.plan(bound) {
628            Ok(plan_root) => {
629                if self.expected_outputs.contains(&TestType::LogicalPlan) {
630                    ret.logical_plan =
631                        Some(explain_plan(&plan_root.clone().into_unordered_subplan()));
632                }
633                plan_root
634            }
635            Err(err) => {
636                ret.planner_error = Some(err.to_report_string_pretty());
637                return Ok(ret);
638            }
639        };
640
641        if self
642            .expected_outputs
643            .contains(&TestType::OptimizedLogicalPlanForBatch)
644            || self.expected_outputs.contains(&TestType::OptimizerError)
645        {
646            let plan_root = plan_root.clone();
647            let optimized_logical_plan_for_batch =
648                match plan_root.gen_optimized_logical_plan_for_batch() {
649                    Ok(optimized_logical_plan_for_batch) => optimized_logical_plan_for_batch,
650                    Err(err) => {
651                        ret.optimizer_error = Some(err.to_report_string_pretty());
652                        return Ok(ret);
653                    }
654                };
655
656            // Only generate optimized_logical_plan_for_batch if it is specified in test case
657            if self
658                .expected_outputs
659                .contains(&TestType::OptimizedLogicalPlanForBatch)
660            {
661                ret.optimized_logical_plan_for_batch =
662                    Some(explain_plan(&optimized_logical_plan_for_batch.plan));
663            }
664        }
665
666        if self
667            .expected_outputs
668            .contains(&TestType::OptimizedLogicalPlanForStream)
669            || self.expected_outputs.contains(&TestType::OptimizerError)
670        {
671            let plan_root = plan_root.clone();
672            let optimized_logical_plan_for_stream =
673                match plan_root.gen_optimized_logical_plan_for_stream() {
674                    Ok(optimized_logical_plan_for_stream) => optimized_logical_plan_for_stream,
675                    Err(err) => {
676                        ret.optimizer_error = Some(err.to_report_string_pretty());
677                        return Ok(ret);
678                    }
679                };
680
681            // Only generate optimized_logical_plan_for_stream if it is specified in test case
682            if self
683                .expected_outputs
684                .contains(&TestType::OptimizedLogicalPlanForStream)
685            {
686                ret.optimized_logical_plan_for_stream =
687                    Some(explain_plan(&optimized_logical_plan_for_stream.plan));
688            }
689        }
690
691        'batch: {
692            if self.expected_outputs.contains(&TestType::BatchPlan)
693                || self.expected_outputs.contains(&TestType::BatchPlanProto)
694                || self.expected_outputs.contains(&TestType::BatchError)
695            {
696                let plan_root = plan_root.clone();
697                let batch_plan = match plan_root.gen_batch_plan() {
698                    Ok(batch_plan) => match batch_plan.gen_batch_distributed_plan() {
699                        Ok(batch_plan) => batch_plan,
700                        Err(err) => {
701                            ret.batch_error = Some(err.to_report_string_pretty());
702                            break 'batch;
703                        }
704                    },
705                    Err(err) => {
706                        ret.batch_error = Some(err.to_report_string_pretty());
707                        break 'batch;
708                    }
709                };
710
711                // Only generate batch_plan if it is specified in test case
712                if self.expected_outputs.contains(&TestType::BatchPlan) {
713                    ret.batch_plan = Some(explain_plan(&batch_plan));
714                }
715
716                // Only generate batch_plan_proto if it is specified in test case
717                if self.expected_outputs.contains(&TestType::BatchPlanProto) {
718                    ret.batch_plan_proto = Some(serde_yaml::to_string(
719                        &batch_plan.to_batch_prost_identity(false)?,
720                    )?);
721                }
722            }
723        }
724
725        'local_batch: {
726            if self.expected_outputs.contains(&TestType::BatchLocalPlan)
727                || self.expected_outputs.contains(&TestType::BatchError)
728            {
729                let plan_root = plan_root.clone();
730                let batch_plan = match plan_root.gen_batch_plan() {
731                    Ok(batch_plan) => match batch_plan.gen_batch_local_plan() {
732                        Ok(batch_plan) => batch_plan,
733                        Err(err) => {
734                            ret.batch_error = Some(err.to_report_string_pretty());
735                            break 'local_batch;
736                        }
737                    },
738                    Err(err) => {
739                        ret.batch_error = Some(err.to_report_string_pretty());
740                        break 'local_batch;
741                    }
742                };
743
744                // Only generate batch_plan if it is specified in test case
745                if self.expected_outputs.contains(&TestType::BatchLocalPlan) {
746                    ret.batch_local_plan = Some(explain_plan(&batch_plan));
747                }
748            }
749        }
750
751        'distributed_batch: {
752            if self
753                .expected_outputs
754                .contains(&TestType::BatchDistributedPlan)
755                || self.expected_outputs.contains(&TestType::BatchError)
756            {
757                let plan_root = plan_root.clone();
758                let batch_plan = match plan_root.gen_batch_plan() {
759                    Ok(batch_plan) => match batch_plan.gen_batch_distributed_plan() {
760                        Ok(batch_plan) => batch_plan,
761                        Err(err) => {
762                            ret.batch_error = Some(err.to_report_string_pretty());
763                            break 'distributed_batch;
764                        }
765                    },
766                    Err(err) => {
767                        ret.batch_error = Some(err.to_report_string_pretty());
768                        break 'distributed_batch;
769                    }
770                };
771
772                // Only generate batch_plan if it is specified in test case
773                if self
774                    .expected_outputs
775                    .contains(&TestType::BatchDistributedPlan)
776                {
777                    ret.batch_distributed_plan = Some(explain_plan(&batch_plan));
778                }
779            }
780        }
781
782        {
783            // stream
784            for (
785                emit_mode,
786                plan,
787                ret_plan_str,
788                dist_plan,
789                ret_dist_plan_str,
790                error,
791                ret_error_str,
792            ) in [
793                (
794                    EmitMode::Immediately,
795                    self.expected_outputs.contains(&TestType::StreamPlan),
796                    &mut ret.stream_plan,
797                    self.expected_outputs.contains(&TestType::StreamDistPlan),
798                    &mut ret.stream_dist_plan,
799                    self.expected_outputs.contains(&TestType::StreamError),
800                    &mut ret.stream_error,
801                ),
802                (
803                    EmitMode::OnWindowClose,
804                    self.expected_outputs.contains(&TestType::EowcStreamPlan),
805                    &mut ret.eowc_stream_plan,
806                    self.expected_outputs
807                        .contains(&TestType::EowcStreamDistPlan),
808                    &mut ret.eowc_stream_dist_plan,
809                    self.expected_outputs.contains(&TestType::EowcStreamError),
810                    &mut ret.eowc_stream_error,
811                ),
812            ] {
813                if !plan && !dist_plan && !error {
814                    continue;
815                }
816
817                let q = if let Statement::Query(q) = stmt {
818                    q.as_ref().clone()
819                } else {
820                    return Err(anyhow!("expect a query"));
821                };
822
823                let (stream_plan, table) = match create_mv::gen_create_mv_plan(
824                    &session,
825                    context.clone(),
826                    q,
827                    ObjectName(vec!["test".into()]),
828                    vec![],
829                    Some(emit_mode),
830                ) {
831                    Ok(r) => r,
832                    Err(err) => {
833                        *ret_error_str = Some(err.to_report_string_pretty());
834                        continue;
835                    }
836                };
837
838                // Only generate stream_plan if it is specified in test case
839                if plan {
840                    *ret_plan_str = Some(explain_plan(&stream_plan));
841                }
842
843                // Only generate stream_dist_plan if it is specified in test case
844                if dist_plan {
845                    let graph = build_graph(stream_plan.clone(), None)?;
846                    *ret_dist_plan_str =
847                        Some(explain_stream_graph(&graph, Some(table.to_prost()), false));
848                }
849
850                if self.expected_outputs.contains(&TestType::BackfillOrderPlan) {
851                    match explain_backfill_order_in_dot_format(
852                        &session,
853                        BackfillOrderStrategy::Auto,
854                        stream_plan,
855                    ) {
856                        Ok(formatted_order_plan) => {
857                            ret.backfill_order_plan = Some(formatted_order_plan);
858                        }
859                        Err(err) => {
860                            *ret_error_str = Some(err.to_report_string_pretty());
861                        }
862                    }
863                }
864            }
865        }
866
867        'sink: {
868            if self.expected_outputs.contains(&TestType::SinkPlan) {
869                let plan_root = plan_root;
870                let sink_name = "sink_test";
871                let mut options = BTreeMap::new();
872                options.insert("connector".to_owned(), "blackhole".to_owned());
873                options.insert("type".to_owned(), "append-only".to_owned());
874                // let options = WithOptionsSecResolved::without_secrets(options);
875                let options = WithOptionsSecResolved::without_secrets(options);
876                let format_desc = (&options).try_into().unwrap();
877                match plan_root.gen_sink_plan(
878                    sink_name.to_owned(),
879                    format!("CREATE SINK {sink_name} AS {}", stmt),
880                    options,
881                    false,
882                    "test_db".into(),
883                    "test_table".into(),
884                    format_desc,
885                    false,
886                    None,
887                    None,
888                    false,
889                    None,
890                    true,
891                ) {
892                    Ok(sink_plan) => {
893                        ret.sink_plan = Some(explain_plan(&sink_plan.into()));
894                        break 'sink;
895                    }
896                    Err(err) => {
897                        ret.sink_error = Some(err.to_report_string_pretty());
898                        break 'sink;
899                    }
900                }
901            }
902        }
903
904        Ok(ret)
905    }
906}
907
/// Renders `plan` as a string by delegating to its `explain_to_string` method.
///
/// The plan may carry any convention (`impl ConventionMarker`), which lets one helper
/// serve all the call sites in this file regardless of which kind of plan they hold.
fn explain_plan(plan: &PlanRef<impl ConventionMarker>) -> String {
    plan.explain_to_string()
}
911
912/// Checks that the result matches `test_case.expected_outputs`.
913///
914/// We don't check the result matches here.
915fn check_result(test_case: &TestCase, actual: &TestCaseResult) -> Result<()> {
916    macro_rules! check {
917        ($field:ident) => {
918            paste::paste! {
919                let case_contains = test_case.expected_outputs.contains(&TestType:: [< $field:camel >]  );
920                let actual_contains = &actual.$field;
921                match (case_contains, actual_contains) {
922                    (false, None) | (true, Some(_)) => {},
923                    (false, Some(e)) => return Err(anyhow!("unexpected {}: {}", stringify!($field), e)),
924                    (true, None) => return Err(anyhow!(
925                        "expected {}, but there's no such result during execution",
926                        stringify!($field)
927                    )),
928                }
929            }
930        };
931    }
932
933    check!(binder_error);
934    check!(planner_error);
935    check!(optimizer_error);
936    check!(batch_error);
937    check!(batch_local_error);
938    check!(stream_error);
939    check!(eowc_stream_error);
940
941    check!(logical_plan);
942    check!(optimized_logical_plan_for_batch);
943    check!(optimized_logical_plan_for_stream);
944    check!(batch_plan);
945    check!(batch_local_plan);
946    check!(stream_plan);
947    check!(stream_dist_plan);
948    check!(eowc_stream_plan);
949    check!(eowc_stream_dist_plan);
950    check!(batch_plan_proto);
951    check!(sink_plan);
952
953    check!(explain_output);
954
955    Ok(())
956}
957
958/// `/tests/testdata` directory.
959pub fn test_data_dir() -> PathBuf {
960    std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
961        .join("tests")
962        .join("testdata")
963}
964
965pub async fn run_test_file(file_path: &Path, file_content: &str) -> Result<()> {
966    let file_name = file_path.file_name().unwrap().to_str().unwrap();
967    println!("-- running {file_name} --");
968
969    let mut failed_num = 0;
970    let cases: Vec<TestCase> = serde_yaml::from_str(file_content).map_err(|e| {
971        let context = if let Some(loc) = e.location() {
972            format!(
973                "failed to parse yaml at {}:{}:{}",
974                file_path.display(),
975                loc.line(),
976                loc.column()
977            )
978        } else {
979            "failed to parse yaml".to_owned()
980        };
981        anyhow::anyhow!(e).context(context)
982    })?;
983    let cases = resolve_testcase_id(cases).expect("failed to resolve");
984    let mut outputs = vec![];
985
986    for (i, c) in cases.into_iter().enumerate() {
987        println!(
988            "Running test #{i} (id: {}), SQL:\n{}",
989            c.id().clone().unwrap_or_else(|| "<none>".to_owned()),
990            c.sql()
991        );
992        match Box::pin(c.run(true)).await {
993            Ok(case) => {
994                outputs.push(case);
995            }
996            Err(e) => {
997                eprintln!(
998                    "Test #{i} (id: {}) failed, SQL:\n{}\nError: {}",
999                    c.id().clone().unwrap_or_else(|| "<none>".to_owned()),
1000                    c.sql(),
1001                    e.as_report()
1002                );
1003                failed_num += 1;
1004            }
1005        }
1006    }
1007
1008    let output_path = test_data_dir().join("output").join(file_name);
1009    check(outputs, expect_test::expect_file![output_path]);
1010
1011    if failed_num > 0 {
1012        println!("\n");
1013        bail!(format!("{} test cases failed", failed_num));
1014    }
1015    Ok(())
1016}