1use std::collections::{HashMap, HashSet};
16use std::str::FromStr;
17use std::sync::Arc;
18
19use anyhow::Context;
20use itertools::Itertools;
21use risingwave_common::acl::AclMode;
22use risingwave_common::bail_not_implemented;
23use risingwave_common::catalog::INFORMATION_SCHEMA_SCHEMA_NAME;
24use risingwave_common::types::{DataType, MapType};
25use risingwave_expr::aggregate::AggType;
26use risingwave_expr::window_function::WindowFuncKind;
27use risingwave_pb::user::grant_privilege::PbObject;
28use risingwave_sqlparser::ast::{
29 self, Expr as AstExpr, Function, FunctionArg, FunctionArgExpr, FunctionArgList, Ident,
30 OrderByExpr, Statement, Window,
31};
32use risingwave_sqlparser::parser::Parser;
33
34use crate::binder::Binder;
35use crate::binder::bind_context::Clause;
36use crate::catalog::function_catalog::FunctionCatalog;
37use crate::error::{ErrorCode, Result, RwError};
38use crate::expr::{
39 Expr, ExprImpl, ExprType, FunctionCallWithLambda, InputRef, TableFunction, TableFunctionType,
40 UserDefinedFunction,
41};
42use crate::handler::privilege::ObjectCheckItem;
43
44mod aggregate;
45mod builtin_scalar;
46mod window;
47
/// System functions that may be referenced as a bare identifier, without an argument list.
///
/// Only unquoted identifiers are matched against this list; see
/// [`is_sys_function_without_args`].
const SYS_FUNCTION_WITHOUT_ARGS: &[&str] = &[
    "session_user", "user",
    "current_user", "current_role",
    "current_catalog", "current_schema",
    "current_timestamp",
];
58
59pub(super) fn is_sys_function_without_args(ident: &Ident) -> bool {
60 SYS_FUNCTION_WITHOUT_ARGS
61 .iter()
62 .any(|e| ident.real_value().as_str() == *e && ident.quote_style().is_none())
63}
64
/// Returns early from the enclosing function with an `InvalidInputSyntax` error when `$pred`
/// evaluates to `true`.
///
/// Two forms are accepted:
/// - `reject_syntax!(pred, msg)`: `msg` is converted with `to_string()` and used verbatim;
/// - `reject_syntax!(pred, fmt, args...)`: the message is built with `format!(fmt, args...)`.
macro_rules! reject_syntax {
    ($pred:expr, $msg:expr) => {
        if $pred {
            let message = $msg.to_string();
            return Err(ErrorCode::InvalidInputSyntax(message).into());
        }
    };

    ($pred:expr, $fmt:expr, $($arg:tt)*) => {
        if $pred {
            let message = format!($fmt, $($arg)*);
            return Err(ErrorCode::InvalidInputSyntax(message).into());
        }
    };
}
80
81impl Binder {
82 pub(in crate::binder) fn bind_function(
83 &mut self,
84 Function {
85 scalar_as_agg,
86 name,
87 arg_list,
88 within_group,
89 filter,
90 over,
91 }: &Function,
92 ) -> Result<ExprImpl> {
93 let (schema_name, func_name) = match name.0.as_slice() {
94 [name] => (None, name.real_value()),
95 [schema, name] => {
96 let schema_name = schema.real_value();
97 let func_name = if schema_name == INFORMATION_SCHEMA_SCHEMA_NAME {
98 let function_name = name.real_value();
102 if function_name != "_pg_expandarray" {
103 bail_not_implemented!(
104 issue = 12422,
105 "Unsupported function name under schema: {}",
106 schema_name
107 );
108 }
109 function_name
110 } else {
111 name.real_value()
112 };
113 (Some(schema_name), func_name)
114 }
115 [database, schema, name] => {
116 let database_name = database.real_value();
118 if database_name != self.db_name {
119 return Err(ErrorCode::BindError(format!(
120 "Cross-database function call is not supported: {}",
121 name
122 ))
123 .into());
124 }
125 let schema_name = schema.real_value();
126 let func_name = name.real_value();
127 (Some(schema_name), func_name)
128 }
129 _ => bail_not_implemented!(issue = 112, "qualified function {}", name),
130 };
131
132 if func_name == "obj_description" || func_name == "col_description" {
139 return Ok(ExprImpl::literal_varchar("".to_owned()));
140 }
141
142 if func_name == "array_transform" || func_name == "map_filter" {
144 return self.validate_and_bind_special_function_params(
145 &func_name,
146 *scalar_as_agg,
147 arg_list,
148 within_group.as_deref(),
149 filter.as_deref(),
150 over.as_ref(),
151 );
152 }
153
154 let mut args: Vec<_> = arg_list
155 .args
156 .iter()
157 .map(|arg| self.bind_function_arg(arg))
158 .flatten_ok()
159 .try_collect()?;
160
161 let mut referred_udfs = HashSet::new();
162
163 let wrapped_agg_type = if *scalar_as_agg {
164 let mut array_args = args
167 .iter()
168 .enumerate()
169 .map(|(i, expr)| {
170 InputRef::new(i, DataType::List(Box::new(expr.return_type()))).into()
171 })
172 .collect_vec();
173 let schema_path = self.bind_schema_path(schema_name.as_deref());
174 let scalar_func_expr = if let Ok((func, _)) = self.catalog.get_function_by_name_inputs(
175 &self.db_name,
176 schema_path,
177 &func_name,
178 &mut array_args,
179 ) {
180 referred_udfs.insert(func.id);
182 self.check_privilege(
183 ObjectCheckItem::new(
184 func.owner,
185 AclMode::Execute,
186 func.name.clone(),
187 PbObject::FunctionId(func.id.function_id()),
188 ),
189 self.database_id,
190 )?;
191
192 if !func.kind.is_scalar() {
193 return Err(ErrorCode::InvalidInputSyntax(
194 "expect a scalar function after `AGGREGATE:`".to_owned(),
195 )
196 .into());
197 }
198
199 if func.language == "sql" {
200 self.bind_sql_udf(func.clone(), array_args)?
201 } else {
202 UserDefinedFunction::new(func.clone(), array_args).into()
203 }
204 } else {
205 self.bind_builtin_scalar_function(&func_name, array_args, arg_list.variadic)?
206 };
207
208 let expr_node = match scalar_func_expr.try_to_expr_proto() {
211 Ok(expr_node) => expr_node,
212 Err(e) => {
213 return Err(ErrorCode::InvalidInputSyntax(format!(
214 "function {func_name} cannot be used after `AGGREGATE:`: {e}",
215 ))
216 .into());
217 }
218 };
219
220 Some(AggType::WrapScalar(expr_node))
222 } else {
223 None
224 };
225
226 let schema_path = self.bind_schema_path(schema_name.as_deref());
227 let udf = if wrapped_agg_type.is_none()
228 && let Ok((func, _)) = self.catalog.get_function_by_name_inputs(
229 &self.db_name,
230 schema_path,
231 &func_name,
232 &mut args,
233 ) {
234 referred_udfs.insert(func.id);
236 self.check_privilege(
237 ObjectCheckItem::new(
238 func.owner,
239 AclMode::Execute,
240 func.name.clone(),
241 PbObject::FunctionId(func.id.function_id()),
242 ),
243 self.database_id,
244 )?;
245 Some(func.clone())
246 } else {
247 None
248 };
249
250 self.included_udfs.extend(referred_udfs);
251
252 let agg_type = if wrapped_agg_type.is_some() {
253 wrapped_agg_type
254 } else if let Some(ref udf) = udf
255 && udf.kind.is_aggregate()
256 {
257 assert_ne!(udf.language, "sql", "SQL UDAF is not supported yet");
258 Some(AggType::UserDefined(udf.as_ref().into()))
259 } else {
260 AggType::from_str(&func_name).ok()
261 };
262
263 if let Some(over) = over {
265 reject_syntax!(
266 arg_list.distinct,
267 "`DISTINCT` is not allowed in window function call"
268 );
269 reject_syntax!(
270 arg_list.variadic,
271 "`VARIADIC` is not allowed in window function call"
272 );
273 reject_syntax!(
274 !arg_list.order_by.is_empty(),
275 "`ORDER BY` is not allowed in window function call argument list"
276 );
277 reject_syntax!(
278 within_group.is_some(),
279 "`WITHIN GROUP` is not allowed in window function call"
280 );
281
282 let kind = if let Some(agg_type) = agg_type {
283 WindowFuncKind::Aggregate(agg_type)
285 } else if let Ok(kind) = WindowFuncKind::from_str(&func_name) {
286 kind
287 } else {
288 bail_not_implemented!(issue = 8961, "Unrecognized window function: {}", func_name);
289 };
290 return self.bind_window_function(
291 kind,
292 args,
293 arg_list.ignore_nulls,
294 filter.as_deref(),
295 over,
296 );
297 }
298
299 reject_syntax!(
301 arg_list.ignore_nulls,
302 "`IGNORE NULLS` is not allowed in aggregate/scalar/table function call"
303 );
304
305 if let Some(agg_type) = agg_type {
307 reject_syntax!(
308 arg_list.variadic,
309 "`VARIADIC` is not allowed in aggregate function call"
310 );
311 return self.bind_aggregate_function(
312 agg_type,
313 arg_list.distinct,
314 args,
315 &arg_list.order_by,
316 within_group.as_deref(),
317 filter.as_deref(),
318 );
319 }
320
321 reject_syntax!(
323 arg_list.distinct,
324 "`DISTINCT` is not allowed in scalar/table function call"
325 );
326 reject_syntax!(
327 !arg_list.order_by.is_empty(),
328 "`ORDER BY` is not allowed in scalar/table function call"
329 );
330 reject_syntax!(
331 within_group.is_some(),
332 "`WITHIN GROUP` is not allowed in scalar/table function call"
333 );
334 reject_syntax!(
335 filter.is_some(),
336 "`FILTER` is not allowed in scalar/table function call"
337 );
338
339 {
341 if func_name.eq_ignore_ascii_case("file_scan") {
343 reject_syntax!(
344 arg_list.variadic,
345 "`VARIADIC` is not allowed in table function call"
346 );
347 self.ensure_table_function_allowed()?;
348 return Ok(TableFunction::new_file_scan(args)?.into());
349 }
350 if func_name.eq("postgres_query") {
352 reject_syntax!(
353 arg_list.variadic,
354 "`VARIADIC` is not allowed in table function call"
355 );
356 self.ensure_table_function_allowed()?;
357 return Ok(TableFunction::new_postgres_query(
358 &self.catalog,
359 &self.db_name,
360 self.bind_schema_path(schema_name.as_deref()),
361 args,
362 )
363 .context("postgres_query error")?
364 .into());
365 }
366 if func_name.eq("mysql_query") {
368 reject_syntax!(
369 arg_list.variadic,
370 "`VARIADIC` is not allowed in table function call"
371 );
372 self.ensure_table_function_allowed()?;
373 return Ok(TableFunction::new_mysql_query(
374 &self.catalog,
375 &self.db_name,
376 self.bind_schema_path(schema_name.as_deref()),
377 args,
378 )
379 .context("mysql_query error")?
380 .into());
381 }
382 if func_name.eq("internal_backfill_progress") {
384 reject_syntax!(
385 arg_list.variadic,
386 "`VARIADIC` is not allowed in table function call"
387 );
388 self.ensure_table_function_allowed()?;
389 return Ok(TableFunction::new_internal_backfill_progress().into());
390 }
391 if func_name.eq("internal_source_backfill_progress") {
393 reject_syntax!(
394 arg_list.variadic,
395 "`VARIADIC` is not allowed in table function call"
396 );
397 self.ensure_table_function_allowed()?;
398 return Ok(TableFunction::new_internal_source_backfill_progress().into());
399 }
400 if let Some(ref udf) = udf
402 && udf.kind.is_table()
403 {
404 reject_syntax!(
405 arg_list.variadic,
406 "`VARIADIC` is not allowed in table function call"
407 );
408 self.ensure_table_function_allowed()?;
409 if udf.language == "sql" {
410 return self.bind_sql_udf(udf.clone(), args);
411 }
412 return Ok(TableFunction::new_user_defined(udf.clone(), args).into());
413 }
414 if let Ok(function_type) = TableFunctionType::from_str(&func_name) {
416 reject_syntax!(
417 arg_list.variadic,
418 "`VARIADIC` is not allowed in table function call"
419 );
420 self.ensure_table_function_allowed()?;
421 return Ok(TableFunction::new(function_type, args)?.into());
422 }
423 }
424
425 if let Some(ref udf) = udf {
427 assert!(udf.kind.is_scalar());
428 reject_syntax!(
429 arg_list.variadic,
430 "`VARIADIC` is not allowed in user-defined function call"
431 );
432 if udf.language == "sql" {
433 return self.bind_sql_udf(udf.clone(), args);
434 }
435 return Ok(UserDefinedFunction::new(udf.clone(), args).into());
436 }
437
438 self.bind_builtin_scalar_function(&func_name, args, arg_list.variadic)
439 }
440
441 fn validate_and_bind_special_function_params(
442 &mut self,
443 func_name: &str,
444 scalar_as_agg: bool,
445 arg_list: &FunctionArgList,
446 within_group: Option<&OrderByExpr>,
447 filter: Option<&risingwave_sqlparser::ast::Expr>,
448 over: Option<&Window>,
449 ) -> Result<ExprImpl> {
450 assert!(["array_transform", "map_filter"].contains(&func_name));
451
452 reject_syntax!(
453 scalar_as_agg,
454 "`AGGREGATE:` prefix is not allowed for `{}`",
455 func_name
456 );
457 reject_syntax!(
458 !arg_list.is_args_only(),
459 "keywords like `DISTINCT`, `ORDER BY` are not allowed in `{}` argument list",
460 func_name
461 );
462 reject_syntax!(
463 within_group.is_some(),
464 "`WITHIN GROUP` is not allowed in `{}` call",
465 func_name
466 );
467 reject_syntax!(
468 filter.is_some(),
469 "`FILTER` is not allowed in `{}` call",
470 func_name
471 );
472 reject_syntax!(
473 over.is_some(),
474 "`OVER` is not allowed in `{}` call",
475 func_name
476 );
477 if func_name == "array_transform" {
478 self.bind_array_transform(&arg_list.args)
479 } else {
480 self.bind_map_filter(&arg_list.args)
481 }
482 }
483
484 fn bind_array_transform(&mut self, args: &[FunctionArg]) -> Result<ExprImpl> {
485 let [array, lambda] = args else {
486 return Err(ErrorCode::BindError(format!(
487 "`array_transform` expect two inputs `array` and `lambda`, but {} were given",
488 args.len()
489 ))
490 .into());
491 };
492
493 let bound_array = self.bind_function_arg(array)?;
494 let [bound_array] = <[ExprImpl; 1]>::try_from(bound_array).map_err(|bound_array| -> RwError {
495 ErrorCode::BindError(format!("The `array` argument for `array_transform` should be bound to one argument, but {} were got", bound_array.len()))
496 .into()
497 })?;
498
499 let inner_ty = match bound_array.return_type() {
500 DataType::List(ty) => *ty,
501 real_type => return Err(ErrorCode::BindError(format!(
502 "The `array` argument for `array_transform` should be an array, but {} were got",
503 real_type
504 ))
505 .into()),
506 };
507
508 let ast::FunctionArgExpr::Expr(ast::Expr::LambdaFunction {
509 args: lambda_args,
510 body: lambda_body,
511 }) = lambda.get_expr()
512 else {
513 return Err(ErrorCode::BindError(
514 "The `lambda` argument for `array_transform` should be a lambda function"
515 .to_owned(),
516 )
517 .into());
518 };
519
520 let [lambda_arg] = <[Ident; 1]>::try_from(lambda_args).map_err(|args| -> RwError {
521 ErrorCode::BindError(format!(
522 "The `lambda` argument for `array_transform` should be a lambda function with one argument, but {} were given",
523 args.len()
524 ))
525 .into()
526 })?;
527
528 let bound_lambda = self.bind_unary_lambda_function(inner_ty, lambda_arg, *lambda_body)?;
529
530 let lambda_ret_type = bound_lambda.return_type();
531 let transform_ret_type = DataType::List(Box::new(lambda_ret_type));
532
533 Ok(ExprImpl::FunctionCallWithLambda(Box::new(
534 FunctionCallWithLambda::new_unchecked(
535 ExprType::ArrayTransform,
536 vec![bound_array],
537 bound_lambda,
538 transform_ret_type,
539 ),
540 )))
541 }
542
543 fn bind_unary_lambda_function(
544 &mut self,
545 input_ty: DataType,
546 arg: Ident,
547 body: ast::Expr,
548 ) -> Result<ExprImpl> {
549 let lambda_args = HashMap::from([(arg.real_value(), (0usize, input_ty))]);
550 let orig_lambda_args = self.context.lambda_args.replace(lambda_args);
551 let body = self.bind_expr_inner(&body)?;
552 self.context.lambda_args = orig_lambda_args;
553
554 Ok(body)
555 }
556
557 fn bind_map_filter(&mut self, args: &[FunctionArg]) -> Result<ExprImpl> {
558 let [input, lambda] = args else {
559 return Err(ErrorCode::BindError(format!(
560 "`map_filter` requires two arguments (input_map and lambda), got {}",
561 args.len()
562 ))
563 .into());
564 };
565
566 let bound_input = self.bind_function_arg(input)?;
567 let [bound_input] = <[ExprImpl; 1]>::try_from(bound_input).map_err(|e| {
568 ErrorCode::BindError(format!(
569 "Input argument should resolve to single expression, got {}",
570 e.len()
571 ))
572 })?;
573
574 let (key_type, value_type) = match bound_input.return_type() {
575 DataType::Map(map_type) => (map_type.key().clone(), map_type.value().clone()),
576 t => {
577 return Err(
578 ErrorCode::BindError(format!("Input must be Map type, got {}", t)).into(),
579 );
580 }
581 };
582
583 let ast::FunctionArgExpr::Expr(ast::Expr::LambdaFunction {
584 args: lambda_args,
585 body: lambda_body,
586 }) = lambda.get_expr()
587 else {
588 return Err(ErrorCode::BindError(
589 "Second argument must be a lambda function".to_owned(),
590 )
591 .into());
592 };
593
594 let [key_arg, value_arg] = <[Ident; 2]>::try_from(lambda_args).map_err(|args| {
595 ErrorCode::BindError(format!(
596 "Lambda must have exactly two parameters (key, value), got {}",
597 args.len()
598 ))
599 })?;
600
601 let bound_lambda = self.bind_binary_lambda_function(
602 key_arg,
603 key_type.clone(),
604 value_arg,
605 value_type.clone(),
606 *lambda_body,
607 )?;
608
609 let lambda_ret_type = bound_lambda.return_type();
610 if lambda_ret_type != DataType::Boolean {
611 return Err(ErrorCode::BindError(format!(
612 "Lambda must return Boolean type, got {}",
613 lambda_ret_type
614 ))
615 .into());
616 }
617
618 let map_type = MapType::from_kv(key_type, value_type);
619 let return_type = DataType::Map(map_type);
620
621 Ok(ExprImpl::FunctionCallWithLambda(Box::new(
622 FunctionCallWithLambda::new_unchecked(
623 ExprType::MapFilter,
624 vec![bound_input],
625 bound_lambda,
626 return_type,
627 ),
628 )))
629 }
630
631 fn bind_binary_lambda_function(
632 &mut self,
633 first_arg: Ident,
634 first_ty: DataType,
635 second_arg: Ident,
636 second_ty: DataType,
637 body: ast::Expr,
638 ) -> Result<ExprImpl> {
639 let lambda_args = HashMap::from([
640 (first_arg.real_value(), (0usize, first_ty)),
641 (second_arg.real_value(), (1usize, second_ty)),
642 ]);
643
644 let orig_ctx = self.context.lambda_args.replace(lambda_args);
645 let bound_body = self.bind_expr_inner(&body)?;
646 self.context.lambda_args = orig_ctx;
647
648 Ok(bound_body)
649 }
650
651 fn ensure_table_function_allowed(&self) -> Result<()> {
652 if let Some(clause) = self.context.clause {
653 match clause {
654 Clause::JoinOn
655 | Clause::Where
656 | Clause::Having
657 | Clause::Filter
658 | Clause::Values
659 | Clause::Insert
660 | Clause::GeneratedColumn => {
661 return Err(ErrorCode::InvalidInputSyntax(format!(
662 "table functions are not allowed in {}",
663 clause
664 ))
665 .into());
666 }
667 Clause::GroupBy | Clause::From => {}
668 }
669 }
670 Ok(())
671 }
672
673 pub(crate) fn extract_udf_expr(ast: Vec<Statement>) -> Result<AstExpr> {
675 if ast.len() != 1 {
676 return Err(ErrorCode::InvalidInputSyntax(
677 "the query for sql udf should contain only one statement".to_owned(),
678 )
679 .into());
680 }
681
682 let Statement::Query(query) = ast.into_iter().next().unwrap() else {
684 return Err(ErrorCode::InvalidInputSyntax(
685 "invalid function definition, please recheck the syntax".to_owned(),
686 )
687 .into());
688 };
689
690 if let Some(expr) = query.as_single_select_item() {
691 Ok(expr.clone())
693 } else {
694 Ok(AstExpr::Subquery(query))
696 }
697 }
698
699 pub fn bind_sql_udf_inner(
700 &mut self,
701 body: &str,
702 arg_names: &[String],
703 args: Vec<ExprImpl>,
704 ) -> Result<ExprImpl> {
705 let ast = Parser::parse_sql(body)?;
707
708 let stashed_arguments = self.context.sql_udf_arguments.take();
712
713 let mut arguments = HashMap::new();
715 for (i, arg) in args.into_iter().enumerate() {
716 if arg_names[i].is_empty() {
717 arguments.insert(format!("${}", i + 1), arg);
719 } else {
720 arguments.insert(arg_names[i].clone(), arg);
722 }
723 }
724 self.context.sql_udf_arguments = Some(arguments);
725
726 let Ok(expr) = Self::extract_udf_expr(ast) else {
727 return Err(ErrorCode::InvalidInputSyntax(
728 "failed to parse the input query and extract the udf expression, \
729 please recheck the syntax"
730 .to_owned(),
731 )
732 .into());
733 };
734
735 let bind_result = self.bind_expr(&expr);
736 self.context.sql_udf_arguments = stashed_arguments;
738
739 bind_result
740 }
741
742 fn bind_sql_udf(
743 &mut self,
744 func: Arc<FunctionCatalog>,
745 args: Vec<ExprImpl>,
746 ) -> Result<ExprImpl> {
747 let Some(body) = &func.body else {
748 return Err(
749 ErrorCode::InvalidInputSyntax("`body` must exist for sql udf".to_owned()).into(),
750 );
751 };
752
753 self.bind_sql_udf_inner(body, &func.arg_names, args)
754 }
755
756 pub(in crate::binder) fn bind_function_expr_arg(
757 &mut self,
758 arg_expr: &FunctionArgExpr,
759 ) -> Result<Vec<ExprImpl>> {
760 match arg_expr {
761 FunctionArgExpr::Expr(expr) => Ok(vec![self.bind_expr_inner(expr)?]),
762 FunctionArgExpr::QualifiedWildcard(_, _)
763 | FunctionArgExpr::ExprQualifiedWildcard(_, _) => Err(ErrorCode::InvalidInputSyntax(
764 format!("unexpected wildcard {}", arg_expr),
765 )
766 .into()),
767 FunctionArgExpr::Wildcard(None) => Ok(vec![]),
768 FunctionArgExpr::Wildcard(Some(_)) => unreachable!(),
769 }
770 }
771
772 pub(in crate::binder) fn bind_function_arg(
773 &mut self,
774 arg: &FunctionArg,
775 ) -> Result<Vec<ExprImpl>> {
776 match arg {
777 FunctionArg::Unnamed(expr) => self.bind_function_expr_arg(expr),
778 FunctionArg::Named { .. } => todo!(),
779 }
780 }
781}