risingwave_frontend/optimizer/plan_visitor/
input_ref_validator.rs1use paste::paste;
16use risingwave_common::catalog::{Field, Schema};
17
18use super::{DefaultBehavior, Merge};
19use crate::expr::ExprVisitor;
20use crate::optimizer::plan_node::generic::GenericPlanRef;
21use crate::optimizer::plan_node::{Explain, PlanRef, PlanTreeNodeUnary};
22use crate::optimizer::plan_visitor::PlanVisitor;
23
24struct ExprVis<'a> {
25 schema: &'a Schema,
26 string: Option<String>,
27}
28
29impl ExprVisitor for ExprVis<'_> {
30 fn visit_input_ref(&mut self, input_ref: &crate::expr::InputRef) {
31 if input_ref.data_type != self.schema[input_ref.index].data_type {
32 self.string.replace(format!(
33 "InputRef#{} has type {}, but its type is {} in the input schema",
34 input_ref.index, input_ref.data_type, self.schema[input_ref.index].data_type
35 ));
36 }
37 }
38}
39
40#[derive(Debug, Clone, Default)]
44pub struct InputRefValidator;
45
46impl InputRefValidator {
47 #[track_caller]
48 pub fn validate(mut self, plan: PlanRef) {
49 if let Some(err) = self.visit(plan.clone()) {
50 panic!(
51 "Input references are inconsistent with the input schema: {}, plan:\n{}",
52 err,
53 plan.explain_to_string()
54 );
55 }
56 }
57}
58
/// Generates one `visit_<convention>_filter` method per listed convention.
///
/// Each generated method checks the filter predicate against the schema of
/// the filter's input. If the predicate yields an error it is returned
/// immediately; otherwise validation continues into the input plan.
macro_rules! visit_filter {
    ($($convention:ident),*) => {
        $(
            paste! {
                fn [<visit_ $convention _filter>](
                    &mut self,
                    plan: &crate::optimizer::plan_node:: [<$convention:camel Filter>],
                ) -> Option<String> {
                    let child = plan.input();
                    let mut checker = ExprVis {
                        schema: child.schema(),
                        string: None,
                    };
                    plan.predicate().visit_expr(&mut checker);

                    // An error in this node's predicate takes precedence over
                    // anything further down the tree.
                    if let Some(message) = checker.string {
                        return Some(message);
                    }
                    self.visit(child)
                }
            }
        )*
    };
}
78
/// Generates one `visit_<convention>_project` method per listed convention.
///
/// Each generated method checks the projection expressions, in order, against
/// the schema of the projection's input and stops at the first expression
/// that yields an error. If none does, validation continues into the input
/// plan.
macro_rules! visit_project {
    ($($convention:ident),*) => {
        $(
            paste! {
                fn [<visit_ $convention _project>](
                    &mut self,
                    plan: &crate::optimizer::plan_node:: [<$convention:camel Project>],
                ) -> Option<String> {
                    let child = plan.input();
                    let mut checker = ExprVis {
                        schema: child.schema(),
                        string: None,
                    };

                    for expr in plan.exprs() {
                        checker.visit_expr(expr);
                        // Stop at the first expression that produced an error.
                        if let Some(message) = checker.string.take() {
                            return Some(message);
                        }
                    }

                    self.visit(child)
                }
            }
        )*
    };
}
101
102impl PlanVisitor for InputRefValidator {
103 type Result = Option<String>;
104
105 type DefaultBehavior = impl DefaultBehavior<Self::Result>;
106
107 visit_filter!(logical, batch, stream);
108
109 visit_project!(logical, batch, stream);
110
111 fn default_behavior() -> Self::DefaultBehavior {
112 Merge(|a: Option<String>, b| a.or(b))
113 }
114
115 fn visit_logical_scan(
116 &mut self,
117 plan: &crate::optimizer::plan_node::LogicalScan,
118 ) -> Option<String> {
119 let fields = plan
120 .table_desc()
121 .columns
122 .iter()
123 .map(|col| Field::from_with_table_name_prefix(col, plan.table_name()))
124 .collect();
125 let input_schema = Schema { fields };
126 let mut vis = ExprVis {
127 schema: &input_schema,
128 string: None,
129 };
130 plan.predicate().visit_expr(&mut vis);
131 vis.string
132 }
133
134 }