1#![feature(let_chains)]
16#![allow(clippy::derive_partial_eq_without_eq)]
17
18risingwave_expr_impl::enable!();
21
22mod resolve_id;
23
24use std::collections::{BTreeMap, HashSet};
25use std::path::{Path, PathBuf};
26use std::sync::Arc;
27
28use anyhow::{Result, anyhow, bail};
29pub use resolve_id::*;
30use risingwave_frontend::handler::util::SourceSchemaCompatExt;
31use risingwave_frontend::handler::{
32 HandlerArgs, create_index, create_mv, create_schema, create_source, create_table, create_view,
33 drop_table, explain, variable,
34};
35use risingwave_frontend::session::SessionImpl;
36use risingwave_frontend::test_utils::{LocalFrontend, create_proto_file, get_explain_output};
37use risingwave_frontend::{
38 Binder, Explain, FrontendOpts, OptimizerContext, OptimizerContextRef, PlanRef, Planner,
39 WithOptionsSecResolved, build_graph, explain_stream_graph,
40};
41use risingwave_sqlparser::ast::{
42 AstOption, DropMode, EmitMode, ExplainOptions, ObjectName, Statement,
43};
44use risingwave_sqlparser::parser::Parser;
45use serde::{Deserialize, Serialize};
46use thiserror_ext::AsReport;
47
/// The kinds of outputs a planner test case may declare in `expected_outputs`.
///
/// Each variant corresponds (in snake_case) to a field of [`TestCaseResult`];
/// `check_result` enforces that exactly the declared outputs are produced.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Hash, Eq)]
#[serde(deny_unknown_fields, rename_all = "snake_case")]
pub enum TestType {
    /// The output of an `EXPLAIN` statement.
    ExplainOutput,

    /// The initial logical plan right after binding and planning.
    LogicalPlan,
    /// The logical plan after batch-oriented logical optimization.
    OptimizedLogicalPlanForBatch,
    /// The logical plan after stream-oriented logical optimization.
    OptimizedLogicalPlanForStream,

    /// The batch physical plan (generated via the distributed plan path).
    BatchPlan,
    /// The batch plan serialized to its protobuf representation (as YAML).
    BatchPlanProto,
    /// The batch plan for local execution mode.
    BatchLocalPlan,
    /// The batch plan for distributed execution mode.
    BatchDistributedPlan,

    /// The streaming plan (`EmitMode::Immediately`).
    StreamPlan,
    /// The streaming fragment graph (`EmitMode::Immediately`).
    StreamDistPlan,
    /// The streaming plan under emit-on-window-close mode.
    EowcStreamPlan,
    /// The streaming fragment graph under emit-on-window-close mode.
    EowcStreamDistPlan,

    /// The sink plan (generated against a blackhole connector).
    SinkPlan,

    /// An error is expected at the binding phase.
    BinderError,
    /// An error is expected at the planning phase.
    PlannerError,
    /// An error is expected during logical optimization.
    OptimizerError,
    /// An error is expected while generating the batch (distributed) plan.
    BatchError,
    /// An error is expected while generating the batch local plan.
    BatchLocalError,
    /// An error is expected while generating the streaming plan.
    StreamError,
    /// An error is expected while generating the emit-on-window-close streaming plan.
    EowcStreamError,
}
94
95pub fn check(actual: Vec<TestCaseResult>, expect: expect_test::ExpectFile) {
96 let actual = serde_yaml::to_string(&actual).unwrap();
97 expect.assert_eq(&format!("# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.\n{}",actual));
98}
99
/// The input definition of a planner test case, shared between [`TestCase`]
/// (parsed from the YAML files) and [`TestCaseResult`] (echoed into the output).
#[serde_with::skip_serializing_none]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct TestInput {
    // Id of the test case, so other cases can reference it in `before`.
    pub id: Option<String>,
    // A brief description of the test case.
    pub name: Option<String>,
    // Ids of test cases whose SQL should run before this case's SQL.
    pub before: Option<Vec<String>>,
    // The SQL statements resolved from the `before` ids — presumably filled in
    // by `resolve_testcase_id` (see the `resolve_id` module); never serialized
    // back into the output.
    #[serde(skip_serializing)]
    before_statements: Option<Vec<String>>,
    // The SQL statement(s) under test.
    pub sql: String,
}
116
/// A single planner test case as parsed from a testdata YAML file.
#[serde_with::skip_serializing_none]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct TestCase {
    // The common input fields (id, name, before, sql) are flattened into the
    // same YAML mapping as the case-specific fields below.
    #[serde(flatten)]
    pub input: TestInput,

    // Optional connector-backed `CREATE SOURCE` to run before the test SQL.
    pub create_source: Option<CreateConnector>,
    // Optional connector-backed `CREATE TABLE` to run before the test SQL.
    pub create_table_with_connector: Option<CreateConnector>,
    // Session config variables (key -> value) to set before running.
    pub with_config_map: Option<BTreeMap<String, String>>,

    // The set of outputs this case expects; validated by `check_result`.
    pub expected_outputs: HashSet<TestType>,
}
135
impl TestCase {
    /// Id of the test case, if any.
    pub fn id(&self) -> &Option<String> {
        &self.input.id
    }

    /// Human-readable name of the test case, if any.
    pub fn name(&self) -> &Option<String> {
        &self.input.name
    }

    /// Ids of the cases to run before this one, if any.
    pub fn before(&self) -> &Option<Vec<String>> {
        &self.input.before
    }

    /// The resolved SQL statements of the `before` cases, if any.
    pub fn before_statements(&self) -> &Option<Vec<String>> {
        &self.input.before_statements
    }

    /// The SQL under test.
    pub fn sql(&self) -> &String {
        &self.input.sql
    }

    /// The connector-backed source to create before running, if any.
    pub fn create_source(&self) -> &Option<CreateConnector> {
        &self.create_source
    }

    /// The connector-backed table to create before running, if any.
    pub fn create_table_with_connector(&self) -> &Option<CreateConnector> {
        &self.create_table_with_connector
    }

    /// Session config overrides for this case, if any.
    pub fn with_config_map(&self) -> &Option<BTreeMap<String, String>> {
        &self.with_config_map
    }
}
169
/// Configuration for a connector-backed `CREATE SOURCE` / `CREATE TABLE`
/// generated by `TestCase::create_connector_sql`.
#[serde_with::skip_serializing_none]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct CreateConnector {
    // FORMAT clause value, e.g. `PLAIN`.
    format: String,
    // ENCODE clause value, e.g. `PROTOBUF`.
    encode: String,
    // Name of the source/table to create.
    name: String,
    // Inline protobuf schema content; written to a temp file whose path is
    // spliced into `schema.location`. Required by `do_create_source` /
    // `do_create_table_with_connector` (they panic when it is absent).
    file: Option<String>,
    // NOTE(review): not read anywhere in this file — whether a table or a
    // source is created is decided by which YAML field the connector sits in.
    is_table: Option<bool>,
}
180
/// The outputs collected while running a [`TestCase`]; serialized to YAML and
/// compared against the snapshot file by [`check`].
#[serde_with::skip_serializing_none]
#[derive(Debug, PartialEq, Serialize, Deserialize, Default)]
#[serde(deny_unknown_fields)]
pub struct TestCaseResult {
    // The input is echoed back so the output file is self-describing.
    #[serde(flatten)]
    pub input: TestInput,

    // The initial logical plan.
    pub logical_plan: Option<String>,

    // The logical plan after batch-oriented optimization.
    pub optimized_logical_plan_for_batch: Option<String>,

    // The logical plan after stream-oriented optimization.
    pub optimized_logical_plan_for_stream: Option<String>,

    // The batch physical plan.
    pub batch_plan: Option<String>,

    // The batch plan's protobuf form, serialized as YAML.
    pub batch_plan_proto: Option<String>,

    // The batch plan for local execution mode.
    pub batch_local_plan: Option<String>,

    // The batch plan for distributed execution mode.
    pub batch_distributed_plan: Option<String>,

    // The sink plan (against a blackhole connector).
    pub sink_plan: Option<String>,

    // The streaming plan.
    pub stream_plan: Option<String>,

    // The streaming fragment graph.
    pub stream_dist_plan: Option<String>,

    // The streaming plan under emit-on-window-close mode.
    pub eowc_stream_plan: Option<String>,

    // The streaming fragment graph under emit-on-window-close mode.
    pub eowc_stream_dist_plan: Option<String>,

    // Error message produced at the binding phase, if any.
    pub binder_error: Option<String>,

    // Error message produced at the planning phase, if any.
    pub planner_error: Option<String>,

    // Error message produced during logical optimization, if any.
    pub optimizer_error: Option<String>,

    // Error message produced while generating the batch plan, if any.
    pub batch_error: Option<String>,

    // Error message produced while generating the batch local plan, if any.
    pub batch_local_error: Option<String>,

    // Error message produced while generating the streaming plan, if any.
    pub stream_error: Option<String>,

    // Error message produced under emit-on-window-close mode, if any.
    pub eowc_stream_error: Option<String>,

    // Error message produced while generating the sink plan, if any.
    // NOTE(review): this field has no `TestType` counterpart and is not
    // validated by `check_result`.
    pub sink_error: Option<String>,

    // The output of an `EXPLAIN` statement.
    pub explain_output: Option<String>,

    // The connector configs and session config are echoed back as well.
    pub create_source: Option<CreateConnector>,
    pub create_table_with_connector: Option<CreateConnector>,
    pub with_config_map: Option<BTreeMap<String, String>>,
}
262
263impl TestCase {
    /// Run the whole test case: set up the session and connectors, execute all
    /// `before` statements followed by the main `sql`, and return the
    /// collected [`TestCaseResult`].
    ///
    /// When `do_check_result` is true, each statement's outputs are validated
    /// against `expected_outputs` via `check_result`.
    pub async fn run(&self, do_check_result: bool) -> Result<TestCaseResult> {
        // A fresh frontend — and thus a fresh catalog — for every test case.
        let session = {
            let frontend = LocalFrontend::new(FrontendOpts::default()).await;
            frontend.session_ref()
        };

        // Apply per-case session config overrides before anything runs.
        if let Some(config_map) = self.with_config_map() {
            for (key, val) in config_map {
                session.set_config(key, val.to_owned()).unwrap();
            }
        }

        let placeholder_empty_vec = vec![];

        // Create the configured connector-backed source/table first, so the
        // test SQL can reference them.
        self.do_create_source(session.clone()).await?;
        self.do_create_table_with_connector(session.clone()).await?;

        // Thread a single result through the `before` statements and the main
        // SQL; `run_sql` panics if two statements both produce query results.
        let mut result: Option<TestCaseResult> = None;
        for sql in self
            .before_statements()
            .as_ref()
            .unwrap_or(&placeholder_empty_vec)
            .iter()
            .chain(std::iter::once(self.sql()))
        {
            result = self
                .run_sql(
                    Arc::from(sql.to_owned()),
                    session.clone(),
                    do_check_result,
                    result,
                )
                .await?;
        }

        // Echo the input and connector/config sections into the output so the
        // snapshot file is self-describing.
        let mut result = result.unwrap_or_default();
        result.input = self.input.clone();
        result.create_source.clone_from(self.create_source());
        result
            .create_table_with_connector
            .clone_from(self.create_table_with_connector());
        result.with_config_map.clone_from(self.with_config_map());

        Ok(result)
    }
311
312 #[inline(always)]
313 fn create_connector_sql(
314 is_table: bool,
315 connector_name: String,
316 connector_format: String,
317 connector_encode: String,
318 ) -> String {
319 let object_to_create = if is_table { "TABLE" } else { "SOURCE" };
320 format!(
321 r#"CREATE {} {}
322 WITH (connector = 'kafka', kafka.topic = 'abc', kafka.brokers = 'localhost:1001')
323 FORMAT {} ENCODE {} (message = '.test.TestRecord', schema.location = 'file://"#,
324 object_to_create, connector_name, connector_format, connector_encode
325 )
326 }
327
328 async fn do_create_table_with_connector(
329 &self,
330 session: Arc<SessionImpl>,
331 ) -> Result<Option<TestCaseResult>> {
332 match self.create_table_with_connector().clone() {
333 Some(connector) => {
334 if let Some(content) = connector.file {
335 let sql = Self::create_connector_sql(
336 true,
337 connector.name,
338 connector.format,
339 connector.encode,
340 );
341 let temp_file = create_proto_file(content.as_str());
342 self.run_sql(
343 Arc::from(sql + temp_file.path().to_str().unwrap() + "')"),
344 session.clone(),
345 false,
346 None,
347 )
348 .await
349 } else {
350 panic!(
351 "{:?} create table with connector must include `file` for the file content",
352 self.id()
353 );
354 }
355 }
356 None => Ok(None),
357 }
358 }
359
360 async fn do_create_source(&self, session: Arc<SessionImpl>) -> Result<Option<TestCaseResult>> {
363 match self.create_source().clone() {
364 Some(source) => {
365 if let Some(content) = source.file {
366 let sql = Self::create_connector_sql(
367 false,
368 source.name,
369 source.format,
370 source.encode,
371 );
372 let temp_file = create_proto_file(content.as_str());
373 self.run_sql(
374 Arc::from(sql + temp_file.path().to_str().unwrap() + "')"),
375 session.clone(),
376 false,
377 None,
378 )
379 .await
380 } else {
381 panic!(
382 "{:?} create source must include `file` for the file content",
383 self.id()
384 );
385 }
386 }
387 None => Ok(None),
388 }
389 }
390
    /// Parse `sql` and execute every statement in it against `session`.
    ///
    /// Query-like statements (and `EXPLAIN`) produce a [`TestCaseResult`];
    /// DDL/utility statements are executed for their side effects only.
    /// `result` threads the accumulated result across calls so that `before`
    /// statements and the main statement share one result — producing two
    /// query results in one test case is a panic.
    async fn run_sql(
        &self,
        sql: Arc<str>,
        session: Arc<SessionImpl>,
        do_check_result: bool,
        mut result: Option<TestCaseResult>,
    ) -> Result<Option<TestCaseResult>> {
        let statements = Parser::parse_sql(&sql).unwrap();
        for stmt in statements {
            let handler_args = HandlerArgs::new(session.clone(), &stmt, sql.clone())?;
            // Each statement runs inside its own implicit transaction.
            let _guard = session.txn_begin_implicit();
            match stmt.clone() {
                // DML/query statements: plan them via `apply_query`.
                Statement::Query(_)
                | Statement::Insert { .. }
                | Statement::Delete { .. }
                | Statement::Update { .. } => {
                    if result.is_some() {
                        panic!("two queries in one test case");
                    }
                    // Plans are always explained verbosely.
                    let explain_options = ExplainOptions {
                        verbose: true,
                        ..Default::default()
                    };
                    let context = OptimizerContext::new(
                        HandlerArgs::new(session.clone(), &stmt, sql.clone())?,
                        explain_options,
                    );
                    let ret = self.apply_query(&stmt, context.into())?;
                    if do_check_result {
                        check_result(self, &ret)?;
                    }
                    result = Some(ret);
                }
                Statement::CreateTable {
                    name,
                    columns,
                    constraints,
                    if_not_exists,
                    format_encode,
                    source_watermarks,
                    append_only,
                    on_conflict,
                    with_version_column,
                    cdc_table_info,
                    include_column_options,
                    wildcard_idx,
                    webhook_info,
                    engine,
                    ..
                } => {
                    // Upgrade the legacy schema clause to the v2 format/encode form.
                    let format_encode = format_encode.map(|schema| schema.into_v2_with_warning());

                    create_table::handle_create_table(
                        handler_args,
                        name,
                        columns,
                        wildcard_idx,
                        constraints,
                        if_not_exists,
                        format_encode,
                        source_watermarks,
                        append_only,
                        on_conflict,
                        with_version_column.map(|x| x.real_value()),
                        cdc_table_info,
                        include_column_options,
                        webhook_info,
                        engine,
                    )
                    .await?;
                }
                Statement::CreateSource { stmt } => {
                    // A failing CREATE SOURCE is recorded as a planner error so
                    // test cases can assert on it.
                    if let Err(error) =
                        create_source::handle_create_source(handler_args, stmt).await
                    {
                        let actual_result = TestCaseResult {
                            planner_error: Some(error.to_report_string()),
                            ..Default::default()
                        };

                        check_result(self, &actual_result)?;
                        result = Some(actual_result);
                    }
                }
                Statement::CreateIndex {
                    name,
                    table_name,
                    columns,
                    include,
                    distributed_by,
                    if_not_exists,
                    ..
                } => {
                    create_index::handle_create_index(
                        handler_args,
                        if_not_exists,
                        name,
                        table_name,
                        columns,
                        include,
                        distributed_by,
                    )
                    .await?;
                }
                // CREATE MATERIALIZED VIEW.
                Statement::CreateView {
                    materialized: true,
                    or_replace: false,
                    if_not_exists,
                    name,
                    query,
                    columns,
                    emit_mode,
                    ..
                } => {
                    create_mv::handle_create_mv(
                        handler_args,
                        if_not_exists,
                        name,
                        *query,
                        columns,
                        emit_mode,
                    )
                    .await?;
                }
                // Plain (non-materialized) CREATE VIEW.
                Statement::CreateView {
                    materialized: false,
                    or_replace: false,
                    if_not_exists,
                    name,
                    query,
                    columns,
                    ..
                } => {
                    create_view::handle_create_view(
                        handler_args,
                        if_not_exists,
                        name,
                        columns,
                        *query,
                    )
                    .await?;
                }
                // NOTE: every DROP statement is routed through the table handler.
                Statement::Drop(drop_statement) => {
                    drop_table::handle_drop_table(
                        handler_args,
                        drop_statement.object_name,
                        drop_statement.if_exists,
                        matches!(drop_statement.drop_mode, AstOption::Some(DropMode::Cascade)),
                    )
                    .await?;
                }
                Statement::SetVariable {
                    local: _,
                    variable,
                    value,
                } => {
                    variable::handle_set(handler_args, variable, value).unwrap();
                }
                Statement::Explain {
                    analyze,
                    statement,
                    options,
                } => {
                    if result.is_some() {
                        panic!("two queries in one test case");
                    }
                    let rsp =
                        explain::handle_explain(handler_args, *statement, options, analyze).await?;

                    let explain_output = get_explain_output(rsp).await;
                    let ret = TestCaseResult {
                        explain_output: Some(explain_output),
                        ..Default::default()
                    };
                    if do_check_result {
                        check_result(self, &ret)?;
                    }
                    result = Some(ret);
                }
                Statement::CreateSchema {
                    schema_name,
                    if_not_exists,
                    owner,
                } => {
                    create_schema::handle_create_schema(
                        handler_args,
                        schema_name,
                        if_not_exists,
                        owner,
                    )
                    .await?;
                }
                _ => return Err(anyhow!("Unsupported statement type")),
            }
        }
        Ok(result)
    }
590
    /// Bind, plan, and optimize `stmt`, generating exactly the plans (or
    /// errors) the test case declared in `expected_outputs`.
    ///
    /// Bind/plan failures short-circuit with the corresponding error recorded;
    /// each later phase runs on its own clone of the plan root so the phases
    /// stay independent of each other.
    fn apply_query(
        &self,
        stmt: &Statement,
        context: OptimizerContextRef,
    ) -> Result<TestCaseResult> {
        let session = context.session_ctx().clone();
        let mut ret = TestCaseResult::default();

        // Phase 1: bind. A failure is recorded as `binder_error`.
        let bound = {
            let mut binder = Binder::new(&session);
            match binder.bind(stmt.clone()) {
                Ok(bound) => bound,
                Err(err) => {
                    ret.binder_error = Some(err.to_report_string_pretty());
                    return Ok(ret);
                }
            }
        };

        let mut planner = Planner::new_for_stream(context.clone());

        // Phase 2: plan. A failure is recorded as `planner_error`.
        let plan_root = match planner.plan(bound) {
            Ok(plan_root) => {
                if self.expected_outputs.contains(&TestType::LogicalPlan) {
                    ret.logical_plan =
                        Some(explain_plan(&plan_root.clone().into_unordered_subplan()));
                }
                plan_root
            }
            Err(err) => {
                ret.planner_error = Some(err.to_report_string_pretty());
                return Ok(ret);
            }
        };

        // Batch-oriented logical optimization (also exercised when only an
        // optimizer error is expected).
        if self
            .expected_outputs
            .contains(&TestType::OptimizedLogicalPlanForBatch)
            || self.expected_outputs.contains(&TestType::OptimizerError)
        {
            let mut plan_root = plan_root.clone();
            let optimized_logical_plan_for_batch =
                match plan_root.gen_optimized_logical_plan_for_batch() {
                    Ok(optimized_logical_plan_for_batch) => optimized_logical_plan_for_batch,
                    Err(err) => {
                        ret.optimizer_error = Some(err.to_report_string_pretty());
                        return Ok(ret);
                    }
                };

            if self
                .expected_outputs
                .contains(&TestType::OptimizedLogicalPlanForBatch)
            {
                ret.optimized_logical_plan_for_batch =
                    Some(explain_plan(&optimized_logical_plan_for_batch));
            }
        }

        // Stream-oriented logical optimization, mirroring the batch case above.
        if self
            .expected_outputs
            .contains(&TestType::OptimizedLogicalPlanForStream)
            || self.expected_outputs.contains(&TestType::OptimizerError)
        {
            let mut plan_root = plan_root.clone();
            let optimized_logical_plan_for_stream =
                match plan_root.gen_optimized_logical_plan_for_stream() {
                    Ok(optimized_logical_plan_for_stream) => optimized_logical_plan_for_stream,
                    Err(err) => {
                        ret.optimizer_error = Some(err.to_report_string_pretty());
                        return Ok(ret);
                    }
                };

            if self
                .expected_outputs
                .contains(&TestType::OptimizedLogicalPlanForStream)
            {
                ret.optimized_logical_plan_for_stream =
                    Some(explain_plan(&optimized_logical_plan_for_stream));
            }
        }

        // Batch plan (and its proto form), built via the distributed plan path.
        'batch: {
            if self.expected_outputs.contains(&TestType::BatchPlan)
                || self.expected_outputs.contains(&TestType::BatchPlanProto)
                || self.expected_outputs.contains(&TestType::BatchError)
            {
                let mut plan_root = plan_root.clone();
                let batch_plan = match plan_root.gen_batch_plan() {
                    Ok(_batch_plan) => match plan_root.gen_batch_distributed_plan() {
                        Ok(batch_plan) => batch_plan,
                        Err(err) => {
                            ret.batch_error = Some(err.to_report_string_pretty());
                            break 'batch;
                        }
                    },
                    Err(err) => {
                        ret.batch_error = Some(err.to_report_string_pretty());
                        break 'batch;
                    }
                };

                if self.expected_outputs.contains(&TestType::BatchPlan) {
                    ret.batch_plan = Some(explain_plan(&batch_plan));
                }

                if self.expected_outputs.contains(&TestType::BatchPlanProto) {
                    ret.batch_plan_proto = Some(serde_yaml::to_string(
                        &batch_plan.to_batch_prost_identity(false)?,
                    )?);
                }
            }
        }

        // Batch plan for local execution mode.
        'local_batch: {
            if self.expected_outputs.contains(&TestType::BatchLocalPlan)
                || self.expected_outputs.contains(&TestType::BatchError)
            {
                let mut plan_root = plan_root.clone();
                let batch_plan = match plan_root.gen_batch_plan() {
                    Ok(_batch_plan) => match plan_root.gen_batch_local_plan() {
                        Ok(batch_plan) => batch_plan,
                        Err(err) => {
                            ret.batch_error = Some(err.to_report_string_pretty());
                            break 'local_batch;
                        }
                    },
                    Err(err) => {
                        ret.batch_error = Some(err.to_report_string_pretty());
                        break 'local_batch;
                    }
                };

                if self.expected_outputs.contains(&TestType::BatchLocalPlan) {
                    ret.batch_local_plan = Some(explain_plan(&batch_plan));
                }
            }
        }

        // Batch plan for distributed execution mode.
        'distributed_batch: {
            if self
                .expected_outputs
                .contains(&TestType::BatchDistributedPlan)
                || self.expected_outputs.contains(&TestType::BatchError)
            {
                let mut plan_root = plan_root.clone();
                let batch_plan = match plan_root.gen_batch_plan() {
                    Ok(_batch_plan) => match plan_root.gen_batch_distributed_plan() {
                        Ok(batch_plan) => batch_plan,
                        Err(err) => {
                            ret.batch_error = Some(err.to_report_string_pretty());
                            break 'distributed_batch;
                        }
                    },
                    Err(err) => {
                        ret.batch_error = Some(err.to_report_string_pretty());
                        break 'distributed_batch;
                    }
                };

                if self
                    .expected_outputs
                    .contains(&TestType::BatchDistributedPlan)
                {
                    ret.batch_distributed_plan = Some(explain_plan(&batch_plan));
                }
            }
        }

        // Streaming plans: one pass for each emit mode, with the expected-output
        // flags and result slots bundled per mode.
        {
            for (
                emit_mode,
                plan,
                ret_plan_str,
                dist_plan,
                ret_dist_plan_str,
                error,
                ret_error_str,
            ) in [
                (
                    EmitMode::Immediately,
                    self.expected_outputs.contains(&TestType::StreamPlan),
                    &mut ret.stream_plan,
                    self.expected_outputs.contains(&TestType::StreamDistPlan),
                    &mut ret.stream_dist_plan,
                    self.expected_outputs.contains(&TestType::StreamError),
                    &mut ret.stream_error,
                ),
                (
                    EmitMode::OnWindowClose,
                    self.expected_outputs.contains(&TestType::EowcStreamPlan),
                    &mut ret.eowc_stream_plan,
                    self.expected_outputs
                        .contains(&TestType::EowcStreamDistPlan),
                    &mut ret.eowc_stream_dist_plan,
                    self.expected_outputs.contains(&TestType::EowcStreamError),
                    &mut ret.eowc_stream_error,
                ),
            ] {
                // Skip the mode entirely if the case expects nothing from it.
                if !plan && !dist_plan && !error {
                    continue;
                }

                // Streaming plans are generated as a MV over a query; anything
                // else cannot be streamed.
                let q = if let Statement::Query(q) = stmt {
                    q.as_ref().clone()
                } else {
                    return Err(anyhow!("expect a query"));
                };

                let stream_plan = match create_mv::gen_create_mv_plan(
                    &session,
                    context.clone(),
                    q,
                    ObjectName(vec!["test".into()]),
                    vec![],
                    Some(emit_mode),
                ) {
                    Ok((stream_plan, _)) => stream_plan,
                    Err(err) => {
                        *ret_error_str = Some(err.to_report_string_pretty());
                        continue;
                    }
                };

                if plan {
                    *ret_plan_str = Some(explain_plan(&stream_plan));
                }

                if dist_plan {
                    let graph = build_graph(stream_plan)?;
                    *ret_dist_plan_str = Some(explain_stream_graph(&graph, false));
                }
            }
        }

        // Sink plan, generated against a blackhole connector.
        'sink: {
            if self.expected_outputs.contains(&TestType::SinkPlan) {
                let mut plan_root = plan_root.clone();
                let sink_name = "sink_test";
                let mut options = BTreeMap::new();
                options.insert("connector".to_owned(), "blackhole".to_owned());
                options.insert("type".to_owned(), "append-only".to_owned());
                let options = WithOptionsSecResolved::without_secrets(options);
                let format_desc = (&options).try_into().unwrap();
                match plan_root.gen_sink_plan(
                    sink_name.to_owned(),
                    format!("CREATE SINK {sink_name} AS {}", stmt),
                    options,
                    false,
                    "test_db".into(),
                    "test_table".into(),
                    format_desc,
                    false,
                    None,
                    None,
                    false,
                ) {
                    Ok(sink_plan) => {
                        ret.sink_plan = Some(explain_plan(&sink_plan.into()));
                        break 'sink;
                    }
                    Err(err) => {
                        // NOTE(review): `sink_error` is not validated by
                        // `check_result`; a failure here only shows up as a
                        // missing `sink_plan`.
                        ret.sink_error = Some(err.to_report_string_pretty());
                        break 'sink;
                    }
                }
            }
        }

        Ok(ret)
    }
873}
874
/// Render a plan node (and its subtree) as the explain string used in snapshots.
fn explain_plan(plan: &PlanRef) -> String {
    plan.explain_to_string()
}
878
/// Validate that `actual` contains exactly the outputs declared in the test
/// case's `expected_outputs` — flagging both unexpected results and declared
/// results that were never produced.
fn check_result(test_case: &TestCase, actual: &TestCaseResult) -> Result<()> {
    // For each result field, the snake_case field name is camel-cased (via
    // `paste`) into the matching `TestType` variant; expectation membership
    // must agree with the field's presence.
    macro_rules! check {
        ($field:ident) => {
            paste::paste! {
                let case_contains = test_case.expected_outputs.contains(&TestType:: [< $field:camel >] );
                let actual_contains = &actual.$field;
                match (case_contains, actual_contains) {
                    (false, None) | (true, Some(_)) => {},
                    (false, Some(e)) => return Err(anyhow!("unexpected {}: {}", stringify!($field), e)),
                    (true, None) => return Err(anyhow!(
                        "expected {}, but there's no such result during execution",
                        stringify!($field)
                    )),
                }
            }
        };
    }

    // Error outputs.
    check!(binder_error);
    check!(planner_error);
    check!(optimizer_error);
    check!(batch_error);
    check!(batch_local_error);
    check!(stream_error);
    check!(eowc_stream_error);

    // Plan outputs.
    // NOTE(review): `sink_error` has no check — see its field comment.
    check!(logical_plan);
    check!(optimized_logical_plan_for_batch);
    check!(optimized_logical_plan_for_stream);
    check!(batch_plan);
    check!(batch_local_plan);
    check!(stream_plan);
    check!(stream_dist_plan);
    check!(eowc_stream_plan);
    check!(eowc_stream_dist_plan);
    check!(batch_plan_proto);
    check!(sink_plan);

    check!(explain_output);

    Ok(())
}
924
925pub fn test_data_dir() -> PathBuf {
927 std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
928 .join("tests")
929 .join("testdata")
930}
931
/// Run every test case parsed from `file_content` (YAML) and compare the
/// collected outputs against the snapshot in `tests/testdata/output/<file>`.
///
/// All cases are executed even if some fail, so the snapshot is always fully
/// regenerated; an error is returned afterwards if any case failed (or if the
/// YAML could not be parsed).
pub async fn run_test_file(file_path: &Path, file_content: &str) -> Result<()> {
    let file_name = file_path.file_name().unwrap().to_str().unwrap();
    println!("-- running {file_name} --");

    let mut failed_num = 0;
    // Attach the YAML source location (line/column) to parse errors to ease
    // debugging of malformed test files.
    let cases: Vec<TestCase> = serde_yaml::from_str(file_content).map_err(|e| {
        let context = if let Some(loc) = e.location() {
            format!(
                "failed to parse yaml at {}:{}:{}",
                file_path.display(),
                loc.line(),
                loc.column()
            )
        } else {
            "failed to parse yaml".to_owned()
        };
        anyhow::anyhow!(e).context(context)
    })?;
    // Expand `before` ids into the referenced cases' SQL statements.
    let cases = resolve_testcase_id(cases).expect("failed to resolve");
    let mut outputs = vec![];

    for (i, c) in cases.into_iter().enumerate() {
        println!(
            "Running test #{i} (id: {}), SQL:\n{}",
            c.id().clone().unwrap_or_else(|| "<none>".to_owned()),
            c.sql()
        );
        match c.run(true).await {
            Ok(case) => {
                outputs.push(case);
            }
            Err(e) => {
                // Report the failure but keep going so the snapshot stays complete.
                eprintln!(
                    "Test #{i} (id: {}) failed, SQL:\n{}\nError: {}",
                    c.id().clone().unwrap_or_else(|| "<none>".to_owned()),
                    c.sql(),
                    e.as_report()
                );
                failed_num += 1;
            }
        }
    }

    // Compare (or, with UPDATE_EXPECT, regenerate) the snapshot file.
    let output_path = test_data_dir().join("output").join(file_name);
    check(outputs, expect_test::expect_file![output_path]);

    if failed_num > 0 {
        println!("\n");
        bail!(format!("{} test cases failed", failed_num));
    }
    Ok(())
}