1use std::collections::{HashMap, HashSet};
16use std::str::FromStr;
17use std::sync::Arc;
18
19use anyhow::Context;
20use itertools::Itertools;
21use risingwave_common::acl::AclMode;
22use risingwave_common::bail_not_implemented;
23use risingwave_common::catalog::{INFORMATION_SCHEMA_SCHEMA_NAME, PG_CATALOG_SCHEMA_NAME};
24use risingwave_common::types::{DataType, MapType};
25use risingwave_expr::aggregate::AggType;
26use risingwave_expr::window_function::WindowFuncKind;
27use risingwave_pb::user::grant_privilege::PbObject;
28use risingwave_sqlparser::ast::{
29 self, Function, FunctionArg, FunctionArgExpr, FunctionArgList, Ident, OrderByExpr, Window,
30};
31use risingwave_sqlparser::parser::ParserError;
32
33use crate::binder::bind_context::Clause;
34use crate::binder::{Binder, UdfContext};
35use crate::catalog::function_catalog::FunctionCatalog;
36use crate::error::{ErrorCode, Result, RwError};
37use crate::expr::{
38 Expr, ExprImpl, ExprType, FunctionCallWithLambda, InputRef, TableFunction, TableFunctionType,
39 UserDefinedFunction,
40};
41
42mod aggregate;
43mod builtin_scalar;
44mod window;
45
/// Names of system functions that may be written without parentheses and without
/// arguments (e.g. `SELECT current_user`).
///
/// Only unquoted identifiers are matched against this list; see
/// `is_sys_function_without_args`.
const SYS_FUNCTION_WITHOUT_ARGS: &[&str] = &[
    "session_user",
    "user",
    "current_user",
    "current_role",
    "current_catalog",
    "current_schema",
    "current_timestamp",
];
56
57pub(super) fn is_sys_function_without_args(ident: &Ident) -> bool {
58 SYS_FUNCTION_WITHOUT_ARGS
59 .iter()
60 .any(|e| ident.real_value().as_str() == *e && ident.quote_style().is_none())
61}
62
/// Maximum nesting depth of SQL UDF calls while binding. `bind_sql_udf` reports a
/// "calling stack depth limit exceeded" error once the UDF context's global count reaches
/// this value (presumably to stop unbounded recursion between SQL UDFs).
const SQL_UDF_MAX_CALLING_DEPTH: u32 = 16;
68
/// Returns early from the enclosing function with an `ErrorCode::InvalidInputSyntax`
/// error (converted via `.into()`) when `$pred` evaluates to `true`.
///
/// Two forms are accepted:
/// - `reject_syntax!(pred, msg)`: the message is `msg.to_string()`;
/// - `reject_syntax!(pred, fmt, args...)`: the message is `format!(fmt, args...)`.
macro_rules! reject_syntax {
    ($pred:expr, $msg:expr) => {
        if $pred {
            let message = $msg.to_string();
            return Err(ErrorCode::InvalidInputSyntax(message).into());
        }
    };

    ($pred:expr, $fmt:expr, $($arg:tt)*) => {
        if $pred {
            let message = format!($fmt, $($arg)*);
            return Err(ErrorCode::InvalidInputSyntax(message).into());
        }
    };
}
84
85impl Binder {
86 pub(in crate::binder) fn bind_function(
87 &mut self,
88 Function {
89 scalar_as_agg,
90 name,
91 arg_list,
92 within_group,
93 filter,
94 over,
95 }: Function,
96 ) -> Result<ExprImpl> {
97 let (schema_name, func_name) = match name.0.as_slice() {
98 [name] => (None, name.real_value()),
99 [schema, name] => {
100 let schema_name = schema.real_value();
101 let func_name = if schema_name == PG_CATALOG_SCHEMA_NAME {
102 name.real_value()
105 } else if schema_name == INFORMATION_SCHEMA_SCHEMA_NAME {
106 let function_name = name.real_value();
110 if function_name != "_pg_expandarray" {
111 bail_not_implemented!(
112 issue = 12422,
113 "Unsupported function name under schema: {}",
114 schema_name
115 );
116 }
117 function_name
118 } else {
119 bail_not_implemented!(
120 issue = 12422,
121 "Unsupported function name under schema: {}",
122 schema_name
123 );
124 };
125 (Some(schema_name), func_name)
126 }
127 _ => bail_not_implemented!(issue = 112, "qualified function {}", name),
128 };
129
130 if func_name == "obj_description" || func_name == "col_description" {
137 return Ok(ExprImpl::literal_varchar("".to_owned()));
138 }
139
140 if func_name == "array_transform" || func_name == "map_filter" {
142 return self.validate_and_bind_special_function_params(
143 &func_name,
144 scalar_as_agg,
145 arg_list,
146 &within_group,
147 &filter,
148 &over,
149 );
150 }
151
152 let mut args: Vec<_> = arg_list
153 .args
154 .iter()
155 .map(|arg| self.bind_function_arg(arg.clone()))
156 .flatten_ok()
157 .try_collect()?;
158
159 let mut referred_udfs = HashSet::new();
160
161 let wrapped_agg_type = if scalar_as_agg {
162 let mut array_args = args
165 .iter()
166 .enumerate()
167 .map(|(i, expr)| {
168 InputRef::new(i, DataType::List(Box::new(expr.return_type()))).into()
169 })
170 .collect_vec();
171 let schema_path = self.bind_schema_path(schema_name.as_deref());
172 let scalar_func_expr = if let Ok((func, _)) = self.catalog.get_function_by_name_inputs(
173 &self.db_name,
174 schema_path,
175 &func_name,
176 &mut array_args,
177 ) {
178 referred_udfs.insert(func.id);
180 self.check_privilege(
181 PbObject::FunctionId(func.id.function_id()),
182 self.database_id,
183 AclMode::Execute,
184 func.owner,
185 )?;
186
187 if !func.kind.is_scalar() {
188 return Err(ErrorCode::InvalidInputSyntax(
189 "expect a scalar function after `AGGREGATE:`".to_owned(),
190 )
191 .into());
192 }
193
194 if func.language == "sql" {
195 self.bind_sql_udf(func.clone(), array_args)?
196 } else {
197 UserDefinedFunction::new(func.clone(), array_args).into()
198 }
199 } else {
200 self.bind_builtin_scalar_function(&func_name, array_args, arg_list.variadic)?
201 };
202
203 Some(AggType::WrapScalar(scalar_func_expr.to_expr_proto()))
205 } else {
206 None
207 };
208
209 let schema_path = self.bind_schema_path(schema_name.as_deref());
210 let udf = if wrapped_agg_type.is_none()
211 && let Ok((func, _)) = self.catalog.get_function_by_name_inputs(
212 &self.db_name,
213 schema_path,
214 &func_name,
215 &mut args,
216 ) {
217 referred_udfs.insert(func.id);
219 self.check_privilege(
220 PbObject::FunctionId(func.id.function_id()),
221 self.database_id,
222 AclMode::Execute,
223 func.owner,
224 )?;
225 Some(func.clone())
226 } else {
227 None
228 };
229
230 self.included_udfs.extend(referred_udfs);
231
232 let agg_type = if wrapped_agg_type.is_some() {
233 wrapped_agg_type
234 } else if let Some(ref udf) = udf
235 && udf.kind.is_aggregate()
236 {
237 assert_ne!(udf.language, "sql", "SQL UDAF is not supported yet");
238 Some(AggType::UserDefined(udf.as_ref().into()))
239 } else {
240 AggType::from_str(&func_name).ok()
241 };
242
243 if let Some(over) = over {
245 reject_syntax!(
246 arg_list.distinct,
247 "`DISTINCT` is not allowed in window function call"
248 );
249 reject_syntax!(
250 arg_list.variadic,
251 "`VARIADIC` is not allowed in window function call"
252 );
253 reject_syntax!(
254 !arg_list.order_by.is_empty(),
255 "`ORDER BY` is not allowed in window function call argument list"
256 );
257 reject_syntax!(
258 within_group.is_some(),
259 "`WITHIN GROUP` is not allowed in window function call"
260 );
261
262 let kind = if let Some(agg_type) = agg_type {
263 WindowFuncKind::Aggregate(agg_type)
265 } else if let Ok(kind) = WindowFuncKind::from_str(&func_name) {
266 kind
267 } else {
268 bail_not_implemented!(issue = 8961, "Unrecognized window function: {}", func_name);
269 };
270 return self.bind_window_function(kind, args, arg_list.ignore_nulls, filter, over);
271 }
272
273 reject_syntax!(
275 arg_list.ignore_nulls,
276 "`IGNORE NULLS` is not allowed in aggregate/scalar/table function call"
277 );
278
279 if let Some(agg_type) = agg_type {
281 reject_syntax!(
282 arg_list.variadic,
283 "`VARIADIC` is not allowed in aggregate function call"
284 );
285 return self.bind_aggregate_function(
286 agg_type,
287 arg_list.distinct,
288 args,
289 arg_list.order_by,
290 within_group,
291 filter,
292 );
293 }
294
295 reject_syntax!(
297 arg_list.distinct,
298 "`DISTINCT` is not allowed in scalar/table function call"
299 );
300 reject_syntax!(
301 !arg_list.order_by.is_empty(),
302 "`ORDER BY` is not allowed in scalar/table function call"
303 );
304 reject_syntax!(
305 within_group.is_some(),
306 "`WITHIN GROUP` is not allowed in scalar/table function call"
307 );
308 reject_syntax!(
309 filter.is_some(),
310 "`FILTER` is not allowed in scalar/table function call"
311 );
312
313 {
315 if func_name.eq_ignore_ascii_case("file_scan") {
317 reject_syntax!(
318 arg_list.variadic,
319 "`VARIADIC` is not allowed in table function call"
320 );
321 self.ensure_table_function_allowed()?;
322 return Ok(TableFunction::new_file_scan(args)?.into());
323 }
324 if func_name.eq("postgres_query") {
326 reject_syntax!(
327 arg_list.variadic,
328 "`VARIADIC` is not allowed in table function call"
329 );
330 self.ensure_table_function_allowed()?;
331 return Ok(TableFunction::new_postgres_query(
332 &self.catalog,
333 &self.db_name,
334 self.bind_schema_path(schema_name.as_deref()),
335 args,
336 )
337 .context("postgres_query error")?
338 .into());
339 }
340 if func_name.eq("mysql_query") {
342 reject_syntax!(
343 arg_list.variadic,
344 "`VARIADIC` is not allowed in table function call"
345 );
346 self.ensure_table_function_allowed()?;
347 return Ok(TableFunction::new_mysql_query(
348 &self.catalog,
349 &self.db_name,
350 self.bind_schema_path(schema_name.as_deref()),
351 args,
352 )
353 .context("mysql_query error")?
354 .into());
355 }
356 if func_name.eq("internal_backfill_progress") {
358 reject_syntax!(
359 arg_list.variadic,
360 "`VARIADIC` is not allowed in table function call"
361 );
362 self.ensure_table_function_allowed()?;
363 return Ok(TableFunction::new_internal_backfill_progress().into());
364 }
365 if let Some(ref udf) = udf
367 && udf.kind.is_table()
368 {
369 reject_syntax!(
370 arg_list.variadic,
371 "`VARIADIC` is not allowed in table function call"
372 );
373 self.ensure_table_function_allowed()?;
374 if udf.language == "sql" {
375 return self.bind_sql_udf(udf.clone(), args);
376 }
377 return Ok(TableFunction::new_user_defined(udf.clone(), args).into());
378 }
379 if let Ok(function_type) = TableFunctionType::from_str(&func_name) {
381 reject_syntax!(
382 arg_list.variadic,
383 "`VARIADIC` is not allowed in table function call"
384 );
385 self.ensure_table_function_allowed()?;
386 return Ok(TableFunction::new(function_type, args)?.into());
387 }
388 }
389
390 if let Some(ref udf) = udf {
392 assert!(udf.kind.is_scalar());
393 reject_syntax!(
394 arg_list.variadic,
395 "`VARIADIC` is not allowed in user-defined function call"
396 );
397 if udf.language == "sql" {
398 return self.bind_sql_udf(udf.clone(), args);
399 }
400 return Ok(UserDefinedFunction::new(udf.clone(), args).into());
401 }
402
403 self.bind_builtin_scalar_function(&func_name, args, arg_list.variadic)
404 }
405
406 fn validate_and_bind_special_function_params(
407 &mut self,
408 func_name: &str,
409 scalar_as_agg: bool,
410 arg_list: FunctionArgList,
411 within_group: &Option<Box<OrderByExpr>>,
412 filter: &Option<Box<risingwave_sqlparser::ast::Expr>>,
413 over: &Option<Window>,
414 ) -> Result<ExprImpl> {
415 assert!(["array_transform", "map_filter"].contains(&func_name));
416
417 reject_syntax!(
418 scalar_as_agg,
419 "`AGGREGATE:` prefix is not allowed for `{}`",
420 func_name
421 );
422 reject_syntax!(
423 !arg_list.is_args_only(),
424 "keywords like `DISTINCT`, `ORDER BY` are not allowed in `{}` argument list",
425 func_name
426 );
427 reject_syntax!(
428 within_group.is_some(),
429 "`WITHIN GROUP` is not allowed in `{}` call",
430 func_name
431 );
432 reject_syntax!(
433 filter.is_some(),
434 "`FILTER` is not allowed in `{}` call",
435 func_name
436 );
437 reject_syntax!(
438 over.is_some(),
439 "`OVER` is not allowed in `{}` call",
440 func_name
441 );
442 if func_name == "array_transform" {
443 self.bind_array_transform(arg_list.args)
444 } else {
445 self.bind_map_filter(arg_list.args)
446 }
447 }
448
449 fn bind_array_transform(&mut self, args: Vec<FunctionArg>) -> Result<ExprImpl> {
450 let [array, lambda] = <[FunctionArg; 2]>::try_from(args).map_err(|args| -> RwError {
451 ErrorCode::BindError(format!(
452 "`array_transform` expect two inputs `array` and `lambda`, but {} were given",
453 args.len()
454 ))
455 .into()
456 })?;
457
458 let bound_array = self.bind_function_arg(array)?;
459 let [bound_array] = <[ExprImpl; 1]>::try_from(bound_array).map_err(|bound_array| -> RwError {
460 ErrorCode::BindError(format!("The `array` argument for `array_transform` should be bound to one argument, but {} were got", bound_array.len()))
461 .into()
462 })?;
463
464 let inner_ty = match bound_array.return_type() {
465 DataType::List(ty) => *ty,
466 real_type => return Err(ErrorCode::BindError(format!(
467 "The `array` argument for `array_transform` should be an array, but {} were got",
468 real_type
469 ))
470 .into()),
471 };
472
473 let ast::FunctionArgExpr::Expr(ast::Expr::LambdaFunction {
474 args: lambda_args,
475 body: lambda_body,
476 }) = lambda.get_expr()
477 else {
478 return Err(ErrorCode::BindError(
479 "The `lambda` argument for `array_transform` should be a lambda function"
480 .to_owned(),
481 )
482 .into());
483 };
484
485 let [lambda_arg] = <[Ident; 1]>::try_from(lambda_args).map_err(|args| -> RwError {
486 ErrorCode::BindError(format!(
487 "The `lambda` argument for `array_transform` should be a lambda function with one argument, but {} were given",
488 args.len()
489 ))
490 .into()
491 })?;
492
493 let bound_lambda = self.bind_unary_lambda_function(inner_ty, lambda_arg, *lambda_body)?;
494
495 let lambda_ret_type = bound_lambda.return_type();
496 let transform_ret_type = DataType::List(Box::new(lambda_ret_type));
497
498 Ok(ExprImpl::FunctionCallWithLambda(Box::new(
499 FunctionCallWithLambda::new_unchecked(
500 ExprType::ArrayTransform,
501 vec![bound_array],
502 bound_lambda,
503 transform_ret_type,
504 ),
505 )))
506 }
507
508 fn bind_unary_lambda_function(
509 &mut self,
510 input_ty: DataType,
511 arg: Ident,
512 body: ast::Expr,
513 ) -> Result<ExprImpl> {
514 let lambda_args = HashMap::from([(arg.real_value(), (0usize, input_ty))]);
515 let orig_lambda_args = self.context.lambda_args.replace(lambda_args);
516 let body = self.bind_expr_inner(body)?;
517 self.context.lambda_args = orig_lambda_args;
518
519 Ok(body)
520 }
521
522 fn bind_map_filter(&mut self, args: Vec<FunctionArg>) -> Result<ExprImpl> {
523 let [input, lambda] = <[FunctionArg; 2]>::try_from(args).map_err(|args| {
524 ErrorCode::BindError(format!(
525 "`map_filter` requires two arguments (input_map and lambda), got {}",
526 args.len()
527 ))
528 })?;
529
530 let bound_input = self.bind_function_arg(input)?;
531 let [bound_input] = <[ExprImpl; 1]>::try_from(bound_input).map_err(|e| {
532 ErrorCode::BindError(format!(
533 "Input argument should resolve to single expression, got {}",
534 e.len()
535 ))
536 })?;
537
538 let (key_type, value_type) = match bound_input.return_type() {
539 DataType::Map(map_type) => (map_type.key().clone(), map_type.value().clone()),
540 t => {
541 return Err(
542 ErrorCode::BindError(format!("Input must be Map type, got {}", t)).into(),
543 );
544 }
545 };
546
547 let ast::FunctionArgExpr::Expr(ast::Expr::LambdaFunction {
548 args: lambda_args,
549 body: lambda_body,
550 }) = lambda.get_expr()
551 else {
552 return Err(ErrorCode::BindError(
553 "Second argument must be a lambda function".to_owned(),
554 )
555 .into());
556 };
557
558 let [key_arg, value_arg] = <[Ident; 2]>::try_from(lambda_args).map_err(|args| {
559 ErrorCode::BindError(format!(
560 "Lambda must have exactly two parameters (key, value), got {}",
561 args.len()
562 ))
563 })?;
564
565 let bound_lambda = self.bind_binary_lambda_function(
566 key_arg,
567 key_type.clone(),
568 value_arg,
569 value_type.clone(),
570 *lambda_body,
571 )?;
572
573 let lambda_ret_type = bound_lambda.return_type();
574 if lambda_ret_type != DataType::Boolean {
575 return Err(ErrorCode::BindError(format!(
576 "Lambda must return Boolean type, got {}",
577 lambda_ret_type
578 ))
579 .into());
580 }
581
582 let map_type = MapType::from_kv(key_type, value_type);
583 let return_type = DataType::Map(map_type);
584
585 Ok(ExprImpl::FunctionCallWithLambda(Box::new(
586 FunctionCallWithLambda::new_unchecked(
587 ExprType::MapFilter,
588 vec![bound_input],
589 bound_lambda,
590 return_type,
591 ),
592 )))
593 }
594
595 fn bind_binary_lambda_function(
596 &mut self,
597 first_arg: Ident,
598 first_ty: DataType,
599 second_arg: Ident,
600 second_ty: DataType,
601 body: ast::Expr,
602 ) -> Result<ExprImpl> {
603 let lambda_args = HashMap::from([
604 (first_arg.real_value(), (0usize, first_ty)),
605 (second_arg.real_value(), (1usize, second_ty)),
606 ]);
607
608 let orig_ctx = self.context.lambda_args.replace(lambda_args);
609 let bound_body = self.bind_expr_inner(body)?;
610 self.context.lambda_args = orig_ctx;
611
612 Ok(bound_body)
613 }
614
615 fn ensure_table_function_allowed(&self) -> Result<()> {
616 if let Some(clause) = self.context.clause {
617 match clause {
618 Clause::JoinOn
619 | Clause::Where
620 | Clause::Having
621 | Clause::Filter
622 | Clause::Values
623 | Clause::Insert
624 | Clause::GeneratedColumn => {
625 return Err(ErrorCode::InvalidInputSyntax(format!(
626 "table functions are not allowed in {}",
627 clause
628 ))
629 .into());
630 }
631 Clause::GroupBy | Clause::From => {}
632 }
633 }
634 Ok(())
635 }
636
637 fn bind_sql_udf(
638 &mut self,
639 func: Arc<FunctionCatalog>,
640 args: Vec<ExprImpl>,
641 ) -> Result<ExprImpl> {
642 if func.body.is_none() {
643 return Err(
644 ErrorCode::InvalidInputSyntax("`body` must exist for sql udf".to_owned()).into(),
645 );
646 }
647
648 let parse_result =
650 risingwave_sqlparser::parser::Parser::parse_sql(func.body.as_ref().unwrap().as_str());
651 if let Err(ParserError::ParserError(err)) | Err(ParserError::TokenizerError(err)) =
652 parse_result
653 {
654 return Err(ErrorCode::InvalidInputSyntax(err).into());
656 }
657
658 debug_assert!(parse_result.is_ok());
659
660 let ast = parse_result.unwrap();
662
663 let stashed_udf_context = self.udf_context.get_context();
669
670 let mut udf_context = HashMap::new();
673 for (i, arg) in args.into_iter().enumerate() {
674 if func.arg_names[i].is_empty() {
675 udf_context.insert(format!("${}", i + 1), arg);
677 } else {
678 udf_context.insert(func.arg_names[i].clone(), arg);
680 }
681 }
682 self.udf_context.update_context(udf_context);
683
684 if self.udf_context.global_count() >= SQL_UDF_MAX_CALLING_DEPTH {
686 return Err(ErrorCode::BindError(format!(
687 "function {} calling stack depth limit exceeded",
688 func.name
689 ))
690 .into());
691 } else {
692 self.udf_context.incr_global_count();
694 }
695
696 if let Ok(expr) = UdfContext::extract_udf_expression(ast) {
697 let bind_result = self.bind_expr(expr);
698
699 self.udf_context.decr_global_count();
703
704 self.udf_context.update_context(stashed_udf_context);
706
707 return bind_result;
708 }
709
710 Err(ErrorCode::InvalidInputSyntax(
711 "failed to parse the input query and extract the udf expression,
712 please recheck the syntax"
713 .to_owned(),
714 )
715 .into())
716 }
717
718 pub(in crate::binder) fn bind_function_expr_arg(
719 &mut self,
720 arg_expr: FunctionArgExpr,
721 ) -> Result<Vec<ExprImpl>> {
722 match arg_expr {
723 FunctionArgExpr::Expr(expr) => Ok(vec![self.bind_expr_inner(expr)?]),
724 FunctionArgExpr::QualifiedWildcard(_, _)
725 | FunctionArgExpr::ExprQualifiedWildcard(_, _) => Err(ErrorCode::InvalidInputSyntax(
726 format!("unexpected wildcard {}", arg_expr),
727 )
728 .into()),
729 FunctionArgExpr::Wildcard(None) => Ok(vec![]),
730 FunctionArgExpr::Wildcard(Some(_)) => unreachable!(),
731 }
732 }
733
734 pub(in crate::binder) fn bind_function_arg(
735 &mut self,
736 arg: FunctionArg,
737 ) -> Result<Vec<ExprImpl>> {
738 match arg {
739 FunctionArg::Unnamed(expr) => self.bind_function_expr_arg(expr),
740 FunctionArg::Named { .. } => todo!(),
741 }
742 }
743}