1use std::collections::{HashMap, HashSet};
16use std::str::FromStr;
17use std::sync::Arc;
18
19use anyhow::Context;
20use itertools::Itertools;
21use risingwave_common::acl::AclMode;
22use risingwave_common::bail_not_implemented;
23use risingwave_common::catalog::INFORMATION_SCHEMA_SCHEMA_NAME;
24use risingwave_common::types::{DataType, MapType};
25use risingwave_expr::aggregate::AggType;
26use risingwave_expr::window_function::WindowFuncKind;
27use risingwave_pb::user::grant_privilege::PbObject;
28use risingwave_sqlparser::ast::{
29 self, Expr as AstExpr, Function, FunctionArg, FunctionArgExpr, FunctionArgList, Ident,
30 OrderByExpr, Statement, Window,
31};
32use risingwave_sqlparser::parser::Parser;
33
34use crate::binder::Binder;
35use crate::binder::bind_context::Clause;
36use crate::catalog::function_catalog::FunctionCatalog;
37use crate::error::{ErrorCode, Result, RwError};
38use crate::expr::{
39 Expr, ExprImpl, ExprType, FunctionCallWithLambda, InputRef, TableFunction, TableFunctionType,
40 UserDefinedFunction,
41};
42use crate::handler::privilege::ObjectCheckItem;
43
44mod aggregate;
45mod builtin_scalar;
46mod window;
47
/// Names of the system functions that take no arguments.
///
/// Consulted by `is_sys_function_without_args`, which compares an unquoted
/// identifier against this list.
const SYS_FUNCTION_WITHOUT_ARGS: &[&str] = &[
    "session_user",
    "user",
    "current_user",
    "current_role",
    "current_catalog",
    "current_schema",
    "current_timestamp",
];
58
59pub(super) fn is_sys_function_without_args(ident: &Ident) -> bool {
60 SYS_FUNCTION_WITHOUT_ARGS
61 .iter()
62 .any(|e| ident.real_value().as_str() == *e && ident.quote_style().is_none())
63}
64
/// Returns early from the enclosing function with
/// `ErrorCode::InvalidInputSyntax` when the condition holds.
///
/// Two forms are accepted:
/// - `reject_syntax!(cond, message)`: `message` is rendered through `Display`;
/// - `reject_syntax!(cond, "format {}", args...)`: rendered through `format!`.
///
/// The message is only built when the condition is true.
macro_rules! reject_syntax {
    // Single message: forward to the formatted form below.
    ($cond:expr, $message:expr) => {
        reject_syntax!($cond, "{}", $message)
    };

    ($cond:expr, $template:expr, $($rest:tt)*) => {
        if $cond {
            let message = format!($template, $($rest)*);
            return Err(ErrorCode::InvalidInputSyntax(message).into());
        }
    };
}
80
81impl Binder {
/// Binds a function-call AST node into a bound expression.
///
/// The call is resolved in this order, returning at the first match:
/// 1. `obj_description` / `col_description`, bound to an empty varchar literal;
/// 2. `array_transform` / `map_filter`, which go through a dedicated path;
/// 3. window functions (when an `OVER` clause is present);
/// 4. aggregate functions (scalar wrapped via `AGGREGATE:`, user-defined
///    aggregate, or a name parsed by `AggType::from_str`);
/// 5. table functions (a fixed set of names, user-defined table functions, or
///    a name parsed by `TableFunctionType::from_str`);
/// 6. scalar user-defined functions;
/// 7. builtin scalar functions.
///
/// # Errors
/// Returns an error for unsupported qualified names, cross-database calls,
/// syntax that is not allowed for the resolved function kind, failed privilege
/// checks, and any failure raised while binding arguments or the function.
///
/// # Panics
/// Panics if a matched user-defined aggregate has language `"sql"`, or if a
/// matched user-defined function reaches the scalar branch without being
/// scalar.
pub(in crate::binder) fn bind_function(
    &mut self,
    Function {
        scalar_as_agg,
        name,
        arg_list,
        within_group,
        filter,
        over,
    }: &Function,
) -> Result<ExprImpl> {
    // Split the (possibly qualified) name into an optional schema and the
    // bare function name.
    let (schema_name, func_name) = match name.0.as_slice() {
        [name] => (None, name.real_value()),
        [schema, name] => {
            let schema_name = schema.real_value();
            let func_name = if schema_name == INFORMATION_SCHEMA_SCHEMA_NAME {
                let function_name = name.real_value();
                // `_pg_expandarray` is the only name accepted under this schema.
                if function_name != "_pg_expandarray" {
                    bail_not_implemented!(
                        issue = 12422,
                        "Unsupported function name under schema: {}",
                        schema_name
                    );
                }
                function_name
            } else {
                name.real_value()
            };
            (Some(schema_name), func_name)
        }
        [database, schema, name] => {
            let database_name = database.real_value();
            // Only the binder's own database may be named explicitly.
            if database_name != self.db_name {
                return Err(ErrorCode::BindError(format!(
                    "Cross-database function call is not supported: {}",
                    name
                ))
                .into());
            }
            let schema_name = schema.real_value();
            let func_name = name.real_value();
            (Some(schema_name), func_name)
        }
        _ => bail_not_implemented!(issue = 112, "qualified function {}", name),
    };

    // These two are answered with an empty string; their arguments are never bound.
    if func_name == "obj_description" || func_name == "col_description" {
        return Ok(ExprImpl::literal_varchar("".to_owned()));
    }

    // Lambda-taking functions are validated and bound on their own path,
    // before the generic argument binding below.
    if func_name == "array_transform" || func_name == "map_filter" {
        return self.validate_and_bind_special_function_params(
            &func_name,
            *scalar_as_agg,
            arg_list,
            within_group.as_deref(),
            filter.as_deref(),
            over.as_ref(),
        );
    }

    // Each AST argument binds to a list of expressions; the lists are flattened.
    let mut args: Vec<_> = arg_list
        .args
        .iter()
        .map(|arg| self.bind_function_arg(arg))
        .flatten_ok()
        .try_collect()?;

    let mut referred_udfs = HashSet::new();

    // `AGGREGATE:` prefix: resolve the name as a scalar function whose inputs
    // are lists of the original argument types, then wrap it as an aggregate.
    let wrapped_agg_type = if *scalar_as_agg {
        let mut array_args = args
            .iter()
            .enumerate()
            .map(|(i, expr)| InputRef::new(i, DataType::list(expr.return_type())).into())
            .collect_vec();
        let schema_path = self.bind_schema_path(schema_name.as_deref());
        let scalar_func_expr = if let Ok((func, _)) = self.catalog.get_function_by_name_inputs(
            &self.db_name,
            schema_path,
            &func_name,
            &mut array_args,
        ) {
            referred_udfs.insert(func.id);
            // The caller needs `Execute` on the catalog function.
            self.check_privilege(
                ObjectCheckItem::new(
                    func.owner,
                    AclMode::Execute,
                    func.name.clone(),
                    PbObject::FunctionId(func.id.function_id()),
                ),
                self.database_id,
            )?;

            if !func.kind.is_scalar() {
                return Err(ErrorCode::InvalidInputSyntax(
                    "expect a scalar function after `AGGREGATE:`".to_owned(),
                )
                .into());
            }

            if func.language == "sql" {
                self.bind_sql_udf(func.clone(), array_args)?
            } else {
                UserDefinedFunction::new(func.clone(), array_args).into()
            }
        } else {
            // No catalog match: fall back to the builtin scalar functions.
            self.bind_builtin_scalar_function(&func_name, array_args, arg_list.variadic)?
        };

        // The wrapped aggregate carries the scalar expression in its proto form.
        let expr_node = match scalar_func_expr.try_to_expr_proto() {
            Ok(expr_node) => expr_node,
            Err(e) => {
                return Err(ErrorCode::InvalidInputSyntax(format!(
                    "function {func_name} cannot be used after `AGGREGATE:`: {e}",
                ))
                .into());
            }
        };

        Some(AggType::WrapScalar(expr_node))
    } else {
        None
    };

    // Without the `AGGREGATE:` prefix, look the name up in the catalog with
    // the original arguments; a lookup failure simply means "no UDF".
    let schema_path = self.bind_schema_path(schema_name.as_deref());
    let udf = if wrapped_agg_type.is_none()
        && let Ok((func, _)) = self.catalog.get_function_by_name_inputs(
            &self.db_name,
            schema_path,
            &func_name,
            &mut args,
        ) {
        referred_udfs.insert(func.id);
        // The caller needs `Execute` on the catalog function.
        self.check_privilege(
            ObjectCheckItem::new(
                func.owner,
                AclMode::Execute,
                func.name.clone(),
                PbObject::FunctionId(func.id.function_id()),
            ),
            self.database_id,
        )?;
        Some(func.clone())
    } else {
        None
    };

    // Record every catalog function this call referred to.
    self.included_udfs.extend(referred_udfs);

    // Aggregate kind, by precedence: wrapped scalar, user-defined aggregate,
    // then a name recognized by `AggType::from_str`.
    let agg_type = if wrapped_agg_type.is_some() {
        wrapped_agg_type
    } else if let Some(ref udf) = udf
        && udf.kind.is_aggregate()
    {
        assert_ne!(udf.language, "sql", "SQL UDAF is not supported yet");
        Some(AggType::UserDefined(udf.as_ref().into()))
    } else {
        AggType::from_str(&func_name).ok()
    };

    // Window function call: an `OVER` clause is present.
    if let Some(over) = over {
        reject_syntax!(
            arg_list.distinct,
            "`DISTINCT` is not allowed in window function call"
        );
        reject_syntax!(
            arg_list.variadic,
            "`VARIADIC` is not allowed in window function call"
        );
        reject_syntax!(
            !arg_list.order_by.is_empty(),
            "`ORDER BY` is not allowed in window function call argument list"
        );
        reject_syntax!(
            within_group.is_some(),
            "`WITHIN GROUP` is not allowed in window function call"
        );

        // An aggregate kind takes priority over a window-only function name.
        let kind = if let Some(agg_type) = agg_type {
            WindowFuncKind::Aggregate(agg_type)
        } else if let Ok(kind) = WindowFuncKind::from_str(&func_name) {
            kind
        } else {
            bail_not_implemented!(issue = 8961, "Unrecognized window function: {}", func_name);
        };
        return self.bind_window_function(
            kind,
            args,
            arg_list.ignore_nulls,
            filter.as_deref(),
            over,
        );
    }

    // From here on the call is not a window function call.
    reject_syntax!(
        arg_list.ignore_nulls,
        "`IGNORE NULLS` is not allowed in aggregate/scalar/table function call"
    );

    // Aggregate function call.
    if let Some(agg_type) = agg_type {
        reject_syntax!(
            arg_list.variadic,
            "`VARIADIC` is not allowed in aggregate function call"
        );
        return self.bind_aggregate_function(
            agg_type,
            arg_list.distinct,
            args,
            &arg_list.order_by,
            within_group.as_deref(),
            filter.as_deref(),
        );
    }

    // From here on the call is neither a window nor an aggregate call.
    reject_syntax!(
        arg_list.distinct,
        "`DISTINCT` is not allowed in scalar/table function call"
    );
    reject_syntax!(
        !arg_list.order_by.is_empty(),
        "`ORDER BY` is not allowed in scalar/table function call"
    );
    reject_syntax!(
        within_group.is_some(),
        "`WITHIN GROUP` is not allowed in scalar/table function call"
    );
    reject_syntax!(
        filter.is_some(),
        "`FILTER` is not allowed in scalar/table function call"
    );

    // Table functions. Every branch rejects `VARIADIC` and checks that the
    // current clause allows a table function before binding.
    {
        // Note: `file_scan` is the only name compared case-insensitively here.
        if func_name.eq_ignore_ascii_case("file_scan") {
            reject_syntax!(
                arg_list.variadic,
                "`VARIADIC` is not allowed in table function call"
            );
            self.ensure_table_function_allowed()?;
            return Ok(TableFunction::new_file_scan(args)?.into());
        }
        if func_name.eq("postgres_query") {
            reject_syntax!(
                arg_list.variadic,
                "`VARIADIC` is not allowed in table function call"
            );
            self.ensure_table_function_allowed()?;
            return Ok(TableFunction::new_postgres_query(
                &self.catalog,
                &self.db_name,
                self.bind_schema_path(schema_name.as_deref()),
                args,
            )
            .context("postgres_query error")?
            .into());
        }
        if func_name.eq("mysql_query") {
            reject_syntax!(
                arg_list.variadic,
                "`VARIADIC` is not allowed in table function call"
            );
            self.ensure_table_function_allowed()?;
            return Ok(TableFunction::new_mysql_query(
                &self.catalog,
                &self.db_name,
                self.bind_schema_path(schema_name.as_deref()),
                args,
            )
            .context("mysql_query error")?
            .into());
        }
        if func_name.eq("internal_backfill_progress") {
            reject_syntax!(
                arg_list.variadic,
                "`VARIADIC` is not allowed in table function call"
            );
            self.ensure_table_function_allowed()?;
            return Ok(TableFunction::new_internal_backfill_progress().into());
        }
        if func_name.eq("internal_source_backfill_progress") {
            reject_syntax!(
                arg_list.variadic,
                "`VARIADIC` is not allowed in table function call"
            );
            self.ensure_table_function_allowed()?;
            return Ok(TableFunction::new_internal_source_backfill_progress().into());
        }
        if func_name.eq("internal_get_channel_delta_stats") {
            reject_syntax!(
                arg_list.variadic,
                "`VARIADIC` is not allowed in table function call"
            );
            self.ensure_table_function_allowed()?;

            return Ok(TableFunction::new_internal_get_channel_delta_stats(args).into());
        }
        // User-defined table function found in the catalog.
        if let Some(ref udf) = udf
            && udf.kind.is_table()
        {
            reject_syntax!(
                arg_list.variadic,
                "`VARIADIC` is not allowed in table function call"
            );
            self.ensure_table_function_allowed()?;
            if udf.language == "sql" {
                return self.bind_sql_udf(udf.clone(), args);
            }
            return Ok(TableFunction::new_user_defined(udf.clone(), args).into());
        }
        // Table function recognized by name.
        if let Ok(function_type) = TableFunctionType::from_str(&func_name) {
            reject_syntax!(
                arg_list.variadic,
                "`VARIADIC` is not allowed in table function call"
            );
            self.ensure_table_function_allowed()?;
            return Ok(TableFunction::new(function_type, args)?.into());
        }
    }

    // Scalar user-defined function: aggregate and table kinds returned above.
    if let Some(ref udf) = udf {
        assert!(udf.kind.is_scalar());
        reject_syntax!(
            arg_list.variadic,
            "`VARIADIC` is not allowed in user-defined function call"
        );
        if udf.language == "sql" {
            return self.bind_sql_udf(udf.clone(), args);
        }
        return Ok(UserDefinedFunction::new(udf.clone(), args).into());
    }

    // Nothing else matched: treat the name as a builtin scalar function.
    self.bind_builtin_scalar_function(&func_name, args, arg_list.variadic)
}
448
449 fn validate_and_bind_special_function_params(
450 &mut self,
451 func_name: &str,
452 scalar_as_agg: bool,
453 arg_list: &FunctionArgList,
454 within_group: Option<&OrderByExpr>,
455 filter: Option<&risingwave_sqlparser::ast::Expr>,
456 over: Option<&Window>,
457 ) -> Result<ExprImpl> {
458 assert!(["array_transform", "map_filter"].contains(&func_name));
459
460 reject_syntax!(
461 scalar_as_agg,
462 "`AGGREGATE:` prefix is not allowed for `{}`",
463 func_name
464 );
465 reject_syntax!(
466 !arg_list.is_args_only(),
467 "keywords like `DISTINCT`, `ORDER BY` are not allowed in `{}` argument list",
468 func_name
469 );
470 reject_syntax!(
471 within_group.is_some(),
472 "`WITHIN GROUP` is not allowed in `{}` call",
473 func_name
474 );
475 reject_syntax!(
476 filter.is_some(),
477 "`FILTER` is not allowed in `{}` call",
478 func_name
479 );
480 reject_syntax!(
481 over.is_some(),
482 "`OVER` is not allowed in `{}` call",
483 func_name
484 );
485 if func_name == "array_transform" {
486 self.bind_array_transform(&arg_list.args)
487 } else {
488 self.bind_map_filter(&arg_list.args)
489 }
490 }
491
492 fn bind_array_transform(&mut self, args: &[FunctionArg]) -> Result<ExprImpl> {
493 let [array, lambda] = args else {
494 return Err(ErrorCode::BindError(format!(
495 "`array_transform` expect two inputs `array` and `lambda`, but {} were given",
496 args.len()
497 ))
498 .into());
499 };
500
501 let bound_array = self.bind_function_arg(array)?;
502 let [bound_array] = <[ExprImpl; 1]>::try_from(bound_array).map_err(|bound_array| -> RwError {
503 ErrorCode::BindError(format!("The `array` argument for `array_transform` should be bound to one argument, but {} were got", bound_array.len()))
504 .into()
505 })?;
506
507 let inner_ty = match bound_array.return_type() {
508 DataType::List(ty) => ty.into_elem(),
509 real_type => return Err(ErrorCode::BindError(format!(
510 "The `array` argument for `array_transform` should be an array, but {} were got",
511 real_type
512 ))
513 .into()),
514 };
515
516 let ast::FunctionArgExpr::Expr(ast::Expr::LambdaFunction {
517 args: lambda_args,
518 body: lambda_body,
519 }) = lambda.get_expr()
520 else {
521 return Err(ErrorCode::BindError(
522 "The `lambda` argument for `array_transform` should be a lambda function"
523 .to_owned(),
524 )
525 .into());
526 };
527
528 let [lambda_arg] = <[Ident; 1]>::try_from(lambda_args).map_err(|args| -> RwError {
529 ErrorCode::BindError(format!(
530 "The `lambda` argument for `array_transform` should be a lambda function with one argument, but {} were given",
531 args.len()
532 ))
533 .into()
534 })?;
535
536 let bound_lambda = self.bind_unary_lambda_function(inner_ty, lambda_arg, *lambda_body)?;
537
538 let lambda_ret_type = bound_lambda.return_type();
539 let transform_ret_type = DataType::list(lambda_ret_type);
540
541 Ok(ExprImpl::FunctionCallWithLambda(Box::new(
542 FunctionCallWithLambda::new_unchecked(
543 ExprType::ArrayTransform,
544 vec![bound_array],
545 bound_lambda,
546 transform_ret_type,
547 ),
548 )))
549 }
550
551 fn bind_unary_lambda_function(
552 &mut self,
553 input_ty: DataType,
554 arg: Ident,
555 body: ast::Expr,
556 ) -> Result<ExprImpl> {
557 let lambda_args = HashMap::from([(arg.real_value(), (0usize, input_ty))]);
558 let orig_lambda_args = self.context.lambda_args.replace(lambda_args);
559 let body = self.bind_expr_inner(&body)?;
560 self.context.lambda_args = orig_lambda_args;
561
562 Ok(body)
563 }
564
565 fn bind_map_filter(&mut self, args: &[FunctionArg]) -> Result<ExprImpl> {
566 let [input, lambda] = args else {
567 return Err(ErrorCode::BindError(format!(
568 "`map_filter` requires two arguments (input_map and lambda), got {}",
569 args.len()
570 ))
571 .into());
572 };
573
574 let bound_input = self.bind_function_arg(input)?;
575 let [bound_input] = <[ExprImpl; 1]>::try_from(bound_input).map_err(|e| {
576 ErrorCode::BindError(format!(
577 "Input argument should resolve to single expression, got {}",
578 e.len()
579 ))
580 })?;
581
582 let (key_type, value_type) = match bound_input.return_type() {
583 DataType::Map(map_type) => (map_type.key().clone(), map_type.value().clone()),
584 t => {
585 return Err(
586 ErrorCode::BindError(format!("Input must be Map type, got {}", t)).into(),
587 );
588 }
589 };
590
591 let ast::FunctionArgExpr::Expr(ast::Expr::LambdaFunction {
592 args: lambda_args,
593 body: lambda_body,
594 }) = lambda.get_expr()
595 else {
596 return Err(ErrorCode::BindError(
597 "Second argument must be a lambda function".to_owned(),
598 )
599 .into());
600 };
601
602 let [key_arg, value_arg] = <[Ident; 2]>::try_from(lambda_args).map_err(|args| {
603 ErrorCode::BindError(format!(
604 "Lambda must have exactly two parameters (key, value), got {}",
605 args.len()
606 ))
607 })?;
608
609 let bound_lambda = self.bind_binary_lambda_function(
610 key_arg,
611 key_type.clone(),
612 value_arg,
613 value_type.clone(),
614 *lambda_body,
615 )?;
616
617 let lambda_ret_type = bound_lambda.return_type();
618 if lambda_ret_type != DataType::Boolean {
619 return Err(ErrorCode::BindError(format!(
620 "Lambda must return Boolean type, got {}",
621 lambda_ret_type
622 ))
623 .into());
624 }
625
626 let map_type = MapType::from_kv(key_type, value_type);
627 let return_type = DataType::Map(map_type);
628
629 Ok(ExprImpl::FunctionCallWithLambda(Box::new(
630 FunctionCallWithLambda::new_unchecked(
631 ExprType::MapFilter,
632 vec![bound_input],
633 bound_lambda,
634 return_type,
635 ),
636 )))
637 }
638
639 fn bind_binary_lambda_function(
640 &mut self,
641 first_arg: Ident,
642 first_ty: DataType,
643 second_arg: Ident,
644 second_ty: DataType,
645 body: ast::Expr,
646 ) -> Result<ExprImpl> {
647 let lambda_args = HashMap::from([
648 (first_arg.real_value(), (0usize, first_ty)),
649 (second_arg.real_value(), (1usize, second_ty)),
650 ]);
651
652 let orig_ctx = self.context.lambda_args.replace(lambda_args);
653 let bound_body = self.bind_expr_inner(&body)?;
654 self.context.lambda_args = orig_ctx;
655
656 Ok(bound_body)
657 }
658
659 fn ensure_table_function_allowed(&self) -> Result<()> {
660 if let Some(clause) = self.context.clause {
661 match clause {
662 Clause::JoinOn
663 | Clause::Where
664 | Clause::Having
665 | Clause::Filter
666 | Clause::Values
667 | Clause::Insert
668 | Clause::GeneratedColumn => {
669 return Err(ErrorCode::InvalidInputSyntax(format!(
670 "table functions are not allowed in {}",
671 clause
672 ))
673 .into());
674 }
675 Clause::GroupBy | Clause::From => {}
676 }
677 }
678 Ok(())
679 }
680
681 pub(crate) fn extract_udf_expr(ast: Vec<Statement>) -> Result<AstExpr> {
683 if ast.len() != 1 {
684 return Err(ErrorCode::InvalidInputSyntax(
685 "the query for sql udf should contain only one statement".to_owned(),
686 )
687 .into());
688 }
689
690 let Statement::Query(query) = ast.into_iter().next().unwrap() else {
692 return Err(ErrorCode::InvalidInputSyntax(
693 "invalid function definition, please recheck the syntax".to_owned(),
694 )
695 .into());
696 };
697
698 if let Some(expr) = query.as_single_select_item() {
699 Ok(expr.clone())
701 } else {
702 Ok(AstExpr::Subquery(query))
704 }
705 }
706
707 pub fn bind_sql_udf_inner(
708 &mut self,
709 body: &str,
710 arg_names: &[String],
711 args: Vec<ExprImpl>,
712 ) -> Result<ExprImpl> {
713 let ast = Parser::parse_sql(body)?;
715
716 let stashed_arguments = self.context.sql_udf_arguments.take();
720
721 let mut arguments = HashMap::new();
723 for (i, arg) in args.into_iter().enumerate() {
724 if arg_names[i].is_empty() {
725 arguments.insert(format!("${}", i + 1), arg);
727 } else {
728 arguments.insert(arg_names[i].clone(), arg);
730 }
731 }
732 self.context.sql_udf_arguments = Some(arguments);
733
734 let Ok(expr) = Self::extract_udf_expr(ast) else {
735 return Err(ErrorCode::InvalidInputSyntax(
736 "failed to parse the input query and extract the udf expression, \
737 please recheck the syntax"
738 .to_owned(),
739 )
740 .into());
741 };
742
743 let bind_result = self.bind_expr(&expr);
744 self.context.sql_udf_arguments = stashed_arguments;
746
747 bind_result
748 }
749
750 fn bind_sql_udf(
751 &mut self,
752 func: Arc<FunctionCatalog>,
753 args: Vec<ExprImpl>,
754 ) -> Result<ExprImpl> {
755 let Some(body) = &func.body else {
756 return Err(
757 ErrorCode::InvalidInputSyntax("`body` must exist for sql udf".to_owned()).into(),
758 );
759 };
760
761 self.bind_sql_udf_inner(body, &func.arg_names, args)
762 }
763
764 pub(in crate::binder) fn bind_function_expr_arg(
765 &mut self,
766 arg_expr: &FunctionArgExpr,
767 ) -> Result<Vec<ExprImpl>> {
768 match arg_expr {
769 FunctionArgExpr::Expr(expr) => Ok(vec![self.bind_expr_inner(expr)?]),
770 FunctionArgExpr::QualifiedWildcard(_, _)
771 | FunctionArgExpr::ExprQualifiedWildcard(_, _) => Err(ErrorCode::InvalidInputSyntax(
772 format!("unexpected wildcard {}", arg_expr),
773 )
774 .into()),
775 FunctionArgExpr::Wildcard(None) => Ok(vec![]),
776 FunctionArgExpr::Wildcard(Some(_)) => unreachable!(),
777 }
778 }
779
780 pub(in crate::binder) fn bind_function_arg(
781 &mut self,
782 arg: &FunctionArg,
783 ) -> Result<Vec<ExprImpl>> {
784 match arg {
785 FunctionArg::Unnamed(expr) => self.bind_function_expr_arg(expr),
786 FunctionArg::Named { .. } => todo!(),
787 }
788 }
789}