1use std::collections::{HashMap, HashSet};
16use std::str::FromStr;
17use std::sync::Arc;
18
19use anyhow::Context;
20use itertools::Itertools;
21use risingwave_common::acl::AclMode;
22use risingwave_common::bail_not_implemented;
23use risingwave_common::catalog::INFORMATION_SCHEMA_SCHEMA_NAME;
24use risingwave_common::types::{DataType, MapType};
25use risingwave_expr::aggregate::AggType;
26use risingwave_expr::window_function::WindowFuncKind;
27use risingwave_sqlparser::ast::{
28 self, Expr as AstExpr, Function, FunctionArg, FunctionArgExpr, FunctionArgList, Ident,
29 OrderByExpr, Statement, Window,
30};
31use risingwave_sqlparser::parser::Parser;
32
33use crate::binder::Binder;
34use crate::binder::bind_context::Clause;
35use crate::catalog::function_catalog::FunctionCatalog;
36use crate::error::{ErrorCode, Result, RwError};
37use crate::expr::{
38 Expr, ExprImpl, ExprType, FunctionCallWithLambda, InputRef, TableFunction, TableFunctionType,
39 UserDefinedFunction,
40};
41use crate::handler::privilege::ObjectCheckItem;
42
43mod aggregate;
44mod builtin_scalar;
45mod window;
46
47const SYS_FUNCTION_WITHOUT_ARGS: &[&str] = &[
49 "session_user",
50 "user",
51 "current_user",
52 "current_role",
53 "current_catalog",
54 "current_schema",
55 "current_timestamp",
56];
57
58pub(super) fn is_sys_function_without_args(ident: &Ident) -> bool {
59 SYS_FUNCTION_WITHOUT_ARGS
60 .iter()
61 .any(|e| ident.real_value().as_str() == *e && ident.quote_style().is_none())
62}
63
/// Returns early from the enclosing function with `ErrorCode::InvalidInputSyntax` when the
/// given condition holds.
///
/// Two call shapes are accepted:
/// - `reject_syntax!(cond, message)`: `message` is turned into a `String` with `to_string()`.
/// - `reject_syntax!(cond, fmt, args...)`: the message is built with `format!`.
///
/// The message expression is only evaluated when the condition is true.
macro_rules! reject_syntax {
    // Internal arm shared by both public forms; `$text` must evaluate to a `String`.
    (@bail $cond:expr, $text:expr) => {
        if $cond {
            return Err(ErrorCode::InvalidInputSyntax($text).into());
        }
    };

    ($cond:expr, $message:expr) => {
        reject_syntax!(@bail $cond, $message.to_string())
    };

    ($cond:expr, $template:expr, $($rest:tt)*) => {
        reject_syntax!(@bail $cond, format!($template, $($rest)*))
    };
}
79
80impl Binder {
/// Binds a parsed function call into an [`ExprImpl`].
///
/// The call is resolved in the order written below; each step returns as soon as it applies:
/// 1. `obj_description` / `col_description` are bound to an empty varchar literal.
/// 2. `array_transform` / `map_filter` are delegated to
///    `validate_and_bind_special_function_params`.
/// 3. A call with an `OVER` clause is bound as a window function.
/// 4. An aggregate: a scalar function wrapped via `scalar_as_agg`, an aggregate UDF, or a
///    name accepted by `AggType::from_str`.
/// 5. A table function: one of a fixed set of names, a table UDF, or a name accepted by
///    `TableFunctionType::from_str`.
/// 6. A scalar UDF.
/// 7. Anything else is handed to `bind_builtin_scalar_function`.
///
/// # Errors
/// - `ErrorCode::InvalidInputSyntax` when a modifier (`DISTINCT`, `VARIADIC`, `ORDER BY`,
///   `WITHIN GROUP`, `FILTER`, `IGNORE NULLS`) is used with a kind of function that does not
///   accept it, or when `scalar_as_agg` is set for a function that cannot be wrapped.
/// - `ErrorCode::BindError` when the name is qualified with a database other than
///   `self.db_name`.
/// - A not-implemented error for unsupported qualified names and unrecognized window
///   functions.
/// - Any error produced by argument binding, privilege checks or the delegated binders.
///
/// # Panics
/// Panics if the resolved UDF is an aggregate written in `sql`, or if a UDF that reaches the
/// final UDF branch is not scalar (see the two assertions below).
pub(in crate::binder) fn bind_function(
    &mut self,
    Function {
        scalar_as_agg,
        name,
        arg_list,
        within_group,
        filter,
        over,
    }: &Function,
) -> Result<ExprImpl> {
    // Split the (possibly qualified) name into an optional schema and the function name.
    let (schema_name, func_name) = match name.0.as_slice() {
        [name] => (None, name.real_value()),
        [schema, name] => {
            let schema_name = schema.real_value();
            let func_name = if schema_name == INFORMATION_SCHEMA_SCHEMA_NAME {
                let function_name = name.real_value();
                // `_pg_expandarray` is the only function accepted under this schema.
                if function_name != "_pg_expandarray" {
                    bail_not_implemented!(
                        issue = 12422,
                        "Unsupported function name under schema: {}",
                        schema_name
                    );
                }
                function_name
            } else {
                name.real_value()
            };
            (Some(schema_name), func_name)
        }
        [database, schema, name] => {
            let database_name = database.real_value();
            // A three-part name is only accepted when it targets the current database.
            if database_name != self.db_name {
                // NOTE(review): `name` here is the identifier bound by the slice pattern, so
                // the message prints only the last name part — confirm that is intended.
                return Err(ErrorCode::BindError(format!(
                    "Cross-database function call is not supported: {}",
                    name
                ))
                .into());
            }
            let schema_name = schema.real_value();
            let func_name = name.real_value();
            (Some(schema_name), func_name)
        }
        _ => bail_not_implemented!(issue = 112, "qualified function {}", name),
    };

    // These two functions are bound to an empty string without looking at their arguments.
    if func_name == "obj_description" || func_name == "col_description" {
        return Ok(ExprImpl::literal_varchar("".to_owned()));
    }

    // Lambda-taking functions get their own validation and binding path; their arguments
    // are not bound here.
    if func_name == "array_transform" || func_name == "map_filter" {
        return self.validate_and_bind_special_function_params(
            &func_name,
            *scalar_as_agg,
            arg_list,
            within_group.as_deref(),
            filter.as_deref(),
            over.as_ref(),
        );
    }

    // Bind every argument; a single AST argument may yield zero or more bound expressions,
    // which are flattened into one list.
    let mut args: Vec<_> = arg_list
        .args
        .iter()
        .map(|arg| self.bind_function_arg(arg))
        .flatten_ok()
        .try_collect()?;

    // Ids of catalog functions referenced by this call; merged into `self.included_udfs`
    // further down.
    let mut referred_udfs = HashSet::new();

    // When `scalar_as_agg` is set, resolve the name as a scalar function applied to
    // list-typed inputs and wrap it as an aggregate.
    let wrapped_agg_type = if *scalar_as_agg {
        // Each argument is replaced by an input reference whose type is a list of the
        // original argument type.
        let mut array_args = args
            .iter()
            .enumerate()
            .map(|(i, expr)| InputRef::new(i, DataType::list(expr.return_type())).into())
            .collect_vec();
        let schema_path = self.bind_schema_path(schema_name.as_deref());
        // Prefer a catalog function; fall back to a builtin scalar function when the catalog
        // lookup fails.
        let scalar_func_expr = if let Ok((func, _)) = self.catalog.get_function_by_name_inputs(
            &self.db_name,
            schema_path,
            &func_name,
            &mut array_args,
        ) {
            referred_udfs.insert(func.id);
            // The caller needs `Execute` on the function.
            self.check_privilege(
                ObjectCheckItem::new(func.owner, AclMode::Execute, func.name.clone(), func.id),
                self.database_id,
            )?;

            if !func.kind.is_scalar() {
                return Err(ErrorCode::InvalidInputSyntax(
                    "expect a scalar function after `AGGREGATE:`".to_owned(),
                )
                .into());
            }

            if func.language == "sql" {
                self.bind_sql_udf(func.clone(), array_args)?
            } else {
                UserDefinedFunction::new(func.clone(), array_args).into()
            }
        } else {
            self.bind_builtin_scalar_function(&func_name, array_args, arg_list.variadic)?
        };

        // The wrapped aggregate carries the scalar expression in its protobuf form, so the
        // expression must be convertible.
        let expr_node = match scalar_func_expr.try_to_expr_proto() {
            Ok(expr_node) => expr_node,
            Err(e) => {
                return Err(ErrorCode::InvalidInputSyntax(format!(
                    "function {func_name} cannot be used after `AGGREGATE:`: {e}",
                ))
                .into());
            }
        };

        Some(AggType::WrapScalar(expr_node))
    } else {
        None
    };

    // Look up a catalog function for the regular (non-wrapped) call. The lookup is skipped
    // entirely when the call was already wrapped above.
    let schema_path = self.bind_schema_path(schema_name.as_deref());
    let udf = if wrapped_agg_type.is_none()
        && let Ok((func, _)) = self.catalog.get_function_by_name_inputs(
            &self.db_name,
            schema_path,
            &func_name,
            &mut args,
        ) {
        referred_udfs.insert(func.id);
        // The caller needs `Execute` on the function.
        self.check_privilege(
            ObjectCheckItem::new(func.owner, AclMode::Execute, func.name.clone(), func.id),
            self.database_id,
        )?;
        Some(func.clone())
    } else {
        None
    };

    self.included_udfs.extend(referred_udfs);

    // Decide whether the call is an aggregate: wrapped scalar first, then an aggregate UDF,
    // then a name accepted by `AggType::from_str`.
    let agg_type = if wrapped_agg_type.is_some() {
        wrapped_agg_type
    } else if let Some(ref udf) = udf
        && udf.kind.is_aggregate()
    {
        assert_ne!(udf.language, "sql", "SQL UDAF is not supported yet");
        Some(AggType::UserDefined(udf.as_ref().into()))
    } else {
        AggType::from_str(&func_name).ok()
    };

    // Window function call: anything with an `OVER` clause.
    if let Some(over) = over {
        reject_syntax!(
            arg_list.distinct,
            "`DISTINCT` is not allowed in window function call"
        );
        reject_syntax!(
            arg_list.variadic,
            "`VARIADIC` is not allowed in window function call"
        );
        reject_syntax!(
            !arg_list.order_by.is_empty(),
            "`ORDER BY` is not allowed in window function call argument list"
        );
        reject_syntax!(
            within_group.is_some(),
            "`WITHIN GROUP` is not allowed in window function call"
        );

        // An aggregate can be used as a window function; otherwise the name must be accepted
        // by `WindowFuncKind::from_str`.
        let kind = if let Some(agg_type) = agg_type {
            WindowFuncKind::Aggregate(agg_type)
        } else if let Ok(kind) = WindowFuncKind::from_str(&func_name) {
            kind
        } else {
            bail_not_implemented!(issue = 8961, "Unrecognized window function: {}", func_name);
        };
        return self.bind_window_function(
            kind,
            args,
            arg_list.ignore_nulls,
            filter.as_deref(),
            over,
        );
    }

    // From here on the call has no `OVER` clause.
    reject_syntax!(
        arg_list.ignore_nulls,
        "`IGNORE NULLS` is not allowed in aggregate/scalar/table function call"
    );

    // Aggregate function call.
    if let Some(agg_type) = agg_type {
        reject_syntax!(
            arg_list.variadic,
            "`VARIADIC` is not allowed in aggregate function call"
        );
        return self.bind_aggregate_function(
            agg_type,
            arg_list.distinct,
            args,
            &arg_list.order_by,
            within_group.as_deref(),
            filter.as_deref(),
        );
    }

    // From here on the call is neither a window function nor an aggregate, so the
    // aggregate-only modifiers are rejected.
    reject_syntax!(
        arg_list.distinct,
        "`DISTINCT` is not allowed in scalar/table function call"
    );
    reject_syntax!(
        !arg_list.order_by.is_empty(),
        "`ORDER BY` is not allowed in scalar/table function call"
    );
    reject_syntax!(
        within_group.is_some(),
        "`WITHIN GROUP` is not allowed in scalar/table function call"
    );
    reject_syntax!(
        filter.is_some(),
        "`FILTER` is not allowed in scalar/table function call"
    );

    // Table functions. Every branch rejects `VARIADIC` and then checks that a table function
    // is allowed in the clause currently being bound.
    {
        // Unlike the branches below, this name is compared ignoring ASCII case.
        if func_name.eq_ignore_ascii_case("file_scan") {
            reject_syntax!(
                arg_list.variadic,
                "`VARIADIC` is not allowed in table function call"
            );
            self.ensure_table_function_allowed()?;
            return Ok(TableFunction::new_file_scan(args)?.into());
        }
        if func_name.eq("postgres_query") {
            reject_syntax!(
                arg_list.variadic,
                "`VARIADIC` is not allowed in table function call"
            );
            self.ensure_table_function_allowed()?;
            return Ok(TableFunction::new_postgres_query(
                &self.catalog,
                &self.db_name,
                self.bind_schema_path(schema_name.as_deref()),
                args,
            )
            .context("postgres_query error")?
            .into());
        }
        if func_name.eq("mysql_query") {
            reject_syntax!(
                arg_list.variadic,
                "`VARIADIC` is not allowed in table function call"
            );
            self.ensure_table_function_allowed()?;
            return Ok(TableFunction::new_mysql_query(
                &self.catalog,
                &self.db_name,
                self.bind_schema_path(schema_name.as_deref()),
                args,
            )
            .context("mysql_query error")?
            .into());
        }
        // The two backfill-progress functions are constructed without the bound arguments.
        if func_name.eq("internal_backfill_progress") {
            reject_syntax!(
                arg_list.variadic,
                "`VARIADIC` is not allowed in table function call"
            );
            self.ensure_table_function_allowed()?;
            return Ok(TableFunction::new_internal_backfill_progress().into());
        }
        if func_name.eq("internal_source_backfill_progress") {
            reject_syntax!(
                arg_list.variadic,
                "`VARIADIC` is not allowed in table function call"
            );
            self.ensure_table_function_allowed()?;
            return Ok(TableFunction::new_internal_source_backfill_progress().into());
        }
        if func_name.eq("internal_get_channel_delta_stats") {
            reject_syntax!(
                arg_list.variadic,
                "`VARIADIC` is not allowed in table function call"
            );
            self.ensure_table_function_allowed()?;

            return Ok(TableFunction::new_internal_get_channel_delta_stats(args).into());
        }
        // User-defined table function.
        if let Some(ref udf) = udf
            && udf.kind.is_table()
        {
            reject_syntax!(
                arg_list.variadic,
                "`VARIADIC` is not allowed in table function call"
            );
            self.ensure_table_function_allowed()?;
            if udf.language == "sql" {
                return self.bind_sql_udf(udf.clone(), args);
            }
            return Ok(TableFunction::new_user_defined(udf.clone(), args).into());
        }
        // Table function whose name is accepted by `TableFunctionType::from_str`.
        if let Ok(function_type) = TableFunctionType::from_str(&func_name) {
            reject_syntax!(
                arg_list.variadic,
                "`VARIADIC` is not allowed in table function call"
            );
            self.ensure_table_function_allowed()?;
            return Ok(TableFunction::new(function_type, args)?.into());
        }
    }

    // User-defined scalar function: aggregate and table UDFs have already returned above.
    if let Some(ref udf) = udf {
        assert!(udf.kind.is_scalar());
        reject_syntax!(
            arg_list.variadic,
            "`VARIADIC` is not allowed in user-defined function call"
        );
        if udf.language == "sql" {
            return self.bind_sql_udf(udf.clone(), args);
        }
        return Ok(UserDefinedFunction::new(udf.clone(), args).into());
    }

    // Everything else is treated as a builtin scalar function.
    self.bind_builtin_scalar_function(&func_name, args, arg_list.variadic)
}
437
438 fn validate_and_bind_special_function_params(
439 &mut self,
440 func_name: &str,
441 scalar_as_agg: bool,
442 arg_list: &FunctionArgList,
443 within_group: Option<&OrderByExpr>,
444 filter: Option<&risingwave_sqlparser::ast::Expr>,
445 over: Option<&Window>,
446 ) -> Result<ExprImpl> {
447 assert!(["array_transform", "map_filter"].contains(&func_name));
448
449 reject_syntax!(
450 scalar_as_agg,
451 "`AGGREGATE:` prefix is not allowed for `{}`",
452 func_name
453 );
454 reject_syntax!(
455 !arg_list.is_args_only(),
456 "keywords like `DISTINCT`, `ORDER BY` are not allowed in `{}` argument list",
457 func_name
458 );
459 reject_syntax!(
460 within_group.is_some(),
461 "`WITHIN GROUP` is not allowed in `{}` call",
462 func_name
463 );
464 reject_syntax!(
465 filter.is_some(),
466 "`FILTER` is not allowed in `{}` call",
467 func_name
468 );
469 reject_syntax!(
470 over.is_some(),
471 "`OVER` is not allowed in `{}` call",
472 func_name
473 );
474 if func_name == "array_transform" {
475 self.bind_array_transform(&arg_list.args)
476 } else {
477 self.bind_map_filter(&arg_list.args)
478 }
479 }
480
481 fn bind_array_transform(&mut self, args: &[FunctionArg]) -> Result<ExprImpl> {
482 let [array, lambda] = args else {
483 return Err(ErrorCode::BindError(format!(
484 "`array_transform` expect two inputs `array` and `lambda`, but {} were given",
485 args.len()
486 ))
487 .into());
488 };
489
490 let bound_array = self.bind_function_arg(array)?;
491 let [bound_array] = <[ExprImpl; 1]>::try_from(bound_array).map_err(|bound_array| -> RwError {
492 ErrorCode::BindError(format!("The `array` argument for `array_transform` should be bound to one argument, but {} were got", bound_array.len()))
493 .into()
494 })?;
495
496 let inner_ty = match bound_array.return_type() {
497 DataType::List(ty) => ty.into_elem(),
498 real_type => return Err(ErrorCode::BindError(format!(
499 "The `array` argument for `array_transform` should be an array, but {} were got",
500 real_type
501 ))
502 .into()),
503 };
504
505 let ast::FunctionArgExpr::Expr(ast::Expr::LambdaFunction {
506 args: lambda_args,
507 body: lambda_body,
508 }) = lambda.get_expr()
509 else {
510 return Err(ErrorCode::BindError(
511 "The `lambda` argument for `array_transform` should be a lambda function"
512 .to_owned(),
513 )
514 .into());
515 };
516
517 let [lambda_arg] = <[Ident; 1]>::try_from(lambda_args).map_err(|args| -> RwError {
518 ErrorCode::BindError(format!(
519 "The `lambda` argument for `array_transform` should be a lambda function with one argument, but {} were given",
520 args.len()
521 ))
522 .into()
523 })?;
524
525 let bound_lambda = self.bind_unary_lambda_function(inner_ty, lambda_arg, *lambda_body)?;
526
527 let lambda_ret_type = bound_lambda.return_type();
528 let transform_ret_type = DataType::list(lambda_ret_type);
529
530 Ok(ExprImpl::FunctionCallWithLambda(Box::new(
531 FunctionCallWithLambda::new_unchecked(
532 ExprType::ArrayTransform,
533 vec![bound_array],
534 bound_lambda,
535 transform_ret_type,
536 ),
537 )))
538 }
539
540 fn bind_unary_lambda_function(
541 &mut self,
542 input_ty: DataType,
543 arg: Ident,
544 body: ast::Expr,
545 ) -> Result<ExprImpl> {
546 let lambda_args = HashMap::from([(arg.real_value(), (0usize, input_ty))]);
547 let orig_lambda_args = self.context.lambda_args.replace(lambda_args);
548 let body = self.bind_expr_inner(&body)?;
549 self.context.lambda_args = orig_lambda_args;
550
551 Ok(body)
552 }
553
554 fn bind_map_filter(&mut self, args: &[FunctionArg]) -> Result<ExprImpl> {
555 let [input, lambda] = args else {
556 return Err(ErrorCode::BindError(format!(
557 "`map_filter` requires two arguments (input_map and lambda), got {}",
558 args.len()
559 ))
560 .into());
561 };
562
563 let bound_input = self.bind_function_arg(input)?;
564 let [bound_input] = <[ExprImpl; 1]>::try_from(bound_input).map_err(|e| {
565 ErrorCode::BindError(format!(
566 "Input argument should resolve to single expression, got {}",
567 e.len()
568 ))
569 })?;
570
571 let (key_type, value_type) = match bound_input.return_type() {
572 DataType::Map(map_type) => (map_type.key().clone(), map_type.value().clone()),
573 t => {
574 return Err(
575 ErrorCode::BindError(format!("Input must be Map type, got {}", t)).into(),
576 );
577 }
578 };
579
580 let ast::FunctionArgExpr::Expr(ast::Expr::LambdaFunction {
581 args: lambda_args,
582 body: lambda_body,
583 }) = lambda.get_expr()
584 else {
585 return Err(ErrorCode::BindError(
586 "Second argument must be a lambda function".to_owned(),
587 )
588 .into());
589 };
590
591 let [key_arg, value_arg] = <[Ident; 2]>::try_from(lambda_args).map_err(|args| {
592 ErrorCode::BindError(format!(
593 "Lambda must have exactly two parameters (key, value), got {}",
594 args.len()
595 ))
596 })?;
597
598 let bound_lambda = self.bind_binary_lambda_function(
599 key_arg,
600 key_type.clone(),
601 value_arg,
602 value_type.clone(),
603 *lambda_body,
604 )?;
605
606 let lambda_ret_type = bound_lambda.return_type();
607 if lambda_ret_type != DataType::Boolean {
608 return Err(ErrorCode::BindError(format!(
609 "Lambda must return Boolean type, got {}",
610 lambda_ret_type
611 ))
612 .into());
613 }
614
615 let map_type = MapType::from_kv(key_type, value_type);
616 let return_type = DataType::Map(map_type);
617
618 Ok(ExprImpl::FunctionCallWithLambda(Box::new(
619 FunctionCallWithLambda::new_unchecked(
620 ExprType::MapFilter,
621 vec![bound_input],
622 bound_lambda,
623 return_type,
624 ),
625 )))
626 }
627
628 fn bind_binary_lambda_function(
629 &mut self,
630 first_arg: Ident,
631 first_ty: DataType,
632 second_arg: Ident,
633 second_ty: DataType,
634 body: ast::Expr,
635 ) -> Result<ExprImpl> {
636 let lambda_args = HashMap::from([
637 (first_arg.real_value(), (0usize, first_ty)),
638 (second_arg.real_value(), (1usize, second_ty)),
639 ]);
640
641 let orig_ctx = self.context.lambda_args.replace(lambda_args);
642 let bound_body = self.bind_expr_inner(&body)?;
643 self.context.lambda_args = orig_ctx;
644
645 Ok(bound_body)
646 }
647
648 fn ensure_table_function_allowed(&self) -> Result<()> {
649 if let Some(clause) = self.context.clause {
650 match clause {
651 Clause::JoinOn
652 | Clause::Where
653 | Clause::Having
654 | Clause::Filter
655 | Clause::Values
656 | Clause::Insert
657 | Clause::GeneratedColumn => {
658 return Err(ErrorCode::InvalidInputSyntax(format!(
659 "table functions are not allowed in {}",
660 clause
661 ))
662 .into());
663 }
664 Clause::GroupBy | Clause::From => {}
665 }
666 }
667 Ok(())
668 }
669
670 pub(crate) fn extract_udf_expr(ast: Vec<Statement>) -> Result<AstExpr> {
672 if ast.len() != 1 {
673 return Err(ErrorCode::InvalidInputSyntax(
674 "the query for sql udf should contain only one statement".to_owned(),
675 )
676 .into());
677 }
678
679 let Statement::Query(query) = ast.into_iter().next().unwrap() else {
681 return Err(ErrorCode::InvalidInputSyntax(
682 "invalid function definition, please recheck the syntax".to_owned(),
683 )
684 .into());
685 };
686
687 if let Some(expr) = query.as_single_select_item() {
688 Ok(expr.clone())
690 } else {
691 Ok(AstExpr::Subquery(query))
693 }
694 }
695
696 pub fn bind_sql_udf_inner(
697 &mut self,
698 body: &str,
699 arg_names: &[String],
700 args: Vec<ExprImpl>,
701 ) -> Result<ExprImpl> {
702 let ast = Parser::parse_sql(body)?;
704
705 let stashed_arguments = self.context.sql_udf_arguments.take();
709
710 let mut arguments = HashMap::new();
712 for (i, arg) in args.into_iter().enumerate() {
713 if arg_names[i].is_empty() {
714 arguments.insert(format!("${}", i + 1), arg);
716 } else {
717 arguments.insert(arg_names[i].clone(), arg);
719 }
720 }
721 self.context.sql_udf_arguments = Some(arguments);
722
723 let Ok(expr) = Self::extract_udf_expr(ast) else {
724 return Err(ErrorCode::InvalidInputSyntax(
725 "failed to parse the input query and extract the udf expression, \
726 please recheck the syntax"
727 .to_owned(),
728 )
729 .into());
730 };
731
732 let bind_result = self.bind_expr(&expr);
733 self.context.sql_udf_arguments = stashed_arguments;
735
736 bind_result
737 }
738
739 fn bind_sql_udf(
740 &mut self,
741 func: Arc<FunctionCatalog>,
742 args: Vec<ExprImpl>,
743 ) -> Result<ExprImpl> {
744 let Some(body) = &func.body else {
745 return Err(
746 ErrorCode::InvalidInputSyntax("`body` must exist for sql udf".to_owned()).into(),
747 );
748 };
749
750 self.bind_sql_udf_inner(body, &func.arg_names, args)
751 }
752
753 pub(in crate::binder) fn bind_function_expr_arg(
754 &mut self,
755 arg_expr: &FunctionArgExpr,
756 ) -> Result<Vec<ExprImpl>> {
757 match arg_expr {
758 FunctionArgExpr::Expr(expr) => Ok(vec![self.bind_expr_inner(expr)?]),
759 FunctionArgExpr::QualifiedWildcard(_, _)
760 | FunctionArgExpr::ExprQualifiedWildcard(_, _) => Err(ErrorCode::InvalidInputSyntax(
761 format!("unexpected wildcard {}", arg_expr),
762 )
763 .into()),
764 FunctionArgExpr::Wildcard(None) => Ok(vec![]),
765 FunctionArgExpr::Wildcard(Some(_)) => unreachable!(),
766 }
767 }
768
769 pub(in crate::binder) fn bind_function_arg(
770 &mut self,
771 arg: &FunctionArg,
772 ) -> Result<Vec<ExprImpl>> {
773 match arg {
774 FunctionArg::Unnamed(expr) => self.bind_function_expr_arg(expr),
775 FunctionArg::Named { .. } => todo!(),
776 }
777 }
778}