1use std::collections::{HashMap, HashSet};
16use std::str::FromStr;
17use std::sync::Arc;
18
19use anyhow::Context;
20use itertools::Itertools;
21use risingwave_common::acl::AclMode;
22use risingwave_common::bail_not_implemented;
23use risingwave_common::catalog::INFORMATION_SCHEMA_SCHEMA_NAME;
24use risingwave_common::types::{DataType, MapType};
25use risingwave_expr::aggregate::AggType;
26use risingwave_expr::window_function::WindowFuncKind;
27use risingwave_pb::user::grant_privilege::PbObject;
28use risingwave_sqlparser::ast::{
29 self, Expr as AstExpr, Function, FunctionArg, FunctionArgExpr, FunctionArgList, Ident,
30 OrderByExpr, Statement, Window,
31};
32use risingwave_sqlparser::parser::Parser;
33
34use crate::binder::Binder;
35use crate::binder::bind_context::Clause;
36use crate::catalog::function_catalog::FunctionCatalog;
37use crate::error::{ErrorCode, Result, RwError};
38use crate::expr::{
39 Expr, ExprImpl, ExprType, FunctionCallWithLambda, InputRef, TableFunction, TableFunctionType,
40 UserDefinedFunction,
41};
42use crate::handler::privilege::ObjectCheckItem;
43
44mod aggregate;
45mod builtin_scalar;
46mod window;
47
/// Names of system functions recognised by [`is_sys_function_without_args`] when they appear
/// as a bare, unquoted identifier.
///
/// Judging by the name, these are functions that can be invoked without an argument list
/// (e.g. `SELECT current_user`) — NOTE(review): confirm against the caller that binds bare
/// identifiers.
const SYS_FUNCTION_WITHOUT_ARGS: &[&str] = &[
    "session_user",
    "user",
    "current_user",
    "current_role",
    "current_catalog",
    "current_schema",
    "current_timestamp",
];
58
59pub(super) fn is_sys_function_without_args(ident: &Ident) -> bool {
60 SYS_FUNCTION_WITHOUT_ARGS
61 .iter()
62 .any(|e| ident.real_value().as_str() == *e && ident.quote_style().is_none())
63}
64
/// Returns early with an `InvalidInputSyntax` error when `$pred` holds.
///
/// Two forms are accepted:
/// - `reject_syntax!(pred, msg)` — `msg` is converted with `to_string()` verbatim (no
///   formatting is applied, so braces in it are kept as-is);
/// - `reject_syntax!(pred, fmt, args...)` — the message is built with `format!(fmt, args...)`.
///
/// The error is converted with `.into()` into the enclosing function's error type.
macro_rules! reject_syntax {
    ($pred:expr, $msg:expr) => {
        if $pred {
            let message = $msg.to_string();
            return Err(ErrorCode::InvalidInputSyntax(message).into());
        }
    };

    ($pred:expr, $fmt:expr, $($arg:tt)*) => {
        if $pred {
            let message = format!($fmt, $($arg)*);
            return Err(ErrorCode::InvalidInputSyntax(message).into());
        }
    };
}
80
81impl Binder {
82 pub(in crate::binder) fn bind_function(
83 &mut self,
84 Function {
85 scalar_as_agg,
86 name,
87 arg_list,
88 within_group,
89 filter,
90 over,
91 }: &Function,
92 ) -> Result<ExprImpl> {
93 let (schema_name, func_name) = match name.0.as_slice() {
94 [name] => (None, name.real_value()),
95 [schema, name] => {
96 let schema_name = schema.real_value();
97 let func_name = if schema_name == INFORMATION_SCHEMA_SCHEMA_NAME {
98 let function_name = name.real_value();
102 if function_name != "_pg_expandarray" {
103 bail_not_implemented!(
104 issue = 12422,
105 "Unsupported function name under schema: {}",
106 schema_name
107 );
108 }
109 function_name
110 } else {
111 name.real_value()
112 };
113 (Some(schema_name), func_name)
114 }
115 _ => bail_not_implemented!(issue = 112, "qualified function {}", name),
116 };
117
118 if func_name == "obj_description" || func_name == "col_description" {
125 return Ok(ExprImpl::literal_varchar("".to_owned()));
126 }
127
128 if func_name == "array_transform" || func_name == "map_filter" {
130 return self.validate_and_bind_special_function_params(
131 &func_name,
132 *scalar_as_agg,
133 arg_list,
134 within_group.as_deref(),
135 filter.as_deref(),
136 over.as_ref(),
137 );
138 }
139
140 let mut args: Vec<_> = arg_list
141 .args
142 .iter()
143 .map(|arg| self.bind_function_arg(arg))
144 .flatten_ok()
145 .try_collect()?;
146
147 let mut referred_udfs = HashSet::new();
148
149 let wrapped_agg_type = if *scalar_as_agg {
150 let mut array_args = args
153 .iter()
154 .enumerate()
155 .map(|(i, expr)| {
156 InputRef::new(i, DataType::List(Box::new(expr.return_type()))).into()
157 })
158 .collect_vec();
159 let schema_path = self.bind_schema_path(schema_name.as_deref());
160 let scalar_func_expr = if let Ok((func, _)) = self.catalog.get_function_by_name_inputs(
161 &self.db_name,
162 schema_path,
163 &func_name,
164 &mut array_args,
165 ) {
166 referred_udfs.insert(func.id);
168 self.check_privilege(
169 ObjectCheckItem::new(
170 func.owner,
171 AclMode::Execute,
172 func.name.clone(),
173 PbObject::FunctionId(func.id.function_id()),
174 ),
175 self.database_id,
176 )?;
177
178 if !func.kind.is_scalar() {
179 return Err(ErrorCode::InvalidInputSyntax(
180 "expect a scalar function after `AGGREGATE:`".to_owned(),
181 )
182 .into());
183 }
184
185 if func.language == "sql" {
186 self.bind_sql_udf(func.clone(), array_args)?
187 } else {
188 UserDefinedFunction::new(func.clone(), array_args).into()
189 }
190 } else {
191 self.bind_builtin_scalar_function(&func_name, array_args, arg_list.variadic)?
192 };
193
194 let expr_node = match scalar_func_expr.try_to_expr_proto() {
197 Ok(expr_node) => expr_node,
198 Err(e) => {
199 return Err(ErrorCode::InvalidInputSyntax(format!(
200 "function {func_name} cannot be used after `AGGREGATE:`: {e}",
201 ))
202 .into());
203 }
204 };
205
206 Some(AggType::WrapScalar(expr_node))
208 } else {
209 None
210 };
211
212 let schema_path = self.bind_schema_path(schema_name.as_deref());
213 let udf = if wrapped_agg_type.is_none()
214 && let Ok((func, _)) = self.catalog.get_function_by_name_inputs(
215 &self.db_name,
216 schema_path,
217 &func_name,
218 &mut args,
219 ) {
220 referred_udfs.insert(func.id);
222 self.check_privilege(
223 ObjectCheckItem::new(
224 func.owner,
225 AclMode::Execute,
226 func.name.clone(),
227 PbObject::FunctionId(func.id.function_id()),
228 ),
229 self.database_id,
230 )?;
231 Some(func.clone())
232 } else {
233 None
234 };
235
236 self.included_udfs.extend(referred_udfs);
237
238 let agg_type = if wrapped_agg_type.is_some() {
239 wrapped_agg_type
240 } else if let Some(ref udf) = udf
241 && udf.kind.is_aggregate()
242 {
243 assert_ne!(udf.language, "sql", "SQL UDAF is not supported yet");
244 Some(AggType::UserDefined(udf.as_ref().into()))
245 } else {
246 AggType::from_str(&func_name).ok()
247 };
248
249 if let Some(over) = over {
251 reject_syntax!(
252 arg_list.distinct,
253 "`DISTINCT` is not allowed in window function call"
254 );
255 reject_syntax!(
256 arg_list.variadic,
257 "`VARIADIC` is not allowed in window function call"
258 );
259 reject_syntax!(
260 !arg_list.order_by.is_empty(),
261 "`ORDER BY` is not allowed in window function call argument list"
262 );
263 reject_syntax!(
264 within_group.is_some(),
265 "`WITHIN GROUP` is not allowed in window function call"
266 );
267
268 let kind = if let Some(agg_type) = agg_type {
269 WindowFuncKind::Aggregate(agg_type)
271 } else if let Ok(kind) = WindowFuncKind::from_str(&func_name) {
272 kind
273 } else {
274 bail_not_implemented!(issue = 8961, "Unrecognized window function: {}", func_name);
275 };
276 return self.bind_window_function(
277 kind,
278 args,
279 arg_list.ignore_nulls,
280 filter.as_deref(),
281 over,
282 );
283 }
284
285 reject_syntax!(
287 arg_list.ignore_nulls,
288 "`IGNORE NULLS` is not allowed in aggregate/scalar/table function call"
289 );
290
291 if let Some(agg_type) = agg_type {
293 reject_syntax!(
294 arg_list.variadic,
295 "`VARIADIC` is not allowed in aggregate function call"
296 );
297 return self.bind_aggregate_function(
298 agg_type,
299 arg_list.distinct,
300 args,
301 &arg_list.order_by,
302 within_group.as_deref(),
303 filter.as_deref(),
304 );
305 }
306
307 reject_syntax!(
309 arg_list.distinct,
310 "`DISTINCT` is not allowed in scalar/table function call"
311 );
312 reject_syntax!(
313 !arg_list.order_by.is_empty(),
314 "`ORDER BY` is not allowed in scalar/table function call"
315 );
316 reject_syntax!(
317 within_group.is_some(),
318 "`WITHIN GROUP` is not allowed in scalar/table function call"
319 );
320 reject_syntax!(
321 filter.is_some(),
322 "`FILTER` is not allowed in scalar/table function call"
323 );
324
325 {
327 if func_name.eq_ignore_ascii_case("file_scan") {
329 reject_syntax!(
330 arg_list.variadic,
331 "`VARIADIC` is not allowed in table function call"
332 );
333 self.ensure_table_function_allowed()?;
334 return Ok(TableFunction::new_file_scan(args)?.into());
335 }
336 if func_name.eq("postgres_query") {
338 reject_syntax!(
339 arg_list.variadic,
340 "`VARIADIC` is not allowed in table function call"
341 );
342 self.ensure_table_function_allowed()?;
343 return Ok(TableFunction::new_postgres_query(
344 &self.catalog,
345 &self.db_name,
346 self.bind_schema_path(schema_name.as_deref()),
347 args,
348 )
349 .context("postgres_query error")?
350 .into());
351 }
352 if func_name.eq("mysql_query") {
354 reject_syntax!(
355 arg_list.variadic,
356 "`VARIADIC` is not allowed in table function call"
357 );
358 self.ensure_table_function_allowed()?;
359 return Ok(TableFunction::new_mysql_query(
360 &self.catalog,
361 &self.db_name,
362 self.bind_schema_path(schema_name.as_deref()),
363 args,
364 )
365 .context("mysql_query error")?
366 .into());
367 }
368 if func_name.eq("internal_backfill_progress") {
370 reject_syntax!(
371 arg_list.variadic,
372 "`VARIADIC` is not allowed in table function call"
373 );
374 self.ensure_table_function_allowed()?;
375 return Ok(TableFunction::new_internal_backfill_progress().into());
376 }
377 if func_name.eq("internal_source_backfill_progress") {
379 reject_syntax!(
380 arg_list.variadic,
381 "`VARIADIC` is not allowed in table function call"
382 );
383 self.ensure_table_function_allowed()?;
384 return Ok(TableFunction::new_internal_source_backfill_progress().into());
385 }
386 if let Some(ref udf) = udf
388 && udf.kind.is_table()
389 {
390 reject_syntax!(
391 arg_list.variadic,
392 "`VARIADIC` is not allowed in table function call"
393 );
394 self.ensure_table_function_allowed()?;
395 if udf.language == "sql" {
396 return self.bind_sql_udf(udf.clone(), args);
397 }
398 return Ok(TableFunction::new_user_defined(udf.clone(), args).into());
399 }
400 if let Ok(function_type) = TableFunctionType::from_str(&func_name) {
402 reject_syntax!(
403 arg_list.variadic,
404 "`VARIADIC` is not allowed in table function call"
405 );
406 self.ensure_table_function_allowed()?;
407 return Ok(TableFunction::new(function_type, args)?.into());
408 }
409 }
410
411 if let Some(ref udf) = udf {
413 assert!(udf.kind.is_scalar());
414 reject_syntax!(
415 arg_list.variadic,
416 "`VARIADIC` is not allowed in user-defined function call"
417 );
418 if udf.language == "sql" {
419 return self.bind_sql_udf(udf.clone(), args);
420 }
421 return Ok(UserDefinedFunction::new(udf.clone(), args).into());
422 }
423
424 self.bind_builtin_scalar_function(&func_name, args, arg_list.variadic)
425 }
426
427 fn validate_and_bind_special_function_params(
428 &mut self,
429 func_name: &str,
430 scalar_as_agg: bool,
431 arg_list: &FunctionArgList,
432 within_group: Option<&OrderByExpr>,
433 filter: Option<&risingwave_sqlparser::ast::Expr>,
434 over: Option<&Window>,
435 ) -> Result<ExprImpl> {
436 assert!(["array_transform", "map_filter"].contains(&func_name));
437
438 reject_syntax!(
439 scalar_as_agg,
440 "`AGGREGATE:` prefix is not allowed for `{}`",
441 func_name
442 );
443 reject_syntax!(
444 !arg_list.is_args_only(),
445 "keywords like `DISTINCT`, `ORDER BY` are not allowed in `{}` argument list",
446 func_name
447 );
448 reject_syntax!(
449 within_group.is_some(),
450 "`WITHIN GROUP` is not allowed in `{}` call",
451 func_name
452 );
453 reject_syntax!(
454 filter.is_some(),
455 "`FILTER` is not allowed in `{}` call",
456 func_name
457 );
458 reject_syntax!(
459 over.is_some(),
460 "`OVER` is not allowed in `{}` call",
461 func_name
462 );
463 if func_name == "array_transform" {
464 self.bind_array_transform(&arg_list.args)
465 } else {
466 self.bind_map_filter(&arg_list.args)
467 }
468 }
469
470 fn bind_array_transform(&mut self, args: &[FunctionArg]) -> Result<ExprImpl> {
471 let [array, lambda] = args else {
472 return Err(ErrorCode::BindError(format!(
473 "`array_transform` expect two inputs `array` and `lambda`, but {} were given",
474 args.len()
475 ))
476 .into());
477 };
478
479 let bound_array = self.bind_function_arg(array)?;
480 let [bound_array] = <[ExprImpl; 1]>::try_from(bound_array).map_err(|bound_array| -> RwError {
481 ErrorCode::BindError(format!("The `array` argument for `array_transform` should be bound to one argument, but {} were got", bound_array.len()))
482 .into()
483 })?;
484
485 let inner_ty = match bound_array.return_type() {
486 DataType::List(ty) => *ty,
487 real_type => return Err(ErrorCode::BindError(format!(
488 "The `array` argument for `array_transform` should be an array, but {} were got",
489 real_type
490 ))
491 .into()),
492 };
493
494 let ast::FunctionArgExpr::Expr(ast::Expr::LambdaFunction {
495 args: lambda_args,
496 body: lambda_body,
497 }) = lambda.get_expr()
498 else {
499 return Err(ErrorCode::BindError(
500 "The `lambda` argument for `array_transform` should be a lambda function"
501 .to_owned(),
502 )
503 .into());
504 };
505
506 let [lambda_arg] = <[Ident; 1]>::try_from(lambda_args).map_err(|args| -> RwError {
507 ErrorCode::BindError(format!(
508 "The `lambda` argument for `array_transform` should be a lambda function with one argument, but {} were given",
509 args.len()
510 ))
511 .into()
512 })?;
513
514 let bound_lambda = self.bind_unary_lambda_function(inner_ty, lambda_arg, *lambda_body)?;
515
516 let lambda_ret_type = bound_lambda.return_type();
517 let transform_ret_type = DataType::List(Box::new(lambda_ret_type));
518
519 Ok(ExprImpl::FunctionCallWithLambda(Box::new(
520 FunctionCallWithLambda::new_unchecked(
521 ExprType::ArrayTransform,
522 vec![bound_array],
523 bound_lambda,
524 transform_ret_type,
525 ),
526 )))
527 }
528
529 fn bind_unary_lambda_function(
530 &mut self,
531 input_ty: DataType,
532 arg: Ident,
533 body: ast::Expr,
534 ) -> Result<ExprImpl> {
535 let lambda_args = HashMap::from([(arg.real_value(), (0usize, input_ty))]);
536 let orig_lambda_args = self.context.lambda_args.replace(lambda_args);
537 let body = self.bind_expr_inner(&body)?;
538 self.context.lambda_args = orig_lambda_args;
539
540 Ok(body)
541 }
542
543 fn bind_map_filter(&mut self, args: &[FunctionArg]) -> Result<ExprImpl> {
544 let [input, lambda] = args else {
545 return Err(ErrorCode::BindError(format!(
546 "`map_filter` requires two arguments (input_map and lambda), got {}",
547 args.len()
548 ))
549 .into());
550 };
551
552 let bound_input = self.bind_function_arg(input)?;
553 let [bound_input] = <[ExprImpl; 1]>::try_from(bound_input).map_err(|e| {
554 ErrorCode::BindError(format!(
555 "Input argument should resolve to single expression, got {}",
556 e.len()
557 ))
558 })?;
559
560 let (key_type, value_type) = match bound_input.return_type() {
561 DataType::Map(map_type) => (map_type.key().clone(), map_type.value().clone()),
562 t => {
563 return Err(
564 ErrorCode::BindError(format!("Input must be Map type, got {}", t)).into(),
565 );
566 }
567 };
568
569 let ast::FunctionArgExpr::Expr(ast::Expr::LambdaFunction {
570 args: lambda_args,
571 body: lambda_body,
572 }) = lambda.get_expr()
573 else {
574 return Err(ErrorCode::BindError(
575 "Second argument must be a lambda function".to_owned(),
576 )
577 .into());
578 };
579
580 let [key_arg, value_arg] = <[Ident; 2]>::try_from(lambda_args).map_err(|args| {
581 ErrorCode::BindError(format!(
582 "Lambda must have exactly two parameters (key, value), got {}",
583 args.len()
584 ))
585 })?;
586
587 let bound_lambda = self.bind_binary_lambda_function(
588 key_arg,
589 key_type.clone(),
590 value_arg,
591 value_type.clone(),
592 *lambda_body,
593 )?;
594
595 let lambda_ret_type = bound_lambda.return_type();
596 if lambda_ret_type != DataType::Boolean {
597 return Err(ErrorCode::BindError(format!(
598 "Lambda must return Boolean type, got {}",
599 lambda_ret_type
600 ))
601 .into());
602 }
603
604 let map_type = MapType::from_kv(key_type, value_type);
605 let return_type = DataType::Map(map_type);
606
607 Ok(ExprImpl::FunctionCallWithLambda(Box::new(
608 FunctionCallWithLambda::new_unchecked(
609 ExprType::MapFilter,
610 vec![bound_input],
611 bound_lambda,
612 return_type,
613 ),
614 )))
615 }
616
617 fn bind_binary_lambda_function(
618 &mut self,
619 first_arg: Ident,
620 first_ty: DataType,
621 second_arg: Ident,
622 second_ty: DataType,
623 body: ast::Expr,
624 ) -> Result<ExprImpl> {
625 let lambda_args = HashMap::from([
626 (first_arg.real_value(), (0usize, first_ty)),
627 (second_arg.real_value(), (1usize, second_ty)),
628 ]);
629
630 let orig_ctx = self.context.lambda_args.replace(lambda_args);
631 let bound_body = self.bind_expr_inner(&body)?;
632 self.context.lambda_args = orig_ctx;
633
634 Ok(bound_body)
635 }
636
637 fn ensure_table_function_allowed(&self) -> Result<()> {
638 if let Some(clause) = self.context.clause {
639 match clause {
640 Clause::JoinOn
641 | Clause::Where
642 | Clause::Having
643 | Clause::Filter
644 | Clause::Values
645 | Clause::Insert
646 | Clause::GeneratedColumn => {
647 return Err(ErrorCode::InvalidInputSyntax(format!(
648 "table functions are not allowed in {}",
649 clause
650 ))
651 .into());
652 }
653 Clause::GroupBy | Clause::From => {}
654 }
655 }
656 Ok(())
657 }
658
659 pub(crate) fn extract_udf_expr(ast: Vec<Statement>) -> Result<AstExpr> {
661 if ast.len() != 1 {
662 return Err(ErrorCode::InvalidInputSyntax(
663 "the query for sql udf should contain only one statement".to_owned(),
664 )
665 .into());
666 }
667
668 let Statement::Query(query) = ast.into_iter().next().unwrap() else {
670 return Err(ErrorCode::InvalidInputSyntax(
671 "invalid function definition, please recheck the syntax".to_owned(),
672 )
673 .into());
674 };
675
676 if let Some(expr) = query.as_single_select_item() {
677 Ok(expr.clone())
679 } else {
680 Ok(AstExpr::Subquery(query))
682 }
683 }
684
685 pub fn bind_sql_udf_inner(
686 &mut self,
687 body: &str,
688 arg_names: &[String],
689 args: Vec<ExprImpl>,
690 ) -> Result<ExprImpl> {
691 let ast = Parser::parse_sql(body)?;
693
694 let stashed_arguments = self.context.sql_udf_arguments.take();
698
699 let mut arguments = HashMap::new();
701 for (i, arg) in args.into_iter().enumerate() {
702 if arg_names[i].is_empty() {
703 arguments.insert(format!("${}", i + 1), arg);
705 } else {
706 arguments.insert(arg_names[i].clone(), arg);
708 }
709 }
710 self.context.sql_udf_arguments = Some(arguments);
711
712 let Ok(expr) = Self::extract_udf_expr(ast) else {
713 return Err(ErrorCode::InvalidInputSyntax(
714 "failed to parse the input query and extract the udf expression, \
715 please recheck the syntax"
716 .to_owned(),
717 )
718 .into());
719 };
720
721 let bind_result = self.bind_expr(&expr);
722 self.context.sql_udf_arguments = stashed_arguments;
724
725 bind_result
726 }
727
728 fn bind_sql_udf(
729 &mut self,
730 func: Arc<FunctionCatalog>,
731 args: Vec<ExprImpl>,
732 ) -> Result<ExprImpl> {
733 let Some(body) = &func.body else {
734 return Err(
735 ErrorCode::InvalidInputSyntax("`body` must exist for sql udf".to_owned()).into(),
736 );
737 };
738
739 self.bind_sql_udf_inner(body, &func.arg_names, args)
740 }
741
742 pub(in crate::binder) fn bind_function_expr_arg(
743 &mut self,
744 arg_expr: &FunctionArgExpr,
745 ) -> Result<Vec<ExprImpl>> {
746 match arg_expr {
747 FunctionArgExpr::Expr(expr) => Ok(vec![self.bind_expr_inner(expr)?]),
748 FunctionArgExpr::QualifiedWildcard(_, _)
749 | FunctionArgExpr::ExprQualifiedWildcard(_, _) => Err(ErrorCode::InvalidInputSyntax(
750 format!("unexpected wildcard {}", arg_expr),
751 )
752 .into()),
753 FunctionArgExpr::Wildcard(None) => Ok(vec![]),
754 FunctionArgExpr::Wildcard(Some(_)) => unreachable!(),
755 }
756 }
757
758 pub(in crate::binder) fn bind_function_arg(
759 &mut self,
760 arg: &FunctionArg,
761 ) -> Result<Vec<ExprImpl>> {
762 match arg {
763 FunctionArg::Unnamed(expr) => self.bind_function_expr_arg(expr),
764 FunctionArg::Named { .. } => todo!(),
765 }
766 }
767}