1use std::iter::Peekable;
16
17use itertools::Itertools;
18use risingwave_common::types::{DataType, ScalarImpl};
19use risingwave_expr::expr::LogReport;
20use risingwave_pb::expr::ExprNode;
21use risingwave_pb::expr::expr_node::{PbType, RexNode};
22
23use super::NonStrictExpression;
24use super::expr_some_all::SomeAllExpression;
25use super::expr_udf::UserDefinedFunction;
26use super::strict::Strict;
27use super::wrapper::EvalErrorReport;
28use super::wrapper::checked::Checked;
29use super::wrapper::non_strict::NonStrict;
30use crate::expr::{
31 BoxedExpression, Expression, ExpressionBoxExt, InputRefExpression, LiteralExpression,
32};
33use crate::expr_context::strict_mode;
34use crate::sig::FUNCTION_REGISTRY;
35use crate::{Result, bail};
36
37pub fn build_from_prost(prost: &ExprNode) -> Result<BoxedExpression> {
39 let expr = ExprBuilder::new_strict().build(prost)?;
40 Ok(Strict::new(expr).boxed())
41}
42
43pub fn build_non_strict_from_prost(
45 prost: &ExprNode,
46 error_report: impl EvalErrorReport + 'static,
47) -> Result<NonStrictExpression> {
48 ExprBuilder::new_non_strict(error_report)
49 .build(prost)
50 .map(NonStrictExpression)
51}
52
53pub fn build_batch_expr_from_prost(prost: &ExprNode) -> Result<BoxedExpression> {
60 if strict_mode()? {
61 build_from_prost(prost)
62 } else {
63 Ok(ExprBuilder::new_non_strict(LogReport).build(prost)?.boxed())
65 }
66}
67
/// Builds expression trees from protobuf nodes.
///
/// `R` is the type of the error report handed to the non-strict wrapper.
struct ExprBuilder<R> {
    /// `Some(report)` when expressions should additionally be wrapped in the
    /// non-strict wrapper using `report`; `None` for strict building.
    error_report: Option<R>,
}
76
77impl ExprBuilder<!> {
78 fn new_strict() -> Self {
80 Self { error_report: None }
81 }
82}
83
84impl<R> ExprBuilder<R>
85where
86 R: EvalErrorReport + 'static,
87{
88 fn new_non_strict(error_report: R) -> Self {
90 Self {
91 error_report: Some(error_report),
92 }
93 }
94
95 #[expect(clippy::let_and_return)]
97 fn wrap(&self, expr: impl Expression + 'static) -> BoxedExpression {
98 let checked = Checked(expr);
99
100 let may_non_strict = if let Some(error_report) = &self.error_report {
101 NonStrict::new(checked, error_report.clone()).boxed()
102 } else {
103 checked.boxed()
104 };
105
106 may_non_strict
107 }
108
109 fn build(&self, prost: &ExprNode) -> Result<BoxedExpression> {
111 let expr = self.build_inner(prost)?;
112 Ok(self.wrap(expr))
113 }
114
115 fn build_inner(&self, prost: &ExprNode) -> Result<BoxedExpression> {
117 use PbType as E;
118
119 let build_child = |prost: &'_ ExprNode| self.build(prost);
120
121 match prost.get_rex_node()? {
122 RexNode::InputRef(_) => InputRefExpression::build_boxed(prost, build_child),
123 RexNode::Constant(_) => LiteralExpression::build_boxed(prost, build_child),
124 RexNode::Udf(_) => UserDefinedFunction::build_boxed(prost, build_child),
125
126 RexNode::FuncCall(_) => match prost.function_type() {
127 E::All | E::Some => SomeAllExpression::build_boxed(prost, build_child),
129
130 _ => FuncCallBuilder::build_boxed(prost, build_child),
132 },
133
134 RexNode::Now(_) => unreachable!("now should not be built at backend"),
135
136 RexNode::SecretRef(sr) => {
137 use risingwave_common::secret::LocalSecretManager;
138 use risingwave_pb::secret::SecretRef as PbSecretRef;
139
140 let pb_ref = PbSecretRef {
141 secret_id: sr.secret_id.into(),
142 ref_as: sr.ref_as,
143 };
144 let value = LocalSecretManager::global()
145 .fill_secret(pb_ref)
146 .map_err(|e| anyhow::anyhow!(e))?;
147 Ok(
148 LiteralExpression::new(DataType::Varchar, Some(ScalarImpl::Utf8(value.into())))
149 .boxed(),
150 )
151 }
152 }
153 }
154}
155
156pub(crate) trait Build: Expression + Sized {
158 fn build(
162 prost: &ExprNode,
163 build_child: impl Fn(&ExprNode) -> Result<BoxedExpression>,
164 ) -> Result<Self>;
165
166 #[cfg(test)]
169 fn build_for_test(prost: &ExprNode) -> Result<Self> {
170 Self::build(prost, build_from_prost)
171 }
172}
173
174pub(crate) trait BuildBoxed: 'static {
176 fn build_boxed(
178 prost: &ExprNode,
179 build_child: impl Fn(&ExprNode) -> Result<BoxedExpression>,
180 ) -> Result<BoxedExpression>;
181}
182
183impl<E: Build + 'static> BuildBoxed for E {
185 fn build_boxed(
186 prost: &ExprNode,
187 build_child: impl Fn(&ExprNode) -> Result<BoxedExpression>,
188 ) -> Result<BoxedExpression> {
189 Self::build(prost, build_child).map(ExpressionBoxExt::boxed)
190 }
191}
192
193struct FuncCallBuilder;
195
196impl BuildBoxed for FuncCallBuilder {
197 fn build_boxed(
198 prost: &ExprNode,
199 build_child: impl Fn(&ExprNode) -> Result<BoxedExpression>,
200 ) -> Result<BoxedExpression> {
201 let func_type = prost.function_type();
202 let ret_type = DataType::from(prost.get_return_type().unwrap());
203 let func_call = prost
204 .get_rex_node()?
205 .as_func_call()
206 .expect("not a func call");
207
208 let children = func_call
209 .get_children()
210 .iter()
211 .map(build_child)
212 .try_collect()?;
213
214 build_func(func_type, ret_type, children)
215 }
216}
217
218pub fn build_func(
220 func: PbType,
221 ret_type: DataType,
222 children: Vec<BoxedExpression>,
223) -> Result<BoxedExpression> {
224 let args = children.iter().map(|c| c.return_type()).collect_vec();
225 let desc = FUNCTION_REGISTRY.get(func, &args, &ret_type)?;
226 desc.build_scalar(ret_type, children)
227}
228
229pub fn build_func_non_strict(
234 func: PbType,
235 ret_type: DataType,
236 children: Vec<BoxedExpression>,
237 error_report: impl EvalErrorReport + 'static,
238) -> Result<NonStrictExpression> {
239 let expr = build_func(func, ret_type, children)?;
240 let wrapped = NonStrictExpression(ExprBuilder::new_non_strict(error_report).wrap(expr));
241
242 Ok(wrapped)
243}
244
245pub(super) fn get_children_and_return_type(prost: &ExprNode) -> Result<(&[ExprNode], DataType)> {
246 let ret_type = DataType::from(prost.get_return_type().unwrap());
247 if let RexNode::FuncCall(func_call) = prost.get_rex_node().unwrap() {
248 Ok((func_call.get_children(), ret_type))
249 } else {
250 bail!("Expected RexNode::FuncCall");
251 }
252}
253
254pub fn build_from_pretty(s: impl AsRef<str>) -> BoxedExpression {
277 let tokens = lexer(s.as_ref());
278 Parser::new(tokens.into_iter()).parse_expression()
279}
280
281struct Parser<Iter: Iterator> {
282 tokens: Peekable<Iter>,
283}
284
285impl<Iter: Iterator<Item = Token>> Parser<Iter> {
286 fn new(tokens: Iter) -> Self {
287 Self {
288 tokens: tokens.peekable(),
289 }
290 }
291
292 fn parse_expression(&mut self) -> BoxedExpression {
293 match self.tokens.next().expect("Unexpected end of input") {
294 Token::Index(index) => {
295 assert_eq!(self.tokens.next(), Some(Token::Colon), "Expected a Colon");
296 let ty = self.parse_type();
297 InputRefExpression::new(ty, index).boxed()
298 }
299 Token::LParen => {
300 let func = self.parse_function();
301 assert_eq!(self.tokens.next(), Some(Token::Colon), "Expected a Colon");
302 let ty = self.parse_type();
303
304 let mut children = Vec::new();
305 while self.tokens.peek() != Some(&Token::RParen) {
306 children.push(self.parse_expression());
307 }
308 self.tokens.next(); build_func(func, ty, children).expect("Failed to build")
311 }
312 Token::Literal(value) => {
313 assert_eq!(self.tokens.next(), Some(Token::Colon), "Expected a Colon");
314 let ty = self.parse_type();
315 let value = match value.as_str() {
316 "null" | "NULL" => None,
317 _ => Some(ScalarImpl::from_text(&value, &ty).expect_str("value", &value)),
318 };
319 LiteralExpression::new(ty, value).boxed()
320 }
321 _ => panic!("Unexpected token"),
322 }
323 }
324
325 fn parse_type(&mut self) -> DataType {
326 match self.tokens.next().expect("Unexpected end of input") {
327 Token::Literal(name) => {
328 let mut processed_name = name.replace('_', " ");
329
330 if processed_name.starts_with("map") {
333 processed_name = processed_name.replace('<', "(").replace('>', ")");
334 }
335
336 processed_name
337 .parse::<DataType>()
338 .expect_str("type", &processed_name)
339 }
340 t => panic!("Expected a Literal, got {t:?}"),
341 }
342 }
343
344 fn parse_function(&mut self) -> PbType {
345 match self.tokens.next().expect("Unexpected end of input") {
346 Token::Literal(name) => {
347 PbType::from_str_name(&name.to_uppercase()).expect_str("function", &name)
348 }
349 t => panic!("Expected a Literal, got {t:?}"),
350 }
351 }
352}
353
/// A lexical token of the pretty expression syntax.
#[derive(Clone, Debug, PartialEq)]
pub(crate) enum Token {
    /// `(`
    LParen,
    /// `)`
    RParen,
    /// `:`
    Colon,
    /// `$` followed by decimal digits, e.g. `$0`.
    Index(usize),
    /// Any other run of non-delimiter characters.
    Literal(String),
}
362
363pub(crate) fn lexer(input: &str) -> Vec<Token> {
364 let mut tokens = Vec::new();
365 let mut chars = input.chars().peekable();
366 while let Some(c) = chars.next() {
367 let token = match c {
368 '(' => Token::LParen,
369 ')' => Token::RParen,
370 ':' => Token::Colon,
371 '$' => {
372 let mut number = String::new();
373 while let Some(c) = chars.peek()
374 && c.is_ascii_digit()
375 {
376 number.push(chars.next().unwrap());
377 }
378 let index = number.parse::<usize>().expect("Invalid number");
379 Token::Index(index)
380 }
381 ' ' | '\t' | '\r' | '\n' => continue,
382 _ => {
383 let mut literal = String::new();
384 literal.push(c);
385 while let Some(&c) = chars.peek()
386 && !matches!(c, '(' | ')' | ':' | ' ' | '\t' | '\r' | '\n')
387 {
388 literal.push(chars.next().unwrap());
389 }
390 Token::Literal(literal)
391 }
392 };
393 tokens.push(token);
394 }
395 tokens
396}
397
/// Extension for unwrapping a value with a panic message that names what was
/// expected and the text it was expected in.
pub(crate) trait ExpectExt<T> {
    /// Returns the contained value.
    ///
    /// `what` describes the thing that was expected and `s` is the text it
    /// was looked for in; implementations use both in their panic message.
    fn expect_str(self, what: &str, s: &str) -> T;
}
401
402impl<T> ExpectExt<T> for Option<T> {
403 #[track_caller]
404 fn expect_str(self, what: &str, s: &str) -> T {
405 match self {
406 Some(x) => x,
407 None => panic!("expect {what} in {s:?}"),
408 }
409 }
410}
411
412impl<T, E> ExpectExt<T> for std::result::Result<T, E> {
413 #[track_caller]
414 fn expect_str(self, what: &str, s: &str) -> T {
415 match self {
416 Ok(x) => x,
417 Err(_) => panic!("expect {what} in {s:?}"),
418 }
419 }
420}