// risingwave_frontend/optimizer/plan_visitor/input_ref_validator.rs
use paste::paste;
use risingwave_common::catalog::{Field, Schema};
use super::{DefaultBehavior, Merge};
use crate::expr::ExprVisitor;
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{Explain, PlanRef, PlanTreeNodeUnary};
use crate::optimizer::plan_visitor::PlanVisitor;
/// Expression visitor that compares each visited `InputRef` with the
/// corresponding column of `schema` and records a message when they disagree.
struct ExprVis<'a> {
/// Schema that the visited expressions' input references are checked against.
schema: &'a Schema,
/// Message describing the most recent inconsistency found by
/// `visit_input_ref`; `None` if none has been found.
string: Option<String>,
}
impl ExprVisitor for ExprVis<'_> {
    /// Checks a single input reference against the input schema.
    ///
    /// Records a message in `self.string` (overwriting any earlier one) when
    /// either:
    /// * the reference's index does not address any column of the schema, or
    /// * the reference's data type differs from the addressed column's type.
    ///
    /// Nothing is recorded when the reference is consistent with the schema.
    fn visit_input_ref(&mut self, input_ref: &crate::expr::InputRef) {
        // Use a checked lookup: an out-of-range index is itself an
        // inconsistency that should be reported, not an indexing panic that
        // hides which reference was wrong.
        match self.schema.fields.get(input_ref.index) {
            None => {
                self.string = Some(format!(
                    "InputRef#{} is out of range, the input schema has only {} column(s)",
                    input_ref.index,
                    self.schema.fields.len()
                ));
            }
            Some(field) if input_ref.data_type != field.data_type => {
                self.string = Some(format!(
                    "InputRef#{} has type {}, but its type is {} in the input schema",
                    input_ref.index, input_ref.data_type, field.data_type
                ));
            }
            Some(_) => {}
        }
    }
}
/// Plan visitor that checks the data types of input references used in
/// filter predicates, project expressions and logical scan predicates against
/// the schema those expressions read from. See `validate` for the entry point.
#[derive(Debug, Clone, Default)]
pub struct InputRefValidator;
impl InputRefValidator {
    /// Walks `plan` and verifies that its input references agree with the
    /// schemas they refer to.
    ///
    /// # Panics
    ///
    /// Panics when the visit reports an inconsistency. The panic message
    /// contains the reported reason followed by the explained plan.
    #[track_caller]
    pub fn validate(mut self, plan: PlanRef) {
        match self.visit(plan.clone()) {
            // No inconsistency reported: the plan is accepted silently.
            None => {}
            Some(reason) => {
                let rendered_plan = plan.explain_to_string();
                panic!(
                    "Input references are inconsistent with the input schema: {}, plan:\n{}",
                    reason, rendered_plan
                );
            }
        }
    }
}
// Generates one `visit_<convention>_filter` method per listed convention.
//
// Each generated method checks the filter's predicate against the schema of
// the filter's input. If the predicate yields a message, that message is
// returned; otherwise the input plan is visited and its result is returned.
macro_rules! visit_filter {
    ($($convention:ident),*) => {
        $(
            paste! {
                fn [<visit_ $convention _filter>](&mut self, plan: &crate::optimizer::plan_node:: [<$convention:camel Filter>]) -> Option<String> {
                    let child = plan.input();
                    let mut checker = ExprVis {
                        string: None,
                        schema: child.schema(),
                    };
                    plan.predicate().visit_expr(&mut checker);
                    // A problem in this node's predicate takes precedence over
                    // anything found further down the plan.
                    if checker.string.is_some() {
                        return checker.string;
                    }
                    self.visit(child)
                }
            }
        )*
    };
}
// Generates one `visit_<convention>_project` method per listed convention.
//
// Each generated method checks the project's expressions, in order, against
// the schema of the project's input and returns the message of the first
// expression that produces one. When every expression is clean, the input
// plan is visited and its result is returned.
macro_rules! visit_project {
    ($($convention:ident),*) => {
        $(
            paste! {
                fn [<visit_ $convention _project>](&mut self, plan: &crate::optimizer::plan_node:: [<$convention:camel Project>]) -> Option<String> {
                    let child = plan.input();
                    let mut checker = ExprVis {
                        string: None,
                        schema: child.schema(),
                    };
                    for expr in plan.exprs() {
                        checker.visit_expr(expr);
                        // Stop at the first offending expression.
                        if let Some(message) = checker.string.take() {
                            return Some(message);
                        }
                    }
                    self.visit(child)
                }
            }
        )*
    };
}
impl PlanVisitor for InputRefValidator {
    /// `Some(message)` describes an inconsistency; `None` means none was found.
    type Result = Option<String>;
    type DefaultBehavior = impl DefaultBehavior<Self::Result>;

    // Filter and project visitors for the logical, batch and stream conventions.
    visit_filter!(logical, batch, stream);

    visit_project!(logical, batch, stream);

    /// Combines two results by keeping the first one that is `Some`.
    fn default_behavior() -> Self::DefaultBehavior {
        Merge(|first: Option<String>, second| first.or(second))
    }

    /// Checks the scan's predicate against a schema built from the columns of
    /// the scan's table descriptor, with fields prefixed by the table name.
    ///
    /// Returns the message recorded while visiting the predicate, if any. This
    /// method does not visit any further plan nodes.
    fn visit_logical_scan(
        &mut self,
        plan: &crate::optimizer::plan_node::LogicalScan,
    ) -> Option<String> {
        // The predicate is checked against the table's columns, so the schema
        // is rebuilt from the table descriptor here.
        let input_schema = Schema {
            fields: plan
                .table_desc()
                .columns
                .iter()
                .map(|column| Field::from_with_table_name_prefix(column, plan.table_name()))
                .collect(),
        };
        let mut checker = ExprVis {
            string: None,
            schema: &input_schema,
        };
        plan.predicate().visit_expr(&mut checker);
        let ExprVis { string: verdict, .. } = checker;
        verdict
    }
}