1#![allow(clippy::derive_partial_eq_without_eq)]
16
17risingwave_expr_impl::enable!();
20
21mod resolve_id;
22
23use std::collections::{BTreeMap, HashSet};
24use std::path::{Path, PathBuf};
25use std::sync::Arc;
26
27use anyhow::{Result, anyhow, bail};
28pub use resolve_id::*;
29use risingwave_frontend::handler::util::SourceSchemaCompatExt;
30use risingwave_frontend::handler::{
31 HandlerArgs, create_index, create_mv, create_schema, create_source, create_table, create_view,
32 drop_table, explain, variable,
33};
34use risingwave_frontend::optimizer::backfill_order_strategy::explain_backfill_order_in_dot_format;
35use risingwave_frontend::optimizer::plan_node::ConventionMarker;
36use risingwave_frontend::session::SessionImpl;
37use risingwave_frontend::test_utils::{LocalFrontend, create_proto_file, get_explain_output};
38use risingwave_frontend::{
39 Binder, Explain, FrontendOpts, OptimizerContext, OptimizerContextRef, PlanRef, Planner,
40 WithOptionsSecResolved, build_graph, explain_stream_graph,
41};
42use risingwave_sqlparser::ast::{
43 AstOption, BackfillOrderStrategy, DropMode, EmitMode, ExplainOptions, ObjectName, Statement,
44};
45use risingwave_sqlparser::parser::Parser;
46use serde::{Deserialize, Serialize};
47use thiserror_ext::AsReport;
48
/// The kind of an expected test output, selecting which artifact of the planning
/// pipeline (or which error) a test case wants recorded.
///
/// Each variant corresponds, in `snake_case`, to the field of the same name in
/// `TestCaseResult`; see `check_result` for the mapping.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Hash, Eq)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
pub enum TestType {
    /// The output of an `EXPLAIN` statement.
    ExplainOutput,

    /// The initial logical plan, before optimization.
    LogicalPlan,
    /// The logical plan after batch-oriented optimization.
    OptimizedLogicalPlanForBatch,
    /// The logical plan after stream-oriented optimization.
    OptimizedLogicalPlanForStream,

    /// The distributed batch plan (also the basis for the proto variant below).
    BatchPlan,
    /// The batch plan serialized to its protobuf representation, rendered as YAML.
    BatchPlanProto,
    /// The batch plan generated for local (single-node) execution.
    BatchLocalPlan,
    /// The batch plan generated for distributed execution.
    BatchDistributedPlan,

    /// The streaming plan, generated with `EmitMode::Immediately`.
    StreamPlan,
    /// The streaming plan rendered as a distributed stream graph.
    StreamDistPlan,
    /// The streaming plan under `EMIT ON WINDOW CLOSE` (`EmitMode::OnWindowClose`).
    EowcStreamPlan,
    /// The EOWC streaming plan rendered as a distributed stream graph.
    EowcStreamDistPlan,
    /// The backfill order of the streaming plan, in dot format.
    BackfillOrderPlan,

    /// The plan generated for a blackhole sink on top of the query.
    SinkPlan,

    /// An error expected while binding the statement.
    BinderError,
    /// An error expected while planning the bound statement.
    PlannerError,
    /// An error expected during logical optimization.
    OptimizerError,
    /// An error expected while generating a batch plan.
    BatchError,
    /// An error expected while generating the local batch plan.
    /// NOTE(review): no code path visible in this file populates `batch_local_error`.
    BatchLocalError,
    /// An error expected while generating the streaming plan.
    StreamError,
    /// An error expected while generating the EOWC streaming plan.
    EowcStreamError,
}
97
98pub fn check(actual: Vec<TestCaseResult>, expect: expect_test::ExpectFile) {
99 let actual = serde_yaml::to_string(&actual).unwrap();
100 expect.assert_eq(&format!("# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.\n{}",actual));
101}
102
/// The shared input part of a test case: identification plus the SQL to run.
#[serde_with::skip_serializing_none]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct TestInput {
    /// Id of this test case; presumably referenced by other cases' `before` lists
    /// (resolved by `resolve_testcase_id`) — TODO confirm against `resolve_id`.
    pub id: Option<String>,
    /// Human-readable name of the test case.
    pub name: Option<String>,
    /// Ids of cases whose SQL should run before this one.
    pub before: Option<Vec<String>>,
    /// The resolved SQL of the `before` cases; internal only, never serialized back
    /// into the output file.
    #[serde(skip_serializing)]
    before_statements: Option<Vec<String>>,
    /// The SQL statement(s) under test.
    pub sql: String,
}
119
/// A single planner test case as declared in the YAML test files.
#[serde_with::skip_serializing_none]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct TestCase {
    /// Identification and SQL input, flattened into the same YAML mapping.
    #[serde(flatten)]
    pub input: TestInput,

    /// Optional `CREATE SOURCE` connector fixture set up before running the SQL.
    pub create_source: Option<CreateConnector>,
    /// Optional `CREATE TABLE ... WITH (connector = ...)` fixture set up before running the SQL.
    pub create_table_with_connector: Option<CreateConnector>,
    /// Session config entries applied via `SessionImpl::set_config` before the run.
    pub with_config_map: Option<BTreeMap<String, String>>,

    /// The set of outputs this case expects; drives which plans/errors are
    /// generated and which are validated by `check_result`.
    pub expected_outputs: HashSet<TestType>,
}
138
impl TestCase {
    /// Id of the test case, echoed from the input.
    pub fn id(&self) -> &Option<String> {
        &self.input.id
    }

    /// Human-readable name of the test case.
    pub fn name(&self) -> &Option<String> {
        &self.input.name
    }

    /// Ids of the cases to run before this one.
    pub fn before(&self) -> &Option<Vec<String>> {
        &self.input.before
    }

    /// Resolved SQL of the `before` cases.
    pub fn before_statements(&self) -> &Option<Vec<String>> {
        &self.input.before_statements
    }

    /// The SQL under test.
    pub fn sql(&self) -> &String {
        &self.input.sql
    }

    /// The `CREATE SOURCE` fixture, if any.
    pub fn create_source(&self) -> &Option<CreateConnector> {
        &self.create_source
    }

    /// The connector-backed `CREATE TABLE` fixture, if any.
    pub fn create_table_with_connector(&self) -> &Option<CreateConnector> {
        &self.create_table_with_connector
    }

    /// Session config applied before the run.
    pub fn with_config_map(&self) -> &Option<BTreeMap<String, String>> {
        &self.with_config_map
    }
}
172
/// Declarative fixture describing a connector-backed `CREATE SOURCE` / `CREATE TABLE`;
/// rendered into SQL by `TestCase::create_connector_sql`.
#[serde_with::skip_serializing_none]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct CreateConnector {
    /// Value of the `FORMAT` clause.
    format: String,
    /// Value of the `ENCODE` clause.
    encode: String,
    /// Name of the source/table to create.
    name: String,
    /// Content of the proto schema file; written to a temp file before use.
    file: Option<String>,
    /// NOTE(review): not read by any code visible in this file — the TABLE/SOURCE
    /// choice comes from which `TestCase` field the fixture sits in. Verify before use.
    is_table: Option<bool>,
}
183
/// The (expected) output of a test case — one optional field per `TestType` variant,
/// plus the echoed input. `None` fields are skipped during serialization.
#[serde_with::skip_serializing_none]
#[derive(Debug, PartialEq, Serialize, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct TestCaseResult {
    /// The original input, echoed back so the output file is self-describing.
    #[serde(flatten)]
    pub input: TestInput,

    /// The initial logical plan.
    pub logical_plan: Option<String>,

    /// The logical plan after batch-oriented optimization.
    pub optimized_logical_plan_for_batch: Option<String>,

    /// The logical plan after stream-oriented optimization.
    pub optimized_logical_plan_for_stream: Option<String>,

    /// The distributed batch plan.
    pub batch_plan: Option<String>,

    /// The batch plan's protobuf representation, rendered as YAML.
    pub batch_plan_proto: Option<String>,

    /// The local batch plan.
    pub batch_local_plan: Option<String>,

    /// The distributed batch plan (from the dedicated `BatchDistributedPlan` output).
    pub batch_distributed_plan: Option<String>,

    /// The sink plan built on top of the query.
    pub sink_plan: Option<String>,

    /// The streaming plan.
    pub stream_plan: Option<String>,

    /// The streaming plan as a distributed stream graph.
    pub stream_dist_plan: Option<String>,

    /// The EOWC streaming plan.
    pub eowc_stream_plan: Option<String>,

    /// The EOWC streaming plan as a distributed stream graph.
    pub eowc_stream_dist_plan: Option<String>,

    /// The backfill order plan, in dot format.
    pub backfill_order_plan: Option<String>,

    /// Error captured while binding.
    pub binder_error: Option<String>,

    /// Error captured while planning.
    pub planner_error: Option<String>,

    /// Error captured during logical optimization.
    pub optimizer_error: Option<String>,

    /// Error captured while generating a batch plan.
    pub batch_error: Option<String>,

    /// Error captured while generating the local batch plan.
    /// NOTE(review): not populated by any code path visible in this file.
    pub batch_local_error: Option<String>,

    /// Error captured while generating the streaming plan.
    pub stream_error: Option<String>,

    /// Error captured while generating the EOWC streaming plan.
    pub eowc_stream_error: Option<String>,

    /// Error captured while generating the sink plan; recorded here but not
    /// cross-checked by `check_result`.
    pub sink_error: Option<String>,

    /// Output of an `EXPLAIN` statement.
    pub explain_output: Option<String>,

    /// The `CREATE SOURCE` fixture, echoed back.
    pub create_source: Option<CreateConnector>,
    /// The connector-backed `CREATE TABLE` fixture, echoed back.
    pub create_table_with_connector: Option<CreateConnector>,
    /// Session config used for the run, echoed back.
    pub with_config_map: Option<BTreeMap<String, String>>,
}
268
269impl TestCase {
    /// Run this test case end to end against a fresh local frontend.
    ///
    /// Setup happens in a fixed order: session config is applied first, then the
    /// connector fixtures (`create_source`, `create_table_with_connector`), then the
    /// `before` statements, and finally the case's own SQL. When `do_check_result`
    /// is set, each query's output is validated against `expected_outputs`.
    /// The case input and fixtures are echoed into the returned result so the
    /// output file is self-describing.
    pub async fn run(&self, do_check_result: bool) -> Result<TestCaseResult> {
        // Each run gets a brand-new frontend, and therefore an empty catalog.
        let session = {
            let frontend = LocalFrontend::new(FrontendOpts::default()).await;
            frontend.session_ref()
        };

        if let Some(config_map) = self.with_config_map() {
            for (key, val) in config_map {
                session.set_config(key, val.to_owned()).unwrap();
            }
        }

        let placeholder_empty_vec = vec![];

        Box::pin(self.do_create_source(session.clone())).await?;
        Box::pin(self.do_create_table_with_connector(session.clone())).await?;

        // `result` is threaded through every statement; at most one query-like
        // statement may produce a result (enforced inside `run_sql`).
        let mut result: Option<TestCaseResult> = None;
        for sql in self
            .before_statements()
            .as_ref()
            .unwrap_or(&placeholder_empty_vec)
            .iter()
            .chain(std::iter::once(self.sql()))
        {
            result = Box::pin(self.run_sql(
                Arc::from(sql.to_owned()),
                session.clone(),
                do_check_result,
                result,
            ))
            .await?;
        }

        // Echo the input and fixtures back into the result.
        let mut result = result.unwrap_or_default();
        result.input = self.input.clone();
        result.create_source.clone_from(self.create_source());
        result
            .create_table_with_connector
            .clone_from(self.create_table_with_connector());
        result.with_config_map.clone_from(self.with_config_map());

        Ok(result)
    }
316
317 #[inline(always)]
318 fn create_connector_sql(
319 is_table: bool,
320 connector_name: String,
321 connector_format: String,
322 connector_encode: String,
323 ) -> String {
324 let object_to_create = if is_table { "TABLE" } else { "SOURCE" };
325 format!(
326 r#"CREATE {} {}
327 WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
328 FORMAT {} ENCODE {} (message = '.test.TestRecord', schema.location = 'file://"#,
329 object_to_create, connector_name, connector_format, connector_encode
330 )
331 }
332
333 async fn do_create_table_with_connector(
334 &self,
335 session: Arc<SessionImpl>,
336 ) -> Result<Option<TestCaseResult>> {
337 match self.create_table_with_connector().clone() {
338 Some(connector) => {
339 if let Some(content) = connector.file {
340 let sql = Self::create_connector_sql(
341 true,
342 connector.name,
343 connector.format,
344 connector.encode,
345 );
346 let temp_file = create_proto_file(content.as_str());
347 Box::pin(self.run_sql(
348 Arc::from(sql + temp_file.path().to_str().unwrap() + "')"),
349 session.clone(),
350 false,
351 None,
352 ))
353 .await
354 } else {
355 panic!(
356 "{:?} create table with connector must include `file` for the file content",
357 self.id()
358 );
359 }
360 }
361 None => Ok(None),
362 }
363 }
364
365 async fn do_create_source(&self, session: Arc<SessionImpl>) -> Result<Option<TestCaseResult>> {
368 match self.create_source().clone() {
369 Some(source) => {
370 if let Some(content) = source.file {
371 let sql = Self::create_connector_sql(
372 false,
373 source.name,
374 source.format,
375 source.encode,
376 );
377 let temp_file = create_proto_file(content.as_str());
378 Box::pin(self.run_sql(
379 Arc::from(sql + temp_file.path().to_str().unwrap() + "')"),
380 session.clone(),
381 false,
382 None,
383 ))
384 .await
385 } else {
386 panic!(
387 "{:?} create source must include `file` for the file content",
388 self.id()
389 );
390 }
391 }
392 None => Ok(None),
393 }
394 }
395
    /// Parse `sql` and execute every statement it contains against `session`.
    ///
    /// Query-like statements (`SELECT`/`INSERT`/`DELETE`/`UPDATE`) and `EXPLAIN`
    /// produce a `TestCaseResult`; at most one such statement is allowed per test
    /// case. DDL statements (`CREATE TABLE`/`SOURCE`/`INDEX`/`VIEW`/`SCHEMA`,
    /// `DROP`, `SET`) are executed only for their side effects on the session
    /// catalog. Any other statement kind is rejected with an error.
    async fn run_sql(
        &self,
        sql: Arc<str>,
        session: Arc<SessionImpl>,
        do_check_result: bool,
        mut result: Option<TestCaseResult>,
    ) -> Result<Option<TestCaseResult>> {
        let statements = Parser::parse_sql(&sql).unwrap();
        for stmt in statements {
            let handler_args = HandlerArgs::new(session.clone(), &stmt, sql.clone())?;
            // Every statement runs inside an implicit transaction scoped to this iteration.
            let _guard = session.txn_begin_implicit();
            match stmt.clone() {
                // Query-like statements go through the full bind/plan pipeline.
                Statement::Query(_)
                | Statement::Insert { .. }
                | Statement::Delete { .. }
                | Statement::Update { .. } => {
                    if result.is_some() {
                        panic!("two queries in one test case");
                    }
                    // Verbose explain output is always requested for plan snapshots.
                    let explain_options = ExplainOptions {
                        verbose: true,
                        ..Default::default()
                    };
                    let context = OptimizerContext::new(
                        HandlerArgs::new(session.clone(), &stmt, sql.clone())?,
                        explain_options,
                    );
                    let ret = self.apply_query(&stmt, context.into())?;
                    if do_check_result {
                        check_result(self, &ret)?;
                    }
                    result = Some(ret);
                }
                Statement::CreateTable {
                    name,
                    columns,
                    constraints,
                    if_not_exists,
                    format_encode,
                    source_watermarks,
                    append_only,
                    on_conflict,
                    with_version_columns,
                    cdc_table_info,
                    include_column_options,
                    wildcard_idx,
                    webhook_info,
                    engine,
                    ..
                } => {
                    let format_encode = format_encode.map(|schema| schema.into_v2_with_warning());

                    Box::pin(create_table::handle_create_table(
                        handler_args,
                        name,
                        columns,
                        wildcard_idx,
                        constraints,
                        if_not_exists,
                        format_encode,
                        source_watermarks,
                        append_only,
                        on_conflict,
                        with_version_columns
                            .iter()
                            .map(|x| x.real_value())
                            .collect(),
                        cdc_table_info,
                        include_column_options,
                        webhook_info,
                        engine,
                    ))
                    .await?;
                }
                Statement::CreateSource { stmt } => {
                    // A failing CREATE SOURCE is itself a checkable test result
                    // (recorded as a planner error), not a fatal failure.
                    if let Err(error) =
                        create_source::handle_create_source(handler_args, stmt).await
                    {
                        let actual_result = TestCaseResult {
                            planner_error: Some(error.to_report_string()),
                            ..Default::default()
                        };

                        check_result(self, &actual_result)?;
                        result = Some(actual_result);
                    }
                }
                Statement::CreateIndex {
                    name,
                    table_name,
                    method,
                    columns,
                    include,
                    distributed_by,
                    if_not_exists,
                    ..
                } => {
                    create_index::handle_create_index(
                        handler_args,
                        if_not_exists,
                        name,
                        table_name,
                        method,
                        columns,
                        include,
                        distributed_by,
                    )
                    .await?;
                }
                // CREATE MATERIALIZED VIEW.
                Statement::CreateView {
                    materialized: true,
                    or_replace: false,
                    if_not_exists,
                    name,
                    query,
                    columns,
                    emit_mode,
                    ..
                } => {
                    create_mv::handle_create_mv(
                        handler_args,
                        if_not_exists,
                        name,
                        *query,
                        columns,
                        emit_mode,
                    )
                    .await?;
                }
                // Plain (non-materialized) CREATE VIEW.
                Statement::CreateView {
                    materialized: false,
                    or_replace: false,
                    if_not_exists,
                    name,
                    query,
                    columns,
                    ..
                } => {
                    create_view::handle_create_view(
                        handler_args,
                        if_not_exists,
                        name,
                        columns,
                        *query,
                    )
                    .await?;
                }
                Statement::Drop(drop_statement) => {
                    drop_table::handle_drop_table(
                        handler_args,
                        drop_statement.object_name,
                        drop_statement.if_exists,
                        matches!(drop_statement.drop_mode, AstOption::Some(DropMode::Cascade)),
                    )
                    .await?;
                }
                Statement::SetVariable {
                    local: _,
                    variable,
                    value,
                } => {
                    variable::handle_set(handler_args, variable, value).unwrap();
                }
                Statement::Explain {
                    analyze,
                    statement,
                    options,
                } => {
                    if result.is_some() {
                        panic!("two queries in one test case");
                    }
                    let rsp = Box::pin(explain::handle_explain(
                        handler_args,
                        *statement,
                        options,
                        analyze,
                    ))
                    .await?;

                    let explain_output = get_explain_output(rsp).await;
                    let ret = TestCaseResult {
                        explain_output: Some(explain_output),
                        ..Default::default()
                    };
                    if do_check_result {
                        check_result(self, &ret)?;
                    }
                    result = Some(ret);
                }
                Statement::CreateSchema {
                    schema_name,
                    if_not_exists,
                    owner,
                } => {
                    create_schema::handle_create_schema(
                        handler_args,
                        schema_name,
                        if_not_exists,
                        owner,
                    )
                    .await?;
                }
                _ => return Err(anyhow!("Unsupported statement type")),
            }
        }
        Ok(result)
    }
605
    /// Bind, plan, and optimize `stmt`, producing every artifact requested by
    /// `expected_outputs`.
    ///
    /// Errors from each stage (binder, planner, optimizer, batch/stream/sink plan
    /// generation) are captured into the corresponding `*_error` field of the
    /// result instead of being propagated, so a test case can assert on them.
    /// Labeled blocks (`'batch`, `'local_batch`, …) let a stage bail out early
    /// while the remaining stages still run.
    fn apply_query(
        &self,
        stmt: &Statement,
        context: OptimizerContextRef,
    ) -> Result<TestCaseResult> {
        let session = context.session_ctx().clone();
        let mut ret = TestCaseResult::default();

        // Stage 1: bind. A binder error terminates the whole pipeline.
        let bound = {
            let mut binder = Binder::new_for_batch(&session);
            match binder.bind(stmt.clone()) {
                Ok(bound) => bound,
                Err(err) => {
                    ret.binder_error = Some(err.to_report_string_pretty());
                    return Ok(ret);
                }
            }
        };

        // Stage 2: plan. NOTE: the planner is created in stream mode; batch plans
        // below are derived from the same logical root.
        let mut planner = Planner::new_for_stream(context.clone());

        let plan_root = match planner.plan(bound) {
            Ok(plan_root) => {
                if self.expected_outputs.contains(&TestType::LogicalPlan) {
                    ret.logical_plan =
                        Some(explain_plan(&plan_root.clone().into_unordered_subplan()));
                }
                plan_root
            }
            Err(err) => {
                ret.planner_error = Some(err.to_report_string_pretty());
                return Ok(ret);
            }
        };

        // Stage 3a: logical optimization for batch (also runs when only an
        // optimizer error is expected).
        if self
            .expected_outputs
            .contains(&TestType::OptimizedLogicalPlanForBatch)
            || self.expected_outputs.contains(&TestType::OptimizerError)
        {
            let plan_root = plan_root.clone();
            let optimized_logical_plan_for_batch =
                match plan_root.gen_optimized_logical_plan_for_batch() {
                    Ok(optimized_logical_plan_for_batch) => optimized_logical_plan_for_batch,
                    Err(err) => {
                        ret.optimizer_error = Some(err.to_report_string_pretty());
                        return Ok(ret);
                    }
                };

            if self
                .expected_outputs
                .contains(&TestType::OptimizedLogicalPlanForBatch)
            {
                ret.optimized_logical_plan_for_batch =
                    Some(explain_plan(&optimized_logical_plan_for_batch.plan));
            }
        }

        // Stage 3b: logical optimization for stream.
        if self
            .expected_outputs
            .contains(&TestType::OptimizedLogicalPlanForStream)
            || self.expected_outputs.contains(&TestType::OptimizerError)
        {
            let plan_root = plan_root.clone();
            let optimized_logical_plan_for_stream =
                match plan_root.gen_optimized_logical_plan_for_stream() {
                    Ok(optimized_logical_plan_for_stream) => optimized_logical_plan_for_stream,
                    Err(err) => {
                        ret.optimizer_error = Some(err.to_report_string_pretty());
                        return Ok(ret);
                    }
                };

            if self
                .expected_outputs
                .contains(&TestType::OptimizedLogicalPlanForStream)
            {
                ret.optimized_logical_plan_for_stream =
                    Some(explain_plan(&optimized_logical_plan_for_stream.plan));
            }
        }

        // Stage 4a: distributed batch plan (shared by plan, proto, and error outputs).
        'batch: {
            if self.expected_outputs.contains(&TestType::BatchPlan)
                || self.expected_outputs.contains(&TestType::BatchPlanProto)
                || self.expected_outputs.contains(&TestType::BatchError)
            {
                let plan_root = plan_root.clone();
                let batch_plan = match plan_root.gen_batch_plan() {
                    Ok(batch_plan) => match batch_plan.gen_batch_distributed_plan() {
                        Ok(batch_plan) => batch_plan,
                        Err(err) => {
                            ret.batch_error = Some(err.to_report_string_pretty());
                            break 'batch;
                        }
                    },
                    Err(err) => {
                        ret.batch_error = Some(err.to_report_string_pretty());
                        break 'batch;
                    }
                };

                if self.expected_outputs.contains(&TestType::BatchPlan) {
                    ret.batch_plan = Some(explain_plan(&batch_plan));
                }

                if self.expected_outputs.contains(&TestType::BatchPlanProto) {
                    ret.batch_plan_proto = Some(serde_yaml::to_string(
                        &batch_plan.to_batch_prost_identity(false)?,
                    )?);
                }
            }
        }

        // Stage 4b: local batch plan.
        'local_batch: {
            if self.expected_outputs.contains(&TestType::BatchLocalPlan)
                || self.expected_outputs.contains(&TestType::BatchError)
            {
                let plan_root = plan_root.clone();
                let batch_plan = match plan_root.gen_batch_plan() {
                    Ok(batch_plan) => match batch_plan.gen_batch_local_plan() {
                        Ok(batch_plan) => batch_plan,
                        Err(err) => {
                            ret.batch_error = Some(err.to_report_string_pretty());
                            break 'local_batch;
                        }
                    },
                    Err(err) => {
                        ret.batch_error = Some(err.to_report_string_pretty());
                        break 'local_batch;
                    }
                };

                if self.expected_outputs.contains(&TestType::BatchLocalPlan) {
                    ret.batch_local_plan = Some(explain_plan(&batch_plan));
                }
            }
        }

        // Stage 4c: distributed batch plan for the dedicated output field.
        'distributed_batch: {
            if self
                .expected_outputs
                .contains(&TestType::BatchDistributedPlan)
                || self.expected_outputs.contains(&TestType::BatchError)
            {
                let plan_root = plan_root.clone();
                let batch_plan = match plan_root.gen_batch_plan() {
                    Ok(batch_plan) => match batch_plan.gen_batch_distributed_plan() {
                        Ok(batch_plan) => batch_plan,
                        Err(err) => {
                            ret.batch_error = Some(err.to_report_string_pretty());
                            break 'distributed_batch;
                        }
                    },
                    Err(err) => {
                        ret.batch_error = Some(err.to_report_string_pretty());
                        break 'distributed_batch;
                    }
                };

                if self
                    .expected_outputs
                    .contains(&TestType::BatchDistributedPlan)
                {
                    ret.batch_distributed_plan = Some(explain_plan(&batch_plan));
                }
            }
        }

        // Stage 5: streaming plans — one pass per emit mode, each tuple bundling
        // the requested flags with the result fields they write to.
        {
            for (
                emit_mode,
                plan,
                ret_plan_str,
                dist_plan,
                ret_dist_plan_str,
                error,
                ret_error_str,
            ) in [
                (
                    EmitMode::Immediately,
                    self.expected_outputs.contains(&TestType::StreamPlan),
                    &mut ret.stream_plan,
                    self.expected_outputs.contains(&TestType::StreamDistPlan),
                    &mut ret.stream_dist_plan,
                    self.expected_outputs.contains(&TestType::StreamError),
                    &mut ret.stream_error,
                ),
                (
                    EmitMode::OnWindowClose,
                    self.expected_outputs.contains(&TestType::EowcStreamPlan),
                    &mut ret.eowc_stream_plan,
                    self.expected_outputs
                        .contains(&TestType::EowcStreamDistPlan),
                    &mut ret.eowc_stream_dist_plan,
                    self.expected_outputs.contains(&TestType::EowcStreamError),
                    &mut ret.eowc_stream_error,
                ),
            ] {
                if !plan && !dist_plan && !error {
                    continue;
                }

                // Stream planning is modeled as CREATE MATERIALIZED VIEW, so only
                // plain queries are supported here.
                let q = if let Statement::Query(q) = stmt {
                    q.as_ref().clone()
                } else {
                    return Err(anyhow!("expect a query"));
                };

                let (stream_plan, table) = match create_mv::gen_create_mv_plan(
                    &session,
                    context.clone(),
                    q,
                    ObjectName(vec!["test".into()]),
                    vec![],
                    Some(emit_mode),
                ) {
                    Ok(r) => r,
                    Err(err) => {
                        *ret_error_str = Some(err.to_report_string_pretty());
                        continue;
                    }
                };

                if plan {
                    *ret_plan_str = Some(explain_plan(&stream_plan));
                }

                if dist_plan {
                    let graph = build_graph(stream_plan.clone(), None)?;
                    *ret_dist_plan_str =
                        Some(explain_stream_graph(&graph, Some(table.to_prost()), false));
                }

                if self.expected_outputs.contains(&TestType::BackfillOrderPlan) {
                    match explain_backfill_order_in_dot_format(
                        &session,
                        BackfillOrderStrategy::Auto,
                        stream_plan,
                    ) {
                        Ok(formatted_order_plan) => {
                            ret.backfill_order_plan = Some(formatted_order_plan);
                        }
                        Err(err) => {
                            *ret_error_str = Some(err.to_report_string_pretty());
                        }
                    }
                }
            }
        }

        // Stage 6: sink plan on top of the query, using a blackhole connector.
        'sink: {
            if self.expected_outputs.contains(&TestType::SinkPlan) {
                let plan_root = plan_root;
                let sink_name = "sink_test";
                let mut options = BTreeMap::new();
                options.insert("connector".to_owned(), "blackhole".to_owned());
                options.insert("type".to_owned(), "append-only".to_owned());
                let options = WithOptionsSecResolved::without_secrets(options);
                let format_desc = (&options).try_into().unwrap();
                match plan_root.gen_sink_plan(
                    sink_name.to_owned(),
                    format!("CREATE SINK {sink_name} AS {}", stmt),
                    options,
                    false,
                    "test_db".into(),
                    "test_table".into(),
                    format_desc,
                    false,
                    None,
                    None,
                    false,
                    None,
                    true,
                ) {
                    Ok(sink_plan) => {
                        ret.sink_plan = Some(explain_plan(&sink_plan.into()));
                        break 'sink;
                    }
                    Err(err) => {
                        ret.sink_error = Some(err.to_report_string_pretty());
                        break 'sink;
                    }
                }
            }
        }

        Ok(ret)
    }
906}
907
/// Render a plan tree as the human-readable explain string used in expected outputs.
fn explain_plan(plan: &PlanRef<impl ConventionMarker>) -> String {
    plan.explain_to_string()
}
911
/// Validate an actual result against the case's `expected_outputs` set.
///
/// For every checked field it is an error either to expect an output that was not
/// produced, or to produce an output (e.g. an unexpected planner error) that was
/// not declared in `expected_outputs`.
fn check_result(test_case: &TestCase, actual: &TestCaseResult) -> Result<()> {
    // Maps the snake_case field name to the camel-case `TestType` variant via
    // `paste`, then cross-checks expectation against presence.
    macro_rules! check {
        ($field:ident) => {
            paste::paste! {
                let case_contains = test_case.expected_outputs.contains(&TestType:: [< $field:camel >] );
                let actual_contains = &actual.$field;
                match (case_contains, actual_contains) {
                    (false, None) | (true, Some(_)) => {},
                    (false, Some(e)) => return Err(anyhow!("unexpected {}: {}", stringify!($field), e)),
                    (true, None) => return Err(anyhow!(
                        "expected {}, but there's no such result during execution",
                        stringify!($field)
                    )),
                }
            }
        };
    }

    check!(binder_error);
    check!(planner_error);
    check!(optimizer_error);
    check!(batch_error);
    check!(batch_local_error);
    check!(stream_error);
    check!(eowc_stream_error);

    check!(logical_plan);
    check!(optimized_logical_plan_for_batch);
    check!(optimized_logical_plan_for_stream);
    check!(batch_plan);
    check!(batch_local_plan);
    check!(stream_plan);
    check!(stream_dist_plan);
    check!(eowc_stream_plan);
    check!(eowc_stream_dist_plan);
    check!(batch_plan_proto);
    check!(sink_plan);

    check!(explain_output);

    Ok(())
}
957
958pub fn test_data_dir() -> PathBuf {
960 std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
961 .join("tests")
962 .join("testdata")
963}
964
965pub async fn run_test_file(file_path: &Path, file_content: &str) -> Result<()> {
966 let file_name = file_path.file_name().unwrap().to_str().unwrap();
967 println!("-- running {file_name} --");
968
969 let mut failed_num = 0;
970 let cases: Vec<TestCase> = serde_yaml::from_str(file_content).map_err(|e| {
971 let context = if let Some(loc) = e.location() {
972 format!(
973 "failed to parse yaml at {}:{}:{}",
974 file_path.display(),
975 loc.line(),
976 loc.column()
977 )
978 } else {
979 "failed to parse yaml".to_owned()
980 };
981 anyhow::anyhow!(e).context(context)
982 })?;
983 let cases = resolve_testcase_id(cases).expect("failed to resolve");
984 let mut outputs = vec![];
985
986 for (i, c) in cases.into_iter().enumerate() {
987 println!(
988 "Running test #{i} (id: {}), SQL:\n{}",
989 c.id().clone().unwrap_or_else(|| "<none>".to_owned()),
990 c.sql()
991 );
992 match Box::pin(c.run(true)).await {
993 Ok(case) => {
994 outputs.push(case);
995 }
996 Err(e) => {
997 eprintln!(
998 "Test #{i} (id: {}) failed, SQL:\n{}\nError: {}",
999 c.id().clone().unwrap_or_else(|| "<none>".to_owned()),
1000 c.sql(),
1001 e.as_report()
1002 );
1003 failed_num += 1;
1004 }
1005 }
1006 }
1007
1008 let output_path = test_data_dir().join("output").join(file_name);
1009 check(outputs, expect_test::expect_file![output_path]);
1010
1011 if failed_num > 0 {
1012 println!("\n");
1013 bail!(format!("{} test cases failed", failed_num));
1014 }
1015 Ok(())
1016}