1#![feature(let_chains)]
16#![allow(clippy::derive_partial_eq_without_eq)]
17
18risingwave_expr_impl::enable!();
21
22mod resolve_id;
23
24use std::collections::{BTreeMap, HashSet};
25use std::path::{Path, PathBuf};
26use std::sync::Arc;
27
28use anyhow::{Result, anyhow, bail};
29pub use resolve_id::*;
30use risingwave_frontend::handler::util::SourceSchemaCompatExt;
31use risingwave_frontend::handler::{
32 HandlerArgs, create_index, create_mv, create_schema, create_source, create_table, create_view,
33 drop_table, explain, variable,
34};
35use risingwave_frontend::optimizer::backfill_order_strategy::explain_backfill_order_in_dot_format;
36use risingwave_frontend::session::SessionImpl;
37use risingwave_frontend::test_utils::{LocalFrontend, create_proto_file, get_explain_output};
38use risingwave_frontend::{
39 Binder, Explain, FrontendOpts, OptimizerContext, OptimizerContextRef, PlanRef, Planner,
40 WithOptionsSecResolved, build_graph, explain_stream_graph,
41};
42use risingwave_sqlparser::ast::{
43 AstOption, BackfillOrderStrategy, DropMode, EmitMode, ExplainOptions, ObjectName, Statement,
44};
45use risingwave_sqlparser::parser::Parser;
46use serde::{Deserialize, Serialize};
47use thiserror_ext::AsReport;
48
49#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Hash, Eq)]
50#[serde(deny_unknown_fields, rename_all = "snake_case")]
51pub enum TestType {
52 ExplainOutput,
57
58 LogicalPlan,
60 OptimizedLogicalPlanForBatch,
62 OptimizedLogicalPlanForStream,
64
65 BatchPlan,
67 BatchPlanProto,
69 BatchLocalPlan,
71 BatchDistributedPlan,
73
74 StreamPlan,
76 StreamDistPlan,
78 EowcStreamPlan,
80 EowcStreamDistPlan,
82 BackfillOrderPlan,
84
85 SinkPlan,
88
89 BinderError,
90 PlannerError,
91 OptimizerError,
92 BatchError,
93 BatchLocalError,
94 StreamError,
95 EowcStreamError,
96}
97
98pub fn check(actual: Vec<TestCaseResult>, expect: expect_test::ExpectFile) {
99 let actual = serde_yaml::to_string(&actual).unwrap();
100 expect.assert_eq(&format!("# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.\n{}",actual));
101}
102
103#[serde_with::skip_serializing_none]
104#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
105#[serde(deny_unknown_fields)]
106pub struct TestInput {
107 pub id: Option<String>,
109 pub name: Option<String>,
111 pub before: Option<Vec<String>>,
113 #[serde(skip_serializing)]
115 before_statements: Option<Vec<String>>,
116 pub sql: String,
118}
119
120#[serde_with::skip_serializing_none]
121#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
122#[serde(deny_unknown_fields)]
123pub struct TestCase {
124 #[serde(flatten)]
125 pub input: TestInput,
126
127 pub create_source: Option<CreateConnector>,
130 pub create_table_with_connector: Option<CreateConnector>,
132 pub with_config_map: Option<BTreeMap<String, String>>,
134
135 pub expected_outputs: HashSet<TestType>,
137}
138
139impl TestCase {
140 pub fn id(&self) -> &Option<String> {
141 &self.input.id
142 }
143
144 pub fn name(&self) -> &Option<String> {
145 &self.input.name
146 }
147
148 pub fn before(&self) -> &Option<Vec<String>> {
149 &self.input.before
150 }
151
152 pub fn before_statements(&self) -> &Option<Vec<String>> {
153 &self.input.before_statements
154 }
155
156 pub fn sql(&self) -> &String {
157 &self.input.sql
158 }
159
160 pub fn create_source(&self) -> &Option<CreateConnector> {
161 &self.create_source
162 }
163
164 pub fn create_table_with_connector(&self) -> &Option<CreateConnector> {
165 &self.create_table_with_connector
166 }
167
168 pub fn with_config_map(&self) -> &Option<BTreeMap<String, String>> {
169 &self.with_config_map
170 }
171}
172
173#[serde_with::skip_serializing_none]
174#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
175#[serde(deny_unknown_fields)]
176pub struct CreateConnector {
177 format: String,
178 encode: String,
179 name: String,
180 file: Option<String>,
181 is_table: Option<bool>,
182}
183
184#[serde_with::skip_serializing_none]
185#[derive(Debug, PartialEq, Serialize, Deserialize, Default)]
186#[serde(deny_unknown_fields)]
187pub struct TestCaseResult {
188 #[serde(flatten)]
189 pub input: TestInput,
190
191 pub logical_plan: Option<String>,
193
194 pub optimized_logical_plan_for_batch: Option<String>,
196
197 pub optimized_logical_plan_for_stream: Option<String>,
199
200 pub batch_plan: Option<String>,
202
203 pub batch_plan_proto: Option<String>,
205
206 pub batch_local_plan: Option<String>,
208
209 pub batch_distributed_plan: Option<String>,
211
212 pub sink_plan: Option<String>,
214
215 pub stream_plan: Option<String>,
217
218 pub stream_dist_plan: Option<String>,
220
221 pub eowc_stream_plan: Option<String>,
223
224 pub eowc_stream_dist_plan: Option<String>,
226
227 pub backfill_order_plan: Option<String>,
229
230 pub binder_error: Option<String>,
232
233 pub planner_error: Option<String>,
235
236 pub optimizer_error: Option<String>,
238
239 pub batch_error: Option<String>,
241
242 pub batch_local_error: Option<String>,
244
245 pub stream_error: Option<String>,
247
248 pub eowc_stream_error: Option<String>,
250
251 pub sink_error: Option<String>,
253
254 pub explain_output: Option<String>,
259
260 pub create_source: Option<CreateConnector>,
263 pub create_table_with_connector: Option<CreateConnector>,
265 pub with_config_map: Option<BTreeMap<String, String>>,
267}
268
269impl TestCase {
270 pub async fn run(&self, do_check_result: bool) -> Result<TestCaseResult> {
272 let session = {
273 let frontend = LocalFrontend::new(FrontendOpts::default()).await;
274 frontend.session_ref()
275 };
276
277 if let Some(config_map) = self.with_config_map() {
278 for (key, val) in config_map {
279 session.set_config(key, val.to_owned()).unwrap();
280 }
281 }
282
283 let placeholder_empty_vec = vec![];
284
285 self.do_create_source(session.clone()).await?;
287 self.do_create_table_with_connector(session.clone()).await?;
288
289 let mut result: Option<TestCaseResult> = None;
290 for sql in self
291 .before_statements()
292 .as_ref()
293 .unwrap_or(&placeholder_empty_vec)
294 .iter()
295 .chain(std::iter::once(self.sql()))
296 {
297 result = self
298 .run_sql(
299 Arc::from(sql.to_owned()),
300 session.clone(),
301 do_check_result,
302 result,
303 )
304 .await?;
305 }
306
307 let mut result = result.unwrap_or_default();
308 result.input = self.input.clone();
309 result.create_source.clone_from(self.create_source());
310 result
311 .create_table_with_connector
312 .clone_from(self.create_table_with_connector());
313 result.with_config_map.clone_from(self.with_config_map());
314
315 Ok(result)
316 }
317
318 #[inline(always)]
319 fn create_connector_sql(
320 is_table: bool,
321 connector_name: String,
322 connector_format: String,
323 connector_encode: String,
324 ) -> String {
325 let object_to_create = if is_table { "TABLE" } else { "SOURCE" };
326 format!(
327 r#"CREATE {} {}
328 WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
329 FORMAT {} ENCODE {} (message = '.test.TestRecord', schema.location = 'file://"#,
330 object_to_create, connector_name, connector_format, connector_encode
331 )
332 }
333
334 async fn do_create_table_with_connector(
335 &self,
336 session: Arc<SessionImpl>,
337 ) -> Result<Option<TestCaseResult>> {
338 match self.create_table_with_connector().clone() {
339 Some(connector) => {
340 if let Some(content) = connector.file {
341 let sql = Self::create_connector_sql(
342 true,
343 connector.name,
344 connector.format,
345 connector.encode,
346 );
347 let temp_file = create_proto_file(content.as_str());
348 self.run_sql(
349 Arc::from(sql + temp_file.path().to_str().unwrap() + "')"),
350 session.clone(),
351 false,
352 None,
353 )
354 .await
355 } else {
356 panic!(
357 "{:?} create table with connector must include `file` for the file content",
358 self.id()
359 );
360 }
361 }
362 None => Ok(None),
363 }
364 }
365
366 async fn do_create_source(&self, session: Arc<SessionImpl>) -> Result<Option<TestCaseResult>> {
369 match self.create_source().clone() {
370 Some(source) => {
371 if let Some(content) = source.file {
372 let sql = Self::create_connector_sql(
373 false,
374 source.name,
375 source.format,
376 source.encode,
377 );
378 let temp_file = create_proto_file(content.as_str());
379 self.run_sql(
380 Arc::from(sql + temp_file.path().to_str().unwrap() + "')"),
381 session.clone(),
382 false,
383 None,
384 )
385 .await
386 } else {
387 panic!(
388 "{:?} create source must include `file` for the file content",
389 self.id()
390 );
391 }
392 }
393 None => Ok(None),
394 }
395 }
396
397 async fn run_sql(
398 &self,
399 sql: Arc<str>,
400 session: Arc<SessionImpl>,
401 do_check_result: bool,
402 mut result: Option<TestCaseResult>,
403 ) -> Result<Option<TestCaseResult>> {
404 let statements = Parser::parse_sql(&sql).unwrap();
405 for stmt in statements {
406 let handler_args = HandlerArgs::new(session.clone(), &stmt, sql.clone())?;
408 let _guard = session.txn_begin_implicit();
409 match stmt.clone() {
410 Statement::Query(_)
411 | Statement::Insert { .. }
412 | Statement::Delete { .. }
413 | Statement::Update { .. } => {
414 if result.is_some() {
415 panic!("two queries in one test case");
416 }
417 let explain_options = ExplainOptions {
418 verbose: true,
419 ..Default::default()
420 };
421 let context = OptimizerContext::new(
422 HandlerArgs::new(session.clone(), &stmt, sql.clone())?,
423 explain_options,
424 );
425 let ret = self.apply_query(&stmt, context.into())?;
426 if do_check_result {
427 check_result(self, &ret)?;
428 }
429 result = Some(ret);
430 }
431 Statement::CreateTable {
432 name,
433 columns,
434 constraints,
435 if_not_exists,
436 format_encode,
437 source_watermarks,
438 append_only,
439 on_conflict,
440 with_version_column,
441 cdc_table_info,
442 include_column_options,
443 wildcard_idx,
444 webhook_info,
445 engine,
446 ..
447 } => {
448 let format_encode = format_encode.map(|schema| schema.into_v2_with_warning());
449
450 create_table::handle_create_table(
451 handler_args,
452 name,
453 columns,
454 wildcard_idx,
455 constraints,
456 if_not_exists,
457 format_encode,
458 source_watermarks,
459 append_only,
460 on_conflict,
461 with_version_column.map(|x| x.real_value()),
462 cdc_table_info,
463 include_column_options,
464 webhook_info,
465 engine,
466 )
467 .await?;
468 }
469 Statement::CreateSource { stmt } => {
470 if let Err(error) =
471 create_source::handle_create_source(handler_args, stmt).await
472 {
473 let actual_result = TestCaseResult {
474 planner_error: Some(error.to_report_string()),
475 ..Default::default()
476 };
477
478 check_result(self, &actual_result)?;
479 result = Some(actual_result);
480 }
481 }
482 Statement::CreateIndex {
483 name,
484 table_name,
485 columns,
486 include,
487 distributed_by,
488 if_not_exists,
489 ..
491 } => {
492 create_index::handle_create_index(
493 handler_args,
494 if_not_exists,
495 name,
496 table_name,
497 columns,
498 include,
499 distributed_by,
500 )
501 .await?;
502 }
503 Statement::CreateView {
504 materialized: true,
505 or_replace: false,
506 if_not_exists,
507 name,
508 query,
509 columns,
510 emit_mode,
511 ..
512 } => {
513 create_mv::handle_create_mv(
514 handler_args,
515 if_not_exists,
516 name,
517 *query,
518 columns,
519 emit_mode,
520 )
521 .await?;
522 }
523 Statement::CreateView {
524 materialized: false,
525 or_replace: false,
526 if_not_exists,
527 name,
528 query,
529 columns,
530 ..
531 } => {
532 create_view::handle_create_view(
533 handler_args,
534 if_not_exists,
535 name,
536 columns,
537 *query,
538 )
539 .await?;
540 }
541 Statement::Drop(drop_statement) => {
542 drop_table::handle_drop_table(
543 handler_args,
544 drop_statement.object_name,
545 drop_statement.if_exists,
546 matches!(drop_statement.drop_mode, AstOption::Some(DropMode::Cascade)),
547 )
548 .await?;
549 }
550 Statement::SetVariable {
551 local: _,
552 variable,
553 value,
554 } => {
555 variable::handle_set(handler_args, variable, value).unwrap();
556 }
557 Statement::Explain {
558 analyze,
559 statement,
560 options,
561 } => {
562 if result.is_some() {
563 panic!("two queries in one test case");
564 }
565 let rsp =
566 explain::handle_explain(handler_args, *statement, options, analyze).await?;
567
568 let explain_output = get_explain_output(rsp).await;
569 let ret = TestCaseResult {
570 explain_output: Some(explain_output),
571 ..Default::default()
572 };
573 if do_check_result {
574 check_result(self, &ret)?;
575 }
576 result = Some(ret);
577 }
578 Statement::CreateSchema {
579 schema_name,
580 if_not_exists,
581 owner,
582 } => {
583 create_schema::handle_create_schema(
584 handler_args,
585 schema_name,
586 if_not_exists,
587 owner,
588 )
589 .await?;
590 }
591 _ => return Err(anyhow!("Unsupported statement type")),
592 }
593 }
594 Ok(result)
595 }
596
597 fn apply_query(
598 &self,
599 stmt: &Statement,
600 context: OptimizerContextRef,
601 ) -> Result<TestCaseResult> {
602 let session = context.session_ctx().clone();
603 let mut ret = TestCaseResult::default();
604
605 let bound = {
606 let mut binder = Binder::new(&session);
607 match binder.bind(stmt.clone()) {
608 Ok(bound) => bound,
609 Err(err) => {
610 ret.binder_error = Some(err.to_report_string_pretty());
611 return Ok(ret);
612 }
613 }
614 };
615
616 let mut planner = Planner::new_for_stream(context.clone());
617
618 let plan_root = match planner.plan(bound) {
619 Ok(plan_root) => {
620 if self.expected_outputs.contains(&TestType::LogicalPlan) {
621 ret.logical_plan =
622 Some(explain_plan(&plan_root.clone().into_unordered_subplan()));
623 }
624 plan_root
625 }
626 Err(err) => {
627 ret.planner_error = Some(err.to_report_string_pretty());
628 return Ok(ret);
629 }
630 };
631
632 if self
633 .expected_outputs
634 .contains(&TestType::OptimizedLogicalPlanForBatch)
635 || self.expected_outputs.contains(&TestType::OptimizerError)
636 {
637 let plan_root = plan_root.clone();
638 let optimized_logical_plan_for_batch =
639 match plan_root.gen_optimized_logical_plan_for_batch() {
640 Ok(optimized_logical_plan_for_batch) => optimized_logical_plan_for_batch,
641 Err(err) => {
642 ret.optimizer_error = Some(err.to_report_string_pretty());
643 return Ok(ret);
644 }
645 };
646
647 if self
649 .expected_outputs
650 .contains(&TestType::OptimizedLogicalPlanForBatch)
651 {
652 ret.optimized_logical_plan_for_batch =
653 Some(explain_plan(&optimized_logical_plan_for_batch.plan));
654 }
655 }
656
657 if self
658 .expected_outputs
659 .contains(&TestType::OptimizedLogicalPlanForStream)
660 || self.expected_outputs.contains(&TestType::OptimizerError)
661 {
662 let plan_root = plan_root.clone();
663 let optimized_logical_plan_for_stream =
664 match plan_root.gen_optimized_logical_plan_for_stream() {
665 Ok(optimized_logical_plan_for_stream) => optimized_logical_plan_for_stream,
666 Err(err) => {
667 ret.optimizer_error = Some(err.to_report_string_pretty());
668 return Ok(ret);
669 }
670 };
671
672 if self
674 .expected_outputs
675 .contains(&TestType::OptimizedLogicalPlanForStream)
676 {
677 ret.optimized_logical_plan_for_stream =
678 Some(explain_plan(&optimized_logical_plan_for_stream.plan));
679 }
680 }
681
682 'batch: {
683 if self.expected_outputs.contains(&TestType::BatchPlan)
684 || self.expected_outputs.contains(&TestType::BatchPlanProto)
685 || self.expected_outputs.contains(&TestType::BatchError)
686 {
687 let plan_root = plan_root.clone();
688 let batch_plan = match plan_root.gen_batch_plan() {
689 Ok(batch_plan) => match batch_plan.gen_batch_distributed_plan() {
690 Ok(batch_plan) => batch_plan,
691 Err(err) => {
692 ret.batch_error = Some(err.to_report_string_pretty());
693 break 'batch;
694 }
695 },
696 Err(err) => {
697 ret.batch_error = Some(err.to_report_string_pretty());
698 break 'batch;
699 }
700 };
701
702 if self.expected_outputs.contains(&TestType::BatchPlan) {
704 ret.batch_plan = Some(explain_plan(&batch_plan));
705 }
706
707 if self.expected_outputs.contains(&TestType::BatchPlanProto) {
709 ret.batch_plan_proto = Some(serde_yaml::to_string(
710 &batch_plan.to_batch_prost_identity(false)?,
711 )?);
712 }
713 }
714 }
715
716 'local_batch: {
717 if self.expected_outputs.contains(&TestType::BatchLocalPlan)
718 || self.expected_outputs.contains(&TestType::BatchError)
719 {
720 let plan_root = plan_root.clone();
721 let batch_plan = match plan_root.gen_batch_plan() {
722 Ok(batch_plan) => match batch_plan.gen_batch_local_plan() {
723 Ok(batch_plan) => batch_plan,
724 Err(err) => {
725 ret.batch_error = Some(err.to_report_string_pretty());
726 break 'local_batch;
727 }
728 },
729 Err(err) => {
730 ret.batch_error = Some(err.to_report_string_pretty());
731 break 'local_batch;
732 }
733 };
734
735 if self.expected_outputs.contains(&TestType::BatchLocalPlan) {
737 ret.batch_local_plan = Some(explain_plan(&batch_plan));
738 }
739 }
740 }
741
742 'distributed_batch: {
743 if self
744 .expected_outputs
745 .contains(&TestType::BatchDistributedPlan)
746 || self.expected_outputs.contains(&TestType::BatchError)
747 {
748 let plan_root = plan_root.clone();
749 let batch_plan = match plan_root.gen_batch_plan() {
750 Ok(batch_plan) => match batch_plan.gen_batch_distributed_plan() {
751 Ok(batch_plan) => batch_plan,
752 Err(err) => {
753 ret.batch_error = Some(err.to_report_string_pretty());
754 break 'distributed_batch;
755 }
756 },
757 Err(err) => {
758 ret.batch_error = Some(err.to_report_string_pretty());
759 break 'distributed_batch;
760 }
761 };
762
763 if self
765 .expected_outputs
766 .contains(&TestType::BatchDistributedPlan)
767 {
768 ret.batch_distributed_plan = Some(explain_plan(&batch_plan));
769 }
770 }
771 }
772
773 {
774 for (
776 emit_mode,
777 plan,
778 ret_plan_str,
779 dist_plan,
780 ret_dist_plan_str,
781 error,
782 ret_error_str,
783 ) in [
784 (
785 EmitMode::Immediately,
786 self.expected_outputs.contains(&TestType::StreamPlan),
787 &mut ret.stream_plan,
788 self.expected_outputs.contains(&TestType::StreamDistPlan),
789 &mut ret.stream_dist_plan,
790 self.expected_outputs.contains(&TestType::StreamError),
791 &mut ret.stream_error,
792 ),
793 (
794 EmitMode::OnWindowClose,
795 self.expected_outputs.contains(&TestType::EowcStreamPlan),
796 &mut ret.eowc_stream_plan,
797 self.expected_outputs
798 .contains(&TestType::EowcStreamDistPlan),
799 &mut ret.eowc_stream_dist_plan,
800 self.expected_outputs.contains(&TestType::EowcStreamError),
801 &mut ret.eowc_stream_error,
802 ),
803 ] {
804 if !plan && !dist_plan && !error {
805 continue;
806 }
807
808 let q = if let Statement::Query(q) = stmt {
809 q.as_ref().clone()
810 } else {
811 return Err(anyhow!("expect a query"));
812 };
813
814 let (stream_plan, table) = match create_mv::gen_create_mv_plan(
815 &session,
816 context.clone(),
817 q,
818 ObjectName(vec!["test".into()]),
819 vec![],
820 Some(emit_mode),
821 ) {
822 Ok(r) => r,
823 Err(err) => {
824 *ret_error_str = Some(err.to_report_string_pretty());
825 continue;
826 }
827 };
828
829 if plan {
831 *ret_plan_str = Some(explain_plan(&stream_plan));
832 }
833
834 if dist_plan {
836 let graph = build_graph(stream_plan.clone(), None)?;
837 *ret_dist_plan_str =
838 Some(explain_stream_graph(&graph, Some(table.to_prost()), false));
839 }
840
841 if self.expected_outputs.contains(&TestType::BackfillOrderPlan) {
842 match explain_backfill_order_in_dot_format(
843 &session,
844 BackfillOrderStrategy::Auto,
845 stream_plan,
846 ) {
847 Ok(formatted_order_plan) => {
848 ret.backfill_order_plan = Some(formatted_order_plan);
849 }
850 Err(err) => {
851 *ret_error_str = Some(err.to_report_string_pretty());
852 }
853 }
854 }
855 }
856 }
857
858 'sink: {
859 if self.expected_outputs.contains(&TestType::SinkPlan) {
860 let plan_root = plan_root.clone();
861 let sink_name = "sink_test";
862 let mut options = BTreeMap::new();
863 options.insert("connector".to_owned(), "blackhole".to_owned());
864 options.insert("type".to_owned(), "append-only".to_owned());
865 let options = WithOptionsSecResolved::without_secrets(options);
867 let format_desc = (&options).try_into().unwrap();
868 match plan_root.gen_sink_plan(
869 sink_name.to_owned(),
870 format!("CREATE SINK {sink_name} AS {}", stmt),
871 options,
872 false,
873 "test_db".into(),
874 "test_table".into(),
875 format_desc,
876 false,
877 None,
878 None,
879 false,
880 ) {
881 Ok(sink_plan) => {
882 ret.sink_plan = Some(explain_plan(&sink_plan.into()));
883 break 'sink;
884 }
885 Err(err) => {
886 ret.sink_error = Some(err.to_report_string_pretty());
887 break 'sink;
888 }
889 }
890 }
891 }
892
893 Ok(ret)
894 }
895}
896
897fn explain_plan(plan: &PlanRef) -> String {
898 plan.explain_to_string()
899}
900
901fn check_result(test_case: &TestCase, actual: &TestCaseResult) -> Result<()> {
905 macro_rules! check {
906 ($field:ident) => {
907 paste::paste! {
908 let case_contains = test_case.expected_outputs.contains(&TestType:: [< $field:camel >] );
909 let actual_contains = &actual.$field;
910 match (case_contains, actual_contains) {
911 (false, None) | (true, Some(_)) => {},
912 (false, Some(e)) => return Err(anyhow!("unexpected {}: {}", stringify!($field), e)),
913 (true, None) => return Err(anyhow!(
914 "expected {}, but there's no such result during execution",
915 stringify!($field)
916 )),
917 }
918 }
919 };
920 }
921
922 check!(binder_error);
923 check!(planner_error);
924 check!(optimizer_error);
925 check!(batch_error);
926 check!(batch_local_error);
927 check!(stream_error);
928 check!(eowc_stream_error);
929
930 check!(logical_plan);
931 check!(optimized_logical_plan_for_batch);
932 check!(optimized_logical_plan_for_stream);
933 check!(batch_plan);
934 check!(batch_local_plan);
935 check!(stream_plan);
936 check!(stream_dist_plan);
937 check!(eowc_stream_plan);
938 check!(eowc_stream_dist_plan);
939 check!(batch_plan_proto);
940 check!(sink_plan);
941
942 check!(explain_output);
943
944 Ok(())
945}
946
947pub fn test_data_dir() -> PathBuf {
949 std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
950 .join("tests")
951 .join("testdata")
952}
953
954pub async fn run_test_file(file_path: &Path, file_content: &str) -> Result<()> {
955 let file_name = file_path.file_name().unwrap().to_str().unwrap();
956 println!("-- running {file_name} --");
957
958 let mut failed_num = 0;
959 let cases: Vec<TestCase> = serde_yaml::from_str(file_content).map_err(|e| {
960 let context = if let Some(loc) = e.location() {
961 format!(
962 "failed to parse yaml at {}:{}:{}",
963 file_path.display(),
964 loc.line(),
965 loc.column()
966 )
967 } else {
968 "failed to parse yaml".to_owned()
969 };
970 anyhow::anyhow!(e).context(context)
971 })?;
972 let cases = resolve_testcase_id(cases).expect("failed to resolve");
973 let mut outputs = vec![];
974
975 for (i, c) in cases.into_iter().enumerate() {
976 println!(
977 "Running test #{i} (id: {}), SQL:\n{}",
978 c.id().clone().unwrap_or_else(|| "<none>".to_owned()),
979 c.sql()
980 );
981 match c.run(true).await {
982 Ok(case) => {
983 outputs.push(case);
984 }
985 Err(e) => {
986 eprintln!(
987 "Test #{i} (id: {}) failed, SQL:\n{}\nError: {}",
988 c.id().clone().unwrap_or_else(|| "<none>".to_owned()),
989 c.sql(),
990 e.as_report()
991 );
992 failed_num += 1;
993 }
994 }
995 }
996
997 let output_path = test_data_dir().join("output").join(file_name);
998 check(outputs, expect_test::expect_file![output_path]);
999
1000 if failed_num > 0 {
1001 println!("\n");
1002 bail!(format!("{} test cases failed", failed_num));
1003 }
1004 Ok(())
1005}