risingwave_frontend/optimizer/plan_visitor/
input_ref_validator.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use paste::paste;
16use risingwave_common::catalog::{Field, Schema};
17
18use super::{DefaultBehavior, Merge};
19use crate::expr::ExprVisitor;
20use crate::optimizer::plan_node::generic::GenericPlanRef;
21use crate::optimizer::plan_node::{Explain, PlanRef, PlanTreeNodeUnary};
22use crate::optimizer::plan_visitor::PlanVisitor;
23
24struct ExprVis<'a> {
25    schema: &'a Schema,
26    string: Option<String>,
27}
28
29impl ExprVisitor for ExprVis<'_> {
30    fn visit_input_ref(&mut self, input_ref: &crate::expr::InputRef) {
31        if input_ref.data_type != self.schema[input_ref.index].data_type {
32            self.string.replace(format!(
33                "InputRef#{} has type {}, but its type is {} in the input schema",
34                input_ref.index, input_ref.data_type, self.schema[input_ref.index].data_type
35            ));
36        }
37    }
38}
39
40/// Validates that input references are consistent with the input schema.
41///
42/// Use `InputRefValidator::validate` as an assertion.
43#[derive(Debug, Clone, Default)]
44pub struct InputRefValidator;
45
46impl InputRefValidator {
47    #[track_caller]
48    pub fn validate(mut self, plan: PlanRef) {
49        if let Some(err) = self.visit(plan.clone()) {
50            panic!(
51                "Input references are inconsistent with the input schema: {}, plan:\n{}",
52                err,
53                plan.explain_to_string()
54            );
55        }
56    }
57}
58
/// Generates a `visit_<convention>_filter` method for each given convention.
///
/// Every generated method checks the `InputRef`s of the filter predicate against
/// the schema of the filter's input. If a mismatch is recorded it is returned;
/// otherwise the visit continues into the input.
macro_rules! visit_filter {
    ($($convention:ident),*) => {
        $(
            paste! {
                fn [<visit_ $convention _filter>](&mut self, plan: &crate::optimizer::plan_node:: [<$convention:camel Filter>]) -> Option<String> {
                    let child = plan.input();
                    let mut checker = ExprVis {
                        schema: child.schema(),
                        string: None,
                    };
                    plan.predicate().visit_expr(&mut checker);
                    match checker.string {
                        // A mismatch in this node's predicate takes precedence.
                        Some(message) => Some(message),
                        // Otherwise keep validating further down the plan.
                        None => self.visit(child),
                    }
                }
            }
        )*
    };
}
78
/// Generates a `visit_<convention>_project` method for each given convention.
///
/// Every generated method checks the `InputRef`s of each projection expression
/// against the schema of the project's input, returning as soon as one
/// expression records a mismatch. If none does, the visit continues into the
/// input.
macro_rules! visit_project {
    ($($convention:ident),*) => {
        $(
            paste! {
                fn [<visit_ $convention _project>](&mut self, plan: &crate::optimizer::plan_node:: [<$convention:camel Project>]) -> Option<String> {
                    let child = plan.input();
                    let mut checker = ExprVis {
                        schema: child.schema(),
                        string: None,
                    };
                    for expr in plan.exprs() {
                        checker.visit_expr(expr);
                        // Stop at the first expression that produced a mismatch.
                        if let Some(message) = checker.string.take() {
                            return Some(message);
                        }
                    }
                    self.visit(child)
                }
            }
        )*
    };
}
101
102impl PlanVisitor for InputRefValidator {
103    type Result = Option<String>;
104
105    type DefaultBehavior = impl DefaultBehavior<Self::Result>;
106
107    visit_filter!(logical, batch, stream);
108
109    visit_project!(logical, batch, stream);
110
111    fn default_behavior() -> Self::DefaultBehavior {
112        Merge(|a: Option<String>, b| a.or(b))
113    }
114
115    fn visit_logical_scan(
116        &mut self,
117        plan: &crate::optimizer::plan_node::LogicalScan,
118    ) -> Option<String> {
119        let fields = plan
120            .table_desc()
121            .columns
122            .iter()
123            .map(|col| Field::from_with_table_name_prefix(col, plan.table_name()))
124            .collect();
125        let input_schema = Schema { fields };
126        let mut vis = ExprVis {
127            schema: &input_schema,
128            string: None,
129        };
130        plan.predicate().visit_expr(&mut vis);
131        vis.string
132    }
133
134    // TODO: add more checks
135}