1#![allow(clippy::derive_partial_eq_without_eq)]
16
17risingwave_expr_impl::enable!();
20
21mod resolve_id;
22
23use std::collections::{BTreeMap, HashSet};
24use std::path::{Path, PathBuf};
25use std::sync::Arc;
26
27use anyhow::{Result, anyhow, bail};
28pub use resolve_id::*;
29use risingwave_frontend::handler::util::SourceSchemaCompatExt;
30use risingwave_frontend::handler::{
31 HandlerArgs, create_index, create_mv, create_schema, create_source, create_table, create_view,
32 drop_table, explain, variable,
33};
34use risingwave_frontend::optimizer::backfill_order_strategy::explain_backfill_order_in_dot_format;
35use risingwave_frontend::optimizer::plan_node::ConventionMarker;
36use risingwave_frontend::session::SessionImpl;
37use risingwave_frontend::test_utils::{LocalFrontend, create_proto_file, get_explain_output};
38use risingwave_frontend::{
39 Binder, Explain, FrontendOpts, OptimizerContext, OptimizerContextRef, PlanRef, Planner,
40 WithOptionsSecResolved, build_graph, explain_stream_graph,
41};
42use risingwave_sqlparser::ast::{
43 AstOption, BackfillOrderStrategy, DropMode, EmitMode, ExplainOptions, ObjectName, Statement,
44};
45use risingwave_sqlparser::parser::Parser;
46use serde::{Deserialize, Serialize};
47use thiserror_ext::AsReport;
48
/// A kind of output a test case may expect, i.e. which field of
/// [`TestCaseResult`] the case wants populated and checked.
///
/// Serialized in `snake_case` to match the YAML test files.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Hash, Eq)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
pub enum TestType {
    /// Output of an `EXPLAIN` statement.
    ExplainOutput,

    /// The initial logical plan produced right after planning.
    LogicalPlan,
    /// Logical plan after batch-oriented optimization.
    OptimizedLogicalPlanForBatch,
    /// Logical plan after stream-oriented optimization.
    OptimizedLogicalPlanForStream,

    /// Batch plan (distributed by default).
    BatchPlan,
    /// Batch plan serialized to its protobuf representation (as YAML).
    BatchPlanProto,
    /// Batch plan generated for local (single-node) execution.
    BatchLocalPlan,
    /// Batch plan generated for distributed execution.
    BatchDistributedPlan,

    /// Stream plan with `EMIT IMMEDIATELY` semantics.
    StreamPlan,
    /// Distributed (fragmented) stream plan.
    StreamDistPlan,
    /// Stream plan with `EMIT ON WINDOW CLOSE` semantics.
    EowcStreamPlan,
    /// Distributed stream plan under emit-on-window-close.
    EowcStreamDistPlan,
    /// Backfill order of the stream plan, rendered in DOT format.
    BackfillOrderPlan,

    /// Sink plan (query wrapped into a blackhole sink).
    SinkPlan,

    // Error variants: the case expects the corresponding phase to fail,
    // and the error report is recorded instead of a plan.
    BinderError,
    PlannerError,
    OptimizerError,
    BatchError,
    BatchLocalError,
    StreamError,
    EowcStreamError,
}
97
98pub fn check(actual: Vec<TestCaseResult>, expect: expect_test::ExpectFile) {
99 let actual = serde_yaml::to_string(&actual).unwrap();
100 expect.assert_eq(&format!("# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.\n{}",actual));
101}
102
/// The input fields of a test case, shared between [`TestCase`] (what the
/// YAML declares) and [`TestCaseResult`] (echoed back into the output file).
#[serde_with::skip_serializing_none]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct TestInput {
    /// Id of the test case, so other cases can reference it in `before`.
    pub id: Option<String>,
    /// Human-readable name of the test case.
    pub name: Option<String>,
    /// Ids of test cases whose SQL should run before this one.
    pub before: Option<Vec<String>>,
    /// The `before` ids resolved into their actual SQL statements
    /// (presumably filled by `resolve_testcase_id` — see `resolve_id`);
    /// never written back into the generated output.
    #[serde(skip_serializing)]
    before_statements: Option<Vec<String>>,
    /// The SQL under test.
    pub sql: String,
}
119
/// A single planner test case as declared in the YAML test files.
#[serde_with::skip_serializing_none]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct TestCase {
    /// Common input fields (id, name, before, sql), flattened into the YAML.
    #[serde(flatten)]
    pub input: TestInput,

    /// Optional source connector to create before running the SQL.
    pub create_source: Option<CreateConnector>,
    /// Optional table-with-connector to create before running the SQL.
    pub create_table_with_connector: Option<CreateConnector>,
    /// Session config entries applied before running the SQL.
    pub with_config_map: Option<BTreeMap<String, String>>,

    /// Which kinds of outputs this case expects to be produced and checked.
    pub expected_outputs: HashSet<TestType>,
}
138
// Read-only accessors delegating to the flattened `input` and the
// case-level configuration fields.
impl TestCase {
    /// Id of the test case (referenced by `before` of other cases).
    pub fn id(&self) -> &Option<String> {
        &self.input.id
    }

    /// Human-readable name of the test case.
    pub fn name(&self) -> &Option<String> {
        &self.input.name
    }

    /// Ids of the test cases to run before this one.
    pub fn before(&self) -> &Option<Vec<String>> {
        &self.input.before
    }

    /// The resolved SQL statements of `before`.
    pub fn before_statements(&self) -> &Option<Vec<String>> {
        &self.input.before_statements
    }

    /// The SQL under test.
    pub fn sql(&self) -> &String {
        &self.input.sql
    }

    /// Source connector to create before the test, if any.
    pub fn create_source(&self) -> &Option<CreateConnector> {
        &self.create_source
    }

    /// Table-with-connector to create before the test, if any.
    pub fn create_table_with_connector(&self) -> &Option<CreateConnector> {
        &self.create_table_with_connector
    }

    /// Session config entries to apply before the test, if any.
    pub fn with_config_map(&self) -> &Option<BTreeMap<String, String>> {
        &self.with_config_map
    }
}
172
/// Declaration of a connector to set up via `create_source` /
/// `create_table_with_connector` before running a test case.
#[serde_with::skip_serializing_none]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct CreateConnector {
    /// `FORMAT` clause of the connector definition.
    format: String,
    /// `ENCODE` clause of the connector definition.
    encode: String,
    /// Name of the source/table to create.
    name: String,
    /// Schema file content, written to a temp file at runtime (required by
    /// `do_create_source` / `do_create_table_with_connector`).
    file: Option<String>,
    /// Whether this creates a table instead of a source.
    /// NOTE(review): not read anywhere in this file — the callers pass the
    /// table/source flag as a literal; confirm whether this field is used
    /// elsewhere or is vestigial.
    is_table: Option<bool>,
}
183
/// The collected outputs of running a [`TestCase`]. `None` fields are
/// skipped during serialization, so the generated YAML only contains the
/// outputs that were actually produced.
#[serde_with::skip_serializing_none]
#[derive(Debug, PartialEq, Serialize, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct TestCaseResult {
    /// The original input, echoed back so the output file is self-describing.
    #[serde(flatten)]
    pub input: TestInput,

    /// The initial logical plan.
    pub logical_plan: Option<String>,

    /// Logical plan after batch-oriented optimization.
    pub optimized_logical_plan_for_batch: Option<String>,

    /// Logical plan after stream-oriented optimization.
    pub optimized_logical_plan_for_stream: Option<String>,

    /// Batch plan (distributed generation path).
    pub batch_plan: Option<String>,

    /// Batch plan serialized to protobuf, rendered as YAML.
    pub batch_plan_proto: Option<String>,

    /// Batch plan for local execution.
    pub batch_local_plan: Option<String>,

    /// Batch plan for distributed execution.
    pub batch_distributed_plan: Option<String>,

    /// Sink plan (query wrapped into a blackhole sink).
    pub sink_plan: Option<String>,

    /// Stream plan.
    pub stream_plan: Option<String>,

    /// Distributed (fragmented) stream plan.
    pub stream_dist_plan: Option<String>,

    /// Stream plan under emit-on-window-close.
    pub eowc_stream_plan: Option<String>,

    /// Distributed stream plan under emit-on-window-close.
    pub eowc_stream_dist_plan: Option<String>,

    /// Backfill order of the stream plan in DOT format.
    pub backfill_order_plan: Option<String>,

    /// Error reported during binding.
    pub binder_error: Option<String>,

    /// Error reported during planning.
    pub planner_error: Option<String>,

    /// Error reported during logical optimization.
    pub optimizer_error: Option<String>,

    /// Error reported while generating the batch (distributed) plan.
    pub batch_error: Option<String>,

    /// Error reported while generating the local batch plan.
    pub batch_local_error: Option<String>,

    /// Error reported while generating the stream plan.
    pub stream_error: Option<String>,

    /// Error reported while generating the EOWC stream plan.
    pub eowc_stream_error: Option<String>,

    /// Error reported while generating the sink plan.
    pub sink_error: Option<String>,

    /// Output of an `EXPLAIN` statement.
    pub explain_output: Option<String>,

    /// Echo of the case's `create_source` config.
    pub create_source: Option<CreateConnector>,
    /// Echo of the case's `create_table_with_connector` config.
    pub create_table_with_connector: Option<CreateConnector>,
    /// Echo of the case's session config entries.
    pub with_config_map: Option<BTreeMap<String, String>>,
}
268
269impl TestCase {
    /// Execute this test case end-to-end on a fresh local frontend.
    ///
    /// Applies the session config, creates the configured connectors, runs
    /// all `before` statements followed by the test SQL, and returns the
    /// collected result. When `do_check_result` is set, every produced
    /// result is validated against `expected_outputs` via [`check_result`].
    pub async fn run(&self, do_check_result: bool) -> Result<TestCaseResult> {
        let session = {
            let frontend = LocalFrontend::new(FrontendOpts::default()).await;
            frontend.session_ref()
        };

        // Apply per-case session configuration before running any SQL.
        if let Some(config_map) = self.with_config_map() {
            for (key, val) in config_map {
                session.set_config(key, val.to_owned()).unwrap();
            }
        }

        let placeholder_empty_vec = vec![];

        self.do_create_source(session.clone()).await?;
        self.do_create_table_with_connector(session.clone()).await?;

        // Run the `before` statements (if any), then the test SQL itself.
        // The result threads through so `run_sql` can detect a second
        // query-producing statement.
        let mut result: Option<TestCaseResult> = None;
        for sql in self
            .before_statements()
            .as_ref()
            .unwrap_or(&placeholder_empty_vec)
            .iter()
            .chain(std::iter::once(self.sql()))
        {
            result = self
                .run_sql(
                    Arc::from(sql.to_owned()),
                    session.clone(),
                    do_check_result,
                    result,
                )
                .await?;
        }

        // Echo the inputs back into the result so the generated output file
        // is self-describing.
        let mut result = result.unwrap_or_default();
        result.input = self.input.clone();
        result.create_source.clone_from(self.create_source());
        result
            .create_table_with_connector
            .clone_from(self.create_table_with_connector());
        result.with_config_map.clone_from(self.with_config_map());

        Ok(result)
    }
317
    /// Build the `CREATE TABLE` / `CREATE SOURCE` SQL prefix for a Kafka
    /// connector with a protobuf-style schema.
    ///
    /// The returned string is intentionally *unterminated*: callers append
    /// the temp schema file path followed by the closing `')`.
    #[inline(always)]
    fn create_connector_sql(
        is_table: bool,
        connector_name: String,
        connector_format: String,
        connector_encode: String,
    ) -> String {
        let object_to_create = if is_table { "TABLE" } else { "SOURCE" };
        format!(
            r#"CREATE {} {}
    WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
    FORMAT {} ENCODE {} (message = '.test.TestRecord', schema.location = 'file://"#,
            object_to_create, connector_name, connector_format, connector_encode
        )
    }
333
334 async fn do_create_table_with_connector(
335 &self,
336 session: Arc<SessionImpl>,
337 ) -> Result<Option<TestCaseResult>> {
338 match self.create_table_with_connector().clone() {
339 Some(connector) => {
340 if let Some(content) = connector.file {
341 let sql = Self::create_connector_sql(
342 true,
343 connector.name,
344 connector.format,
345 connector.encode,
346 );
347 let temp_file = create_proto_file(content.as_str());
348 self.run_sql(
349 Arc::from(sql + temp_file.path().to_str().unwrap() + "')"),
350 session.clone(),
351 false,
352 None,
353 )
354 .await
355 } else {
356 panic!(
357 "{:?} create table with connector must include `file` for the file content",
358 self.id()
359 );
360 }
361 }
362 None => Ok(None),
363 }
364 }
365
366 async fn do_create_source(&self, session: Arc<SessionImpl>) -> Result<Option<TestCaseResult>> {
369 match self.create_source().clone() {
370 Some(source) => {
371 if let Some(content) = source.file {
372 let sql = Self::create_connector_sql(
373 false,
374 source.name,
375 source.format,
376 source.encode,
377 );
378 let temp_file = create_proto_file(content.as_str());
379 self.run_sql(
380 Arc::from(sql + temp_file.path().to_str().unwrap() + "')"),
381 session.clone(),
382 false,
383 None,
384 )
385 .await
386 } else {
387 panic!(
388 "{:?} create source must include `file` for the file content",
389 self.id()
390 );
391 }
392 }
393 None => Ok(None),
394 }
395 }
396
    /// Parse `sql` and execute it statement-by-statement against `session`,
    /// dispatching each statement to the corresponding frontend handler.
    ///
    /// Query-like statements and `EXPLAIN` produce a [`TestCaseResult`]; at
    /// most one such statement is allowed per test case. DDL statements are
    /// executed for their side effects only. `result` threads the output of
    /// earlier invocations through so the "two queries" check spans all
    /// statements of the case.
    async fn run_sql(
        &self,
        sql: Arc<str>,
        session: Arc<SessionImpl>,
        do_check_result: bool,
        mut result: Option<TestCaseResult>,
    ) -> Result<Option<TestCaseResult>> {
        let statements = Parser::parse_sql(&sql).unwrap();
        for stmt in statements {
            let handler_args = HandlerArgs::new(session.clone(), &stmt, sql.clone())?;
            // Each statement runs inside its own implicit transaction; the
            // guard ends it when dropped.
            let _guard = session.txn_begin_implicit();
            match stmt.clone() {
                Statement::Query(_)
                | Statement::Insert { .. }
                | Statement::Delete { .. }
                | Statement::Update { .. } => {
                    if result.is_some() {
                        panic!("two queries in one test case");
                    }
                    // Plan with verbose explain so recorded plans carry full
                    // details.
                    let explain_options = ExplainOptions {
                        verbose: true,
                        ..Default::default()
                    };
                    let context = OptimizerContext::new(
                        HandlerArgs::new(session.clone(), &stmt, sql.clone())?,
                        explain_options,
                    );
                    let ret = self.apply_query(&stmt, context.into())?;
                    if do_check_result {
                        check_result(self, &ret)?;
                    }
                    result = Some(ret);
                }
                Statement::CreateTable {
                    name,
                    columns,
                    constraints,
                    if_not_exists,
                    format_encode,
                    source_watermarks,
                    append_only,
                    on_conflict,
                    with_version_columns,
                    cdc_table_info,
                    include_column_options,
                    wildcard_idx,
                    webhook_info,
                    engine,
                    ..
                } => {
                    // Convert the legacy schema clause into the v2
                    // FORMAT/ENCODE representation.
                    let format_encode = format_encode.map(|schema| schema.into_v2_with_warning());

                    create_table::handle_create_table(
                        handler_args,
                        name,
                        columns,
                        wildcard_idx,
                        constraints,
                        if_not_exists,
                        format_encode,
                        source_watermarks,
                        append_only,
                        on_conflict,
                        with_version_columns
                            .iter()
                            .map(|x| x.real_value())
                            .collect(),
                        cdc_table_info,
                        include_column_options,
                        webhook_info,
                        engine,
                    )
                    .await?;
                }
                Statement::CreateSource { stmt } => {
                    // A failing CREATE SOURCE is itself a checkable result,
                    // recorded as a planner error; success produces nothing.
                    if let Err(error) =
                        create_source::handle_create_source(handler_args, stmt).await
                    {
                        let actual_result = TestCaseResult {
                            planner_error: Some(error.to_report_string()),
                            ..Default::default()
                        };

                        check_result(self, &actual_result)?;
                        result = Some(actual_result);
                    }
                }
                Statement::CreateIndex {
                    name,
                    table_name,
                    method,
                    columns,
                    include,
                    distributed_by,
                    if_not_exists,
                    ..
                } => {
                    create_index::handle_create_index(
                        handler_args,
                        if_not_exists,
                        name,
                        table_name,
                        method,
                        columns,
                        include,
                        distributed_by,
                    )
                    .await?;
                }
                Statement::CreateView {
                    materialized: true,
                    or_replace: false,
                    if_not_exists,
                    name,
                    query,
                    columns,
                    emit_mode,
                    ..
                } => {
                    create_mv::handle_create_mv(
                        handler_args,
                        if_not_exists,
                        name,
                        *query,
                        columns,
                        emit_mode,
                    )
                    .await?;
                }
                Statement::CreateView {
                    materialized: false,
                    or_replace: false,
                    if_not_exists,
                    name,
                    query,
                    columns,
                    ..
                } => {
                    create_view::handle_create_view(
                        handler_args,
                        if_not_exists,
                        name,
                        columns,
                        *query,
                    )
                    .await?;
                }
                Statement::Drop(drop_statement) => {
                    // NOTE(review): all DROP statements route through
                    // `handle_drop_table` regardless of object type —
                    // presumably sufficient for these tests; confirm.
                    drop_table::handle_drop_table(
                        handler_args,
                        drop_statement.object_name,
                        drop_statement.if_exists,
                        matches!(drop_statement.drop_mode, AstOption::Some(DropMode::Cascade)),
                    )
                    .await?;
                }
                Statement::SetVariable {
                    local: _,
                    variable,
                    value,
                } => {
                    variable::handle_set(handler_args, variable, value).unwrap();
                }
                Statement::Explain {
                    analyze,
                    statement,
                    options,
                } => {
                    if result.is_some() {
                        panic!("two queries in one test case");
                    }
                    let rsp =
                        explain::handle_explain(handler_args, *statement, options, analyze).await?;

                    let explain_output = get_explain_output(rsp).await;
                    let ret = TestCaseResult {
                        explain_output: Some(explain_output),
                        ..Default::default()
                    };
                    if do_check_result {
                        check_result(self, &ret)?;
                    }
                    result = Some(ret);
                }
                Statement::CreateSchema {
                    schema_name,
                    if_not_exists,
                    owner,
                } => {
                    create_schema::handle_create_schema(
                        handler_args,
                        schema_name,
                        if_not_exists,
                        owner,
                    )
                    .await?;
                }
                _ => return Err(anyhow!("Unsupported statement type")),
            }
        }
        Ok(result)
    }
601
    /// Plan `stmt` in every requested mode (logical / batch / stream / sink)
    /// and record either the explained plan or the corresponding error into
    /// a fresh [`TestCaseResult`], as selected by `expected_outputs`.
    ///
    /// Binder and planner errors end the case early; later phases use
    /// labeled blocks so an error in one phase does not prevent the others
    /// from running.
    fn apply_query(
        &self,
        stmt: &Statement,
        context: OptimizerContextRef,
    ) -> Result<TestCaseResult> {
        let session = context.session_ctx().clone();
        let mut ret = TestCaseResult::default();

        // Bind the statement; a binder error is itself a checkable result.
        let bound = {
            let mut binder = Binder::new_for_batch(&session);
            match binder.bind(stmt.clone()) {
                Ok(bound) => bound,
                Err(err) => {
                    ret.binder_error = Some(err.to_report_string_pretty());
                    return Ok(ret);
                }
            }
        };

        let mut planner = Planner::new_for_stream(context.clone());

        let plan_root = match planner.plan(bound) {
            Ok(plan_root) => {
                if self.expected_outputs.contains(&TestType::LogicalPlan) {
                    ret.logical_plan =
                        Some(explain_plan(&plan_root.clone().into_unordered_subplan()));
                }
                plan_root
            }
            Err(err) => {
                ret.planner_error = Some(err.to_report_string_pretty());
                return Ok(ret);
            }
        };

        // Optimized logical plan for batch (also runs when only an
        // optimizer error is expected).
        if self
            .expected_outputs
            .contains(&TestType::OptimizedLogicalPlanForBatch)
            || self.expected_outputs.contains(&TestType::OptimizerError)
        {
            let plan_root = plan_root.clone();
            let optimized_logical_plan_for_batch =
                match plan_root.gen_optimized_logical_plan_for_batch() {
                    Ok(optimized_logical_plan_for_batch) => optimized_logical_plan_for_batch,
                    Err(err) => {
                        ret.optimizer_error = Some(err.to_report_string_pretty());
                        return Ok(ret);
                    }
                };

            if self
                .expected_outputs
                .contains(&TestType::OptimizedLogicalPlanForBatch)
            {
                ret.optimized_logical_plan_for_batch =
                    Some(explain_plan(&optimized_logical_plan_for_batch.plan));
            }
        }

        // Optimized logical plan for stream.
        if self
            .expected_outputs
            .contains(&TestType::OptimizedLogicalPlanForStream)
            || self.expected_outputs.contains(&TestType::OptimizerError)
        {
            let plan_root = plan_root.clone();
            let optimized_logical_plan_for_stream =
                match plan_root.gen_optimized_logical_plan_for_stream() {
                    Ok(optimized_logical_plan_for_stream) => optimized_logical_plan_for_stream,
                    Err(err) => {
                        ret.optimizer_error = Some(err.to_report_string_pretty());
                        return Ok(ret);
                    }
                };

            if self
                .expected_outputs
                .contains(&TestType::OptimizedLogicalPlanForStream)
            {
                ret.optimized_logical_plan_for_stream =
                    Some(explain_plan(&optimized_logical_plan_for_stream.plan));
            }
        }

        // Batch plan / proto: errors break out of the labeled block so the
        // remaining sections still run.
        'batch: {
            if self.expected_outputs.contains(&TestType::BatchPlan)
                || self.expected_outputs.contains(&TestType::BatchPlanProto)
                || self.expected_outputs.contains(&TestType::BatchError)
            {
                let plan_root = plan_root.clone();
                let batch_plan = match plan_root.gen_batch_plan() {
                    Ok(batch_plan) => match batch_plan.gen_batch_distributed_plan() {
                        Ok(batch_plan) => batch_plan,
                        Err(err) => {
                            ret.batch_error = Some(err.to_report_string_pretty());
                            break 'batch;
                        }
                    },
                    Err(err) => {
                        ret.batch_error = Some(err.to_report_string_pretty());
                        break 'batch;
                    }
                };

                if self.expected_outputs.contains(&TestType::BatchPlan) {
                    ret.batch_plan = Some(explain_plan(&batch_plan));
                }

                if self.expected_outputs.contains(&TestType::BatchPlanProto) {
                    ret.batch_plan_proto = Some(serde_yaml::to_string(
                        &batch_plan.to_batch_prost_identity(false)?,
                    )?);
                }
            }
        }

        // Local batch plan.
        'local_batch: {
            if self.expected_outputs.contains(&TestType::BatchLocalPlan)
                || self.expected_outputs.contains(&TestType::BatchError)
            {
                let plan_root = plan_root.clone();
                let batch_plan = match plan_root.gen_batch_plan() {
                    Ok(batch_plan) => match batch_plan.gen_batch_local_plan() {
                        Ok(batch_plan) => batch_plan,
                        Err(err) => {
                            ret.batch_error = Some(err.to_report_string_pretty());
                            break 'local_batch;
                        }
                    },
                    Err(err) => {
                        ret.batch_error = Some(err.to_report_string_pretty());
                        break 'local_batch;
                    }
                };

                if self.expected_outputs.contains(&TestType::BatchLocalPlan) {
                    ret.batch_local_plan = Some(explain_plan(&batch_plan));
                }
            }
        }

        // Distributed batch plan.
        'distributed_batch: {
            if self
                .expected_outputs
                .contains(&TestType::BatchDistributedPlan)
                || self.expected_outputs.contains(&TestType::BatchError)
            {
                let plan_root = plan_root.clone();
                let batch_plan = match plan_root.gen_batch_plan() {
                    Ok(batch_plan) => match batch_plan.gen_batch_distributed_plan() {
                        Ok(batch_plan) => batch_plan,
                        Err(err) => {
                            ret.batch_error = Some(err.to_report_string_pretty());
                            break 'distributed_batch;
                        }
                    },
                    Err(err) => {
                        ret.batch_error = Some(err.to_report_string_pretty());
                        break 'distributed_batch;
                    }
                };

                if self
                    .expected_outputs
                    .contains(&TestType::BatchDistributedPlan)
                {
                    ret.batch_distributed_plan = Some(explain_plan(&batch_plan));
                }
            }
        }

        // Stream plans: one pass per emit mode (immediate / on-window-close),
        // each writing into its own trio of result fields via the mutable
        // references in the tuple.
        {
            for (
                emit_mode,
                plan,
                ret_plan_str,
                dist_plan,
                ret_dist_plan_str,
                error,
                ret_error_str,
            ) in [
                (
                    EmitMode::Immediately,
                    self.expected_outputs.contains(&TestType::StreamPlan),
                    &mut ret.stream_plan,
                    self.expected_outputs.contains(&TestType::StreamDistPlan),
                    &mut ret.stream_dist_plan,
                    self.expected_outputs.contains(&TestType::StreamError),
                    &mut ret.stream_error,
                ),
                (
                    EmitMode::OnWindowClose,
                    self.expected_outputs.contains(&TestType::EowcStreamPlan),
                    &mut ret.eowc_stream_plan,
                    self.expected_outputs
                        .contains(&TestType::EowcStreamDistPlan),
                    &mut ret.eowc_stream_dist_plan,
                    self.expected_outputs.contains(&TestType::EowcStreamError),
                    &mut ret.eowc_stream_error,
                ),
            ] {
                if !plan && !dist_plan && !error {
                    continue;
                }

                // Stream planning goes through the CREATE MATERIALIZED VIEW
                // path, so only plain queries are supported here.
                let q = if let Statement::Query(q) = stmt {
                    q.as_ref().clone()
                } else {
                    return Err(anyhow!("expect a query"));
                };

                let (stream_plan, table) = match create_mv::gen_create_mv_plan(
                    &session,
                    context.clone(),
                    q,
                    ObjectName(vec!["test".into()]),
                    vec![],
                    Some(emit_mode),
                ) {
                    Ok(r) => r,
                    Err(err) => {
                        *ret_error_str = Some(err.to_report_string_pretty());
                        continue;
                    }
                };

                if plan {
                    *ret_plan_str = Some(explain_plan(&stream_plan));
                }

                if dist_plan {
                    let graph = build_graph(stream_plan.clone(), None)?;
                    *ret_dist_plan_str =
                        Some(explain_stream_graph(&graph, Some(table.to_prost()), false));
                }

                // NOTE(review): this runs inside the per-emit-mode loop, so a
                // case expecting both normal and EOWC stream plans computes
                // the backfill order twice and the second run overwrites the
                // first — confirm intentional.
                if self.expected_outputs.contains(&TestType::BackfillOrderPlan) {
                    match explain_backfill_order_in_dot_format(
                        &session,
                        BackfillOrderStrategy::Auto,
                        stream_plan,
                    ) {
                        Ok(formatted_order_plan) => {
                            ret.backfill_order_plan = Some(formatted_order_plan);
                        }
                        Err(err) => {
                            *ret_error_str = Some(err.to_report_string_pretty());
                        }
                    }
                }
            }
        }

        // Sink plan: wrap the query into an append-only blackhole sink.
        'sink: {
            if self.expected_outputs.contains(&TestType::SinkPlan) {
                let plan_root = plan_root;
                let sink_name = "sink_test";
                let mut options = BTreeMap::new();
                options.insert("connector".to_owned(), "blackhole".to_owned());
                options.insert("type".to_owned(), "append-only".to_owned());
                let options = WithOptionsSecResolved::without_secrets(options);
                let format_desc = (&options).try_into().unwrap();
                match plan_root.gen_sink_plan(
                    sink_name.to_owned(),
                    format!("CREATE SINK {sink_name} AS {}", stmt),
                    options,
                    false,
                    "test_db".into(),
                    "test_table".into(),
                    format_desc,
                    false,
                    None,
                    None,
                    false,
                    None,
                ) {
                    Ok(sink_plan) => {
                        ret.sink_plan = Some(explain_plan(&sink_plan.into()));
                        break 'sink;
                    }
                    Err(err) => {
                        ret.sink_error = Some(err.to_report_string_pretty());
                        break 'sink;
                    }
                }
            }
        }

        Ok(ret)
    }
901}
902
/// Render a plan tree into its human-readable explain string.
fn explain_plan(plan: &PlanRef<impl ConventionMarker>) -> String {
    plan.explain_to_string()
}
906
/// Validate that the outputs actually produced in `actual` exactly match the
/// set declared in `test_case.expected_outputs`: every expected field must be
/// present, and no unexpected field may be set.
fn check_result(test_case: &TestCase, actual: &TestCaseResult) -> Result<()> {
    // For a field `foo_bar`, compares membership of `TestType::FooBar`
    // (snake→camel via `paste`) against `actual.foo_bar`.
    macro_rules! check {
        ($field:ident) => {
            paste::paste! {
                let case_contains = test_case.expected_outputs.contains(&TestType:: [< $field:camel >] );
                let actual_contains = &actual.$field;
                match (case_contains, actual_contains) {
                    (false, None) | (true, Some(_)) => {},
                    (false, Some(e)) => return Err(anyhow!("unexpected {}: {}", stringify!($field), e)),
                    (true, None) => return Err(anyhow!(
                        "expected {}, but there's no such result during execution",
                        stringify!($field)
                    )),
                }
            }
        };
    }

    check!(binder_error);
    check!(planner_error);
    check!(optimizer_error);
    check!(batch_error);
    check!(batch_local_error);
    check!(stream_error);
    check!(eowc_stream_error);

    check!(logical_plan);
    check!(optimized_logical_plan_for_batch);
    check!(optimized_logical_plan_for_stream);
    check!(batch_plan);
    check!(batch_local_plan);
    check!(stream_plan);
    check!(stream_dist_plan);
    check!(eowc_stream_plan);
    check!(eowc_stream_dist_plan);
    check!(batch_plan_proto);
    check!(sink_plan);

    check!(explain_output);

    // NOTE(review): `batch_distributed_plan`, `backfill_order_plan` and
    // `sink_error` are not checked here — confirm intentional.
    Ok(())
}
952
953pub fn test_data_dir() -> PathBuf {
955 std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
956 .join("tests")
957 .join("testdata")
958}
959
960pub async fn run_test_file(file_path: &Path, file_content: &str) -> Result<()> {
961 let file_name = file_path.file_name().unwrap().to_str().unwrap();
962 println!("-- running {file_name} --");
963
964 let mut failed_num = 0;
965 let cases: Vec<TestCase> = serde_yaml::from_str(file_content).map_err(|e| {
966 let context = if let Some(loc) = e.location() {
967 format!(
968 "failed to parse yaml at {}:{}:{}",
969 file_path.display(),
970 loc.line(),
971 loc.column()
972 )
973 } else {
974 "failed to parse yaml".to_owned()
975 };
976 anyhow::anyhow!(e).context(context)
977 })?;
978 let cases = resolve_testcase_id(cases).expect("failed to resolve");
979 let mut outputs = vec![];
980
981 for (i, c) in cases.into_iter().enumerate() {
982 println!(
983 "Running test #{i} (id: {}), SQL:\n{}",
984 c.id().clone().unwrap_or_else(|| "<none>".to_owned()),
985 c.sql()
986 );
987 match c.run(true).await {
988 Ok(case) => {
989 outputs.push(case);
990 }
991 Err(e) => {
992 eprintln!(
993 "Test #{i} (id: {}) failed, SQL:\n{}\nError: {}",
994 c.id().clone().unwrap_or_else(|| "<none>".to_owned()),
995 c.sql(),
996 e.as_report()
997 );
998 failed_num += 1;
999 }
1000 }
1001 }
1002
1003 let output_path = test_data_dir().join("output").join(file_name);
1004 check(outputs, expect_test::expect_file![output_path]);
1005
1006 if failed_num > 0 {
1007 println!("\n");
1008 bail!(format!("{} test cases failed", failed_num));
1009 }
1010 Ok(())
1011}