1use std::iter::Peekable;
16
17use itertools::Itertools;
18use risingwave_common::types::{DataType, ScalarImpl};
19use risingwave_expr::expr::LogReport;
20use risingwave_pb::expr::ExprNode;
21use risingwave_pb::expr::expr_node::{PbType, RexNode};
22
23use super::NonStrictExpression;
24use super::expr_some_all::SomeAllExpression;
25use super::expr_udf::UserDefinedFunction;
26use super::strict::Strict;
27use super::wrapper::EvalErrorReport;
28use super::wrapper::checked::Checked;
29use super::wrapper::non_strict::NonStrict;
30use crate::expr::{
31 BoxedExpression, Expression, ExpressionBoxExt, InputRefExpression, LiteralExpression,
32};
33use crate::expr_context::strict_mode;
34use crate::sig::FUNCTION_REGISTRY;
35use crate::{Result, bail};
36
37pub fn build_from_prost(prost: &ExprNode) -> Result<BoxedExpression> {
39 let expr = ExprBuilder::new_strict().build(prost)?;
40 Ok(Strict::new(expr).boxed())
41}
42
43pub fn build_non_strict_from_prost(
45 prost: &ExprNode,
46 error_report: impl EvalErrorReport + 'static,
47) -> Result<NonStrictExpression> {
48 ExprBuilder::new_non_strict(error_report)
49 .build(prost)
50 .map(NonStrictExpression)
51}
52
53pub fn build_batch_expr_from_prost(prost: &ExprNode) -> Result<BoxedExpression> {
60 if strict_mode()? {
61 build_from_prost(prost)
62 } else {
63 Ok(ExprBuilder::new_non_strict(LogReport).build(prost)?.boxed())
65 }
66}
67
/// Recursive builder that turns protobuf expression nodes into boxed
/// expressions.
///
/// The type parameter `R` is the error-report type used when building in
/// non-strict mode.
struct ExprBuilder<R> {
    /// Error report attached to wrapped expressions.
    ///
    /// `None` for a strict builder; `Some(_)` for a non-strict builder, in
    /// which case every wrapped expression also receives a clone of it.
    error_report: Option<R>,
}
76
77impl ExprBuilder<!> {
78 fn new_strict() -> Self {
80 Self { error_report: None }
81 }
82}
83
84impl<R> ExprBuilder<R>
85where
86 R: EvalErrorReport + 'static,
87{
88 fn new_non_strict(error_report: R) -> Self {
90 Self {
91 error_report: Some(error_report),
92 }
93 }
94
95 #[expect(clippy::let_and_return)]
97 fn wrap(&self, expr: impl Expression + 'static) -> BoxedExpression {
98 let checked = Checked(expr);
99
100 let may_non_strict = if let Some(error_report) = &self.error_report {
101 NonStrict::new(checked, error_report.clone()).boxed()
102 } else {
103 checked.boxed()
104 };
105
106 may_non_strict
107 }
108
109 fn build(&self, prost: &ExprNode) -> Result<BoxedExpression> {
111 let expr = self.build_inner(prost)?;
112 Ok(self.wrap(expr))
113 }
114
115 fn build_inner(&self, prost: &ExprNode) -> Result<BoxedExpression> {
117 use PbType as E;
118
119 let build_child = |prost: &'_ ExprNode| self.build(prost);
120
121 match prost.get_rex_node()? {
122 RexNode::InputRef(_) => InputRefExpression::build_boxed(prost, build_child),
123 RexNode::Constant(_) => LiteralExpression::build_boxed(prost, build_child),
124 RexNode::Udf(_) => UserDefinedFunction::build_boxed(prost, build_child),
125
126 RexNode::FuncCall(_) => match prost.function_type() {
127 E::All | E::Some => SomeAllExpression::build_boxed(prost, build_child),
129
130 _ => FuncCallBuilder::build_boxed(prost, build_child),
132 },
133
134 RexNode::Now(_) => unreachable!("now should not be built at backend"),
135 }
136 }
137}
138
139pub(crate) trait Build: Expression + Sized {
141 fn build(
145 prost: &ExprNode,
146 build_child: impl Fn(&ExprNode) -> Result<BoxedExpression>,
147 ) -> Result<Self>;
148
149 #[cfg(test)]
152 fn build_for_test(prost: &ExprNode) -> Result<Self> {
153 Self::build(prost, build_from_prost)
154 }
155}
156
157pub(crate) trait BuildBoxed: 'static {
159 fn build_boxed(
161 prost: &ExprNode,
162 build_child: impl Fn(&ExprNode) -> Result<BoxedExpression>,
163 ) -> Result<BoxedExpression>;
164}
165
166impl<E: Build + 'static> BuildBoxed for E {
168 fn build_boxed(
169 prost: &ExprNode,
170 build_child: impl Fn(&ExprNode) -> Result<BoxedExpression>,
171 ) -> Result<BoxedExpression> {
172 Self::build(prost, build_child).map(ExpressionBoxExt::boxed)
173 }
174}
175
/// Marker type whose [`BuildBoxed`] implementation builds generic function
/// calls.
struct FuncCallBuilder;
178
179impl BuildBoxed for FuncCallBuilder {
180 fn build_boxed(
181 prost: &ExprNode,
182 build_child: impl Fn(&ExprNode) -> Result<BoxedExpression>,
183 ) -> Result<BoxedExpression> {
184 let func_type = prost.function_type();
185 let ret_type = DataType::from(prost.get_return_type().unwrap());
186 let func_call = prost
187 .get_rex_node()?
188 .as_func_call()
189 .expect("not a func call");
190
191 let children = func_call
192 .get_children()
193 .iter()
194 .map(build_child)
195 .try_collect()?;
196
197 build_func(func_type, ret_type, children)
198 }
199}
200
201pub fn build_func(
203 func: PbType,
204 ret_type: DataType,
205 children: Vec<BoxedExpression>,
206) -> Result<BoxedExpression> {
207 let args = children.iter().map(|c| c.return_type()).collect_vec();
208 let desc = FUNCTION_REGISTRY.get(func, &args, &ret_type)?;
209 desc.build_scalar(ret_type, children)
210}
211
212pub fn build_func_non_strict(
217 func: PbType,
218 ret_type: DataType,
219 children: Vec<BoxedExpression>,
220 error_report: impl EvalErrorReport + 'static,
221) -> Result<NonStrictExpression> {
222 let expr = build_func(func, ret_type, children)?;
223 let wrapped = NonStrictExpression(ExprBuilder::new_non_strict(error_report).wrap(expr));
224
225 Ok(wrapped)
226}
227
228pub(super) fn get_children_and_return_type(prost: &ExprNode) -> Result<(&[ExprNode], DataType)> {
229 let ret_type = DataType::from(prost.get_return_type().unwrap());
230 if let RexNode::FuncCall(func_call) = prost.get_rex_node().unwrap() {
231 Ok((func_call.get_children(), ret_type))
232 } else {
233 bail!("Expected RexNode::FuncCall");
234 }
235}
236
237pub fn build_from_pretty(s: impl AsRef<str>) -> BoxedExpression {
260 let tokens = lexer(s.as_ref());
261 Parser::new(tokens.into_iter()).parse_expression()
262}
263
/// Recursive-descent parser over a stream of [`Token`]s.
struct Parser<Iter>
where
    Iter: Iterator,
{
    /// Remaining tokens; peekable so the parser can look one token ahead.
    tokens: Peekable<Iter>,
}
267
268impl<Iter: Iterator<Item = Token>> Parser<Iter> {
269 fn new(tokens: Iter) -> Self {
270 Self {
271 tokens: tokens.peekable(),
272 }
273 }
274
275 fn parse_expression(&mut self) -> BoxedExpression {
276 match self.tokens.next().expect("Unexpected end of input") {
277 Token::Index(index) => {
278 assert_eq!(self.tokens.next(), Some(Token::Colon), "Expected a Colon");
279 let ty = self.parse_type();
280 InputRefExpression::new(ty, index).boxed()
281 }
282 Token::LParen => {
283 let func = self.parse_function();
284 assert_eq!(self.tokens.next(), Some(Token::Colon), "Expected a Colon");
285 let ty = self.parse_type();
286
287 let mut children = Vec::new();
288 while self.tokens.peek() != Some(&Token::RParen) {
289 children.push(self.parse_expression());
290 }
291 self.tokens.next(); build_func(func, ty, children).expect("Failed to build")
294 }
295 Token::Literal(value) => {
296 assert_eq!(self.tokens.next(), Some(Token::Colon), "Expected a Colon");
297 let ty = self.parse_type();
298 let value = match value.as_str() {
299 "null" | "NULL" => None,
300 _ => Some(ScalarImpl::from_text(&value, &ty).expect_str("value", &value)),
301 };
302 LiteralExpression::new(ty, value).boxed()
303 }
304 _ => panic!("Unexpected token"),
305 }
306 }
307
308 fn parse_type(&mut self) -> DataType {
309 match self.tokens.next().expect("Unexpected end of input") {
310 Token::Literal(name) => name
311 .replace('_', " ")
312 .parse::<DataType>()
313 .expect_str("type", &name),
314 t => panic!("Expected a Literal, got {t:?}"),
315 }
316 }
317
318 fn parse_function(&mut self) -> PbType {
319 match self.tokens.next().expect("Unexpected end of input") {
320 Token::Literal(name) => {
321 PbType::from_str_name(&name.to_uppercase()).expect_str("function", &name)
322 }
323 t => panic!("Expected a Literal, got {t:?}"),
324 }
325 }
326}
327
/// A lexical token of the textual expression syntax.
#[derive(Clone, Debug, PartialEq)]
pub(crate) enum Token {
    /// A left parenthesis, `(`.
    LParen,
    /// A right parenthesis, `)`.
    RParen,
    /// A colon, `:`.
    Colon,
    /// A `$` followed by decimal digits, holding the parsed number.
    Index(usize),
    /// Any other run of characters up to the next delimiter.
    Literal(String),
}
336
337pub(crate) fn lexer(input: &str) -> Vec<Token> {
338 let mut tokens = Vec::new();
339 let mut chars = input.chars().peekable();
340 while let Some(c) = chars.next() {
341 let token = match c {
342 '(' => Token::LParen,
343 ')' => Token::RParen,
344 ':' => Token::Colon,
345 '$' => {
346 let mut number = String::new();
347 while let Some(c) = chars.peek()
348 && c.is_ascii_digit()
349 {
350 number.push(chars.next().unwrap());
351 }
352 let index = number.parse::<usize>().expect("Invalid number");
353 Token::Index(index)
354 }
355 ' ' | '\t' | '\r' | '\n' => continue,
356 _ => {
357 let mut literal = String::new();
358 literal.push(c);
359 while let Some(&c) = chars.peek()
360 && !matches!(c, '(' | ')' | ':' | ' ' | '\t' | '\r' | '\n')
361 {
362 literal.push(chars.next().unwrap());
363 }
364 Token::Literal(literal)
365 }
366 };
367 tokens.push(token);
368 }
369 tokens
370}
371
/// Unwrapping helper that names what was expected and the text it was
/// expected in.
pub(crate) trait ExpectExt<T> {
    /// Returns the contained value.
    ///
    /// `what` describes the thing being looked for and `s` is the text it was
    /// looked for in; implementations use both to describe the failure.
    fn expect_str(self, what: &str, s: &str) -> T;
}
375
376impl<T> ExpectExt<T> for Option<T> {
377 #[track_caller]
378 fn expect_str(self, what: &str, s: &str) -> T {
379 match self {
380 Some(x) => x,
381 None => panic!("expect {what} in {s:?}"),
382 }
383 }
384}
385
386impl<T, E> ExpectExt<T> for std::result::Result<T, E> {
387 #[track_caller]
388 fn expect_str(self, what: &str, s: &str) -> T {
389 match self {
390 Ok(x) => x,
391 Err(_) => panic!("expect {what} in {s:?}"),
392 }
393 }
394}