// risingwave_frontend/expr/expr_rewriter.rs
use risingwave_common::util::recursive::{tracker, Recurse};
use super::{
AggCall, CorrelatedInputRef, ExprImpl, FunctionCall, FunctionCallWithLambda, InputRef, Literal,
Parameter, Subquery, TableFunction, UserDefinedFunction, WindowFunction, EXPR_DEPTH_THRESHOLD,
EXPR_TOO_DEEP_NOTICE,
};
use crate::expr::Now;
use crate::session::current::notice_to_user;
/// Routes `expr` to the [`ExprRewriter`] method that handles its variant.
///
/// The dispatch runs inside `tracker!().recurse(..)`. When the tracker reports that the
/// depth reaches [`EXPR_DEPTH_THRESHOLD`], [`EXPR_TOO_DEEP_NOTICE`] is passed to
/// `notice_to_user`; the rewrite itself still proceeds afterwards.
///
/// Each arm moves the variant's payload out with `*` and hands it to the matching
/// `rewrite_*` method, whose result becomes the return value.
pub fn default_rewrite_expr<R: ExprRewriter + ?Sized>(
    rewriter: &mut R,
    expr: ExprImpl,
) -> ExprImpl {
    tracker!().recurse(|recursion| {
        let too_deep = recursion.depth_reaches(EXPR_DEPTH_THRESHOLD);
        if too_deep {
            notice_to_user(EXPR_TOO_DEEP_NOTICE);
        }

        // The match is intentionally exhaustive (no `_` arm) so that adding a new
        // `ExprImpl` variant forces a decision here.
        match expr {
            // Nodes whose default rewrite returns them unchanged.
            ExprImpl::Literal(node) => rewriter.rewrite_literal(*node),
            ExprImpl::InputRef(node) => rewriter.rewrite_input_ref(*node),
            ExprImpl::CorrelatedInputRef(node) => rewriter.rewrite_correlated_input_ref(*node),
            ExprImpl::Parameter(node) => rewriter.rewrite_parameter(*node),
            ExprImpl::Now(node) => rewriter.rewrite_now(*node),
            ExprImpl::Subquery(node) => rewriter.rewrite_subquery(*node),

            // Nodes whose default rewrite recurses into child expressions.
            ExprImpl::FunctionCall(node) => rewriter.rewrite_function_call(*node),
            ExprImpl::FunctionCallWithLambda(node) => {
                rewriter.rewrite_function_call_with_lambda(*node)
            }
            ExprImpl::AggCall(node) => rewriter.rewrite_agg_call(*node),
            ExprImpl::TableFunction(node) => rewriter.rewrite_table_function(*node),
            ExprImpl::WindowFunction(node) => rewriter.rewrite_window_function(*node),
            ExprImpl::UserDefinedFunction(node) => rewriter.rewrite_user_defined_function(*node),
        }
    })
}
/// A by-value rewriter over [`ExprImpl`] trees.
///
/// Every method consumes one expression node and returns its replacement as an
/// [`ExprImpl`]. The provided implementations rebuild the node they are given: nodes
/// without child expressions are returned unchanged, and nodes with child expressions
/// have those children passed through [`ExprRewriter::rewrite_expr`] first. An
/// implementor therefore only needs to override the node kinds it wants to change.
pub trait ExprRewriter {
    /// Rewrites an arbitrary expression by dispatching on its variant through
    /// [`default_rewrite_expr`].
    fn rewrite_expr(&mut self, expr: ExprImpl) -> ExprImpl {
        default_rewrite_expr(self, expr)
    }

    /// Rewrites every input of a function call and rebuilds the call with the same
    /// function type and return type.
    fn rewrite_function_call(&mut self, func_call: FunctionCall) -> ExprImpl {
        let (func_type, inputs, ret) = func_call.decompose();
        let inputs = inputs
            .into_iter()
            .map(|expr| self.rewrite_expr(expr))
            .collect();
        FunctionCall::new_unchecked(func_type, inputs, ret).into()
    }

    /// Rewrites every input of a lambda-taking function call.
    ///
    /// The `lambda_arg` part is carried over as-is; only `inputs` are rewritten.
    fn rewrite_function_call_with_lambda(&mut self, func_call: FunctionCallWithLambda) -> ExprImpl {
        let (func_type, inputs, lambda_arg, ret) = func_call.into_parts();
        let inputs = inputs
            .into_iter()
            .map(|expr| self.rewrite_expr(expr))
            .collect();
        FunctionCallWithLambda::new_unchecked(func_type, inputs, lambda_arg, ret).into()
    }

    /// Rewrites the arguments, `order_by` and `filter` of an aggregate call.
    ///
    /// `agg_type`, `return_type`, `distinct` and `direct_args` are carried over unchanged.
    fn rewrite_agg_call(&mut self, agg_call: AggCall) -> ExprImpl {
        let AggCall {
            agg_type,
            return_type,
            args,
            distinct,
            order_by,
            filter,
            direct_args,
        } = agg_call;
        let args = args
            .into_iter()
            .map(|expr| self.rewrite_expr(expr))
            .collect();
        let order_by = order_by.rewrite_expr(self);
        let filter = filter.rewrite_expr(self);
        AggCall {
            agg_type,
            return_type,
            args,
            distinct,
            order_by,
            filter,
            direct_args,
        }
        .into()
    }

    /// Returns the parameter unchanged.
    fn rewrite_parameter(&mut self, parameter: Parameter) -> ExprImpl {
        parameter.into()
    }

    /// Returns the literal unchanged.
    fn rewrite_literal(&mut self, literal: Literal) -> ExprImpl {
        literal.into()
    }

    /// Returns the input reference unchanged.
    fn rewrite_input_ref(&mut self, input_ref: InputRef) -> ExprImpl {
        input_ref.into()
    }

    /// Returns the subquery unchanged; the default implementation does not look inside it.
    fn rewrite_subquery(&mut self, subquery: Subquery) -> ExprImpl {
        subquery.into()
    }

    /// Returns the correlated input reference unchanged.
    fn rewrite_correlated_input_ref(&mut self, input_ref: CorrelatedInputRef) -> ExprImpl {
        input_ref.into()
    }

    /// Rewrites every argument of a table function.
    ///
    /// `return_type`, `function_type` and `user_defined` are carried over unchanged.
    fn rewrite_table_function(&mut self, table_func: TableFunction) -> ExprImpl {
        let TableFunction {
            args,
            return_type,
            function_type,
            user_defined: udtf_catalog,
        } = table_func;
        let args = args
            .into_iter()
            .map(|expr| self.rewrite_expr(expr))
            .collect();
        TableFunction {
            args,
            return_type,
            function_type,
            user_defined: udtf_catalog,
        }
        .into()
    }

    /// Rewrites the arguments, `partition_by` expressions and `order_by` of a window
    /// function call.
    ///
    /// `kind`, `return_type` and `frame` are carried over unchanged.
    fn rewrite_window_function(&mut self, window_func: WindowFunction) -> ExprImpl {
        let WindowFunction {
            args,
            return_type,
            kind,
            partition_by,
            order_by,
            frame,
        } = window_func;
        let args = args
            .into_iter()
            .map(|expr| self.rewrite_expr(expr))
            .collect();
        // `PARTITION BY` and `ORDER BY` hold expressions too. Skipping them would leave
        // stale sub-expressions behind (e.g. un-shifted input refs) after a rewrite, so
        // they are handled the same way `rewrite_agg_call` handles its `order_by`.
        // NOTE(review): assumes `partition_by` is a collection of `ExprImpl` and
        // `order_by` has the same type as `AggCall::order_by` — confirm against the
        // `WindowFunction` definition.
        let partition_by = partition_by
            .into_iter()
            .map(|expr| self.rewrite_expr(expr))
            .collect();
        let order_by = order_by.rewrite_expr(self);
        WindowFunction {
            kind,
            args,
            return_type,
            partition_by,
            order_by,
            frame,
        }
        .into()
    }

    /// Rewrites every argument of a user-defined function call, keeping its `catalog`.
    fn rewrite_user_defined_function(&mut self, udf: UserDefinedFunction) -> ExprImpl {
        let UserDefinedFunction { args, catalog } = udf;
        let args = args
            .into_iter()
            .map(|expr| self.rewrite_expr(expr))
            .collect();
        UserDefinedFunction { args, catalog }.into()
    }

    /// Returns the `Now` expression unchanged.
    fn rewrite_now(&mut self, now: Now) -> ExprImpl {
        now.into()
    }
}