1use std::collections::{HashMap, HashSet};
16use std::str::FromStr;
17use std::sync::Arc;
18
19use anyhow::Context;
20use itertools::Itertools;
21use risingwave_common::acl::AclMode;
22use risingwave_common::bail_not_implemented;
23use risingwave_common::catalog::{INFORMATION_SCHEMA_SCHEMA_NAME, PG_CATALOG_SCHEMA_NAME};
24use risingwave_common::types::{DataType, MapType};
25use risingwave_expr::aggregate::AggType;
26use risingwave_expr::window_function::WindowFuncKind;
27use risingwave_pb::user::grant_privilege::PbObject;
28use risingwave_sqlparser::ast::{
29 self, Function, FunctionArg, FunctionArgExpr, FunctionArgList, Ident, OrderByExpr, Window,
30};
31use risingwave_sqlparser::parser::ParserError;
32
33use crate::binder::bind_context::Clause;
34use crate::binder::{Binder, UdfContext};
35use crate::catalog::function_catalog::FunctionCatalog;
36use crate::error::{ErrorCode, Result, RwError};
37use crate::expr::{
38 Expr, ExprImpl, ExprType, FunctionCallWithLambda, InputRef, TableFunction, TableFunctionType,
39 UserDefinedFunction,
40};
41use crate::handler::privilege::ObjectCheckItem;
42
43mod aggregate;
44mod builtin_scalar;
45mod window;
46
/// Function names recognized by [`is_sys_function_without_args`].
///
/// Going by the constant's name, these are the system functions that can be
/// referenced without an argument list; the matching itself is done against
/// the identifier's `real_value()` and only for unquoted identifiers.
const SYS_FUNCTION_WITHOUT_ARGS: &[&str] = &[
    "session_user",
    "user",
    "current_user",
    "current_role",
    "current_catalog",
    "current_schema",
    "current_timestamp",
];
57
58pub(super) fn is_sys_function_without_args(ident: &Ident) -> bool {
59 SYS_FUNCTION_WITHOUT_ARGS
60 .iter()
61 .any(|e| ident.real_value().as_str() == *e && ident.quote_style().is_none())
62}
63
/// Maximum nesting depth allowed while binding SQL UDFs.
///
/// `bind_sql_udf` compares the UDF context's global counter against this value
/// and fails with a "calling stack depth limit exceeded" bind error once the
/// counter has reached it.
const SQL_UDF_MAX_CALLING_DEPTH: u32 = 16;
69
/// Returns early from the enclosing function with
/// `ErrorCode::InvalidInputSyntax` when the predicate holds.
///
/// Two forms are accepted:
/// * `reject_syntax!(pred, msg)` — `msg` is converted with `to_string()`.
/// * `reject_syntax!(pred, fmt, args...)` — the message is built with
///   `format!`.
///
/// In both forms the message is only built when the predicate is true.
macro_rules! reject_syntax {
    ($pred:expr, $msg:expr) => {
        if $pred {
            let message = $msg.to_string();
            return Err(ErrorCode::InvalidInputSyntax(message).into());
        }
    };

    // Formatted variant: build the message and reuse the arm above.
    ($pred:expr, $fmt:expr, $($arg:tt)*) => {
        reject_syntax!($pred, format!($fmt, $($arg)*))
    };
}
85
86impl Binder {
    /// Binds a function-call AST node into an [`ExprImpl`].
    ///
    /// The call is resolved in the following order; the first step that
    /// applies produces the result:
    ///
    /// 1. The name is split into an optional schema and a function name. A
    ///    schema-qualified name is only accepted under `pg_catalog`, or for
    ///    `information_schema._pg_expandarray`.
    /// 2. `obj_description` / `col_description` bind to an empty varchar literal.
    /// 3. `array_transform` / `map_filter` are delegated to
    ///    `validate_and_bind_special_function_params`.
    /// 4. Arguments are bound. With the `AGGREGATE:` prefix (`scalar_as_agg`),
    ///    the scalar function is bound over list-typed input refs and wrapped
    ///    as an aggregate; otherwise the catalog is consulted for a
    ///    user-defined function matching the name and arguments.
    /// 5. With an `OVER` clause the call is bound as a window function.
    /// 6. If an aggregate type was resolved, the call is bound as an aggregate.
    /// 7. Table functions (a fixed set of names, table UDFs, builtin table
    ///    function types) are tried next.
    /// 8. A remaining UDF is bound as a scalar UDF; otherwise the call is
    ///    bound as a builtin scalar function.
    ///
    /// # Errors
    ///
    /// Returns an error for unsupported qualified names, for call-site
    /// modifiers (`DISTINCT`, `VARIADIC`, `ORDER BY`, `WITHIN GROUP`, `FILTER`,
    /// `IGNORE NULLS`) that are not allowed for the resolved kind of function,
    /// when the privilege check on a catalog function fails, and for any
    /// failure reported by the delegated binders.
    ///
    /// # Panics
    ///
    /// Panics if the catalog returns an aggregate UDF whose language is `sql`,
    /// or if a UDF that reaches the final scalar step is not of scalar kind.
    pub(in crate::binder) fn bind_function(
        &mut self,
        Function {
            scalar_as_agg,
            name,
            arg_list,
            within_group,
            filter,
            over,
        }: Function,
    ) -> Result<ExprImpl> {
        // Split `[schema.]name`. Only a restricted set of schema-qualified
        // names is supported.
        let (schema_name, func_name) = match name.0.as_slice() {
            [name] => (None, name.real_value()),
            [schema, name] => {
                let schema_name = schema.real_value();
                let func_name = if schema_name == PG_CATALOG_SCHEMA_NAME {
                    name.real_value()
                } else if schema_name == INFORMATION_SCHEMA_SCHEMA_NAME {
                    let function_name = name.real_value();
                    // `_pg_expandarray` is the only function accepted under
                    // `information_schema`.
                    if function_name != "_pg_expandarray" {
                        bail_not_implemented!(
                            issue = 12422,
                            "Unsupported function name under schema: {}",
                            schema_name
                        );
                    }
                    function_name
                } else {
                    bail_not_implemented!(
                        issue = 12422,
                        "Unsupported function name under schema: {}",
                        schema_name
                    );
                };
                (Some(schema_name), func_name)
            }
            _ => bail_not_implemented!(issue = 112, "qualified function {}", name),
        };

        // These two functions are stubbed out: they always bind to an empty
        // string and their arguments are never bound.
        if func_name == "obj_description" || func_name == "col_description" {
            return Ok(ExprImpl::literal_varchar("".to_owned()));
        }

        // Lambda-taking functions need their raw (unbound) arguments, so they
        // are handled before the generic argument binding below.
        if func_name == "array_transform" || func_name == "map_filter" {
            return self.validate_and_bind_special_function_params(
                &func_name,
                scalar_as_agg,
                arg_list,
                &within_group,
                &filter,
                &over,
            );
        }

        // Bind every argument; one AST argument may expand to zero or more
        // bound expressions, hence the flatten.
        let mut args: Vec<_> = arg_list
            .args
            .iter()
            .map(|arg| self.bind_function_arg(arg.clone()))
            .flatten_ok()
            .try_collect()?;

        // Ids of catalog functions resolved by this call; merged into
        // `self.included_udfs` once resolution is done.
        let mut referred_udfs = HashSet::new();

        // `AGGREGATE:` prefix: bind the named scalar function over inputs whose
        // types are lists of the original argument types, then wrap the bound
        // scalar expression as an aggregate.
        let wrapped_agg_type = if scalar_as_agg {
            let mut array_args = args
                .iter()
                .enumerate()
                .map(|(i, expr)| {
                    InputRef::new(i, DataType::List(Box::new(expr.return_type()))).into()
                })
                .collect_vec();
            let schema_path = self.bind_schema_path(schema_name.as_deref());
            // Prefer a catalog function matching the list-typed inputs; fall
            // back to a builtin scalar function when the lookup fails.
            let scalar_func_expr = if let Ok((func, _)) = self.catalog.get_function_by_name_inputs(
                &self.db_name,
                schema_path,
                &func_name,
                &mut array_args,
            ) {
                referred_udfs.insert(func.id);
                // The caller needs `Execute` on the resolved function.
                self.check_privilege(
                    ObjectCheckItem::new(
                        func.owner,
                        AclMode::Execute,
                        func.name.clone(),
                        PbObject::FunctionId(func.id.function_id()),
                    ),
                    self.database_id,
                )?;

                if !func.kind.is_scalar() {
                    return Err(ErrorCode::InvalidInputSyntax(
                        "expect a scalar function after `AGGREGATE:`".to_owned(),
                    )
                    .into());
                }

                if func.language == "sql" {
                    self.bind_sql_udf(func.clone(), array_args)?
                } else {
                    UserDefinedFunction::new(func.clone(), array_args).into()
                }
            } else {
                self.bind_builtin_scalar_function(&func_name, array_args, arg_list.variadic)?
            };

            Some(AggType::WrapScalar(scalar_func_expr.to_expr_proto()))
        } else {
            None
        };

        // Without the `AGGREGATE:` prefix, look the call up in the catalog with
        // the arguments as bound. The lookup receives `args` mutably.
        let schema_path = self.bind_schema_path(schema_name.as_deref());
        let udf = if wrapped_agg_type.is_none()
            && let Ok((func, _)) = self.catalog.get_function_by_name_inputs(
                &self.db_name,
                schema_path,
                &func_name,
                &mut args,
            ) {
            referred_udfs.insert(func.id);
            // The caller needs `Execute` on the resolved function.
            self.check_privilege(
                ObjectCheckItem::new(
                    func.owner,
                    AclMode::Execute,
                    func.name.clone(),
                    PbObject::FunctionId(func.id.function_id()),
                ),
                self.database_id,
            )?;
            Some(func.clone())
        } else {
            None
        };

        self.included_udfs.extend(referred_udfs);

        // Aggregate resolution priority: wrapped scalar, then aggregate UDF,
        // then a builtin aggregate parsed from the function name.
        let agg_type = if wrapped_agg_type.is_some() {
            wrapped_agg_type
        } else if let Some(ref udf) = udf
            && udf.kind.is_aggregate()
        {
            assert_ne!(udf.language, "sql", "SQL UDAF is not supported yet");
            Some(AggType::UserDefined(udf.as_ref().into()))
        } else {
            AggType::from_str(&func_name).ok()
        };

        // Window function call (`OVER` present).
        if let Some(over) = over {
            reject_syntax!(
                arg_list.distinct,
                "`DISTINCT` is not allowed in window function call"
            );
            reject_syntax!(
                arg_list.variadic,
                "`VARIADIC` is not allowed in window function call"
            );
            reject_syntax!(
                !arg_list.order_by.is_empty(),
                "`ORDER BY` is not allowed in window function call argument list"
            );
            reject_syntax!(
                within_group.is_some(),
                "`WITHIN GROUP` is not allowed in window function call"
            );

            // An aggregate can be used as a window function; otherwise the
            // name must parse as a dedicated window function kind.
            let kind = if let Some(agg_type) = agg_type {
                WindowFuncKind::Aggregate(agg_type)
            } else if let Ok(kind) = WindowFuncKind::from_str(&func_name) {
                kind
            } else {
                bail_not_implemented!(issue = 8961, "Unrecognized window function: {}", func_name);
            };
            return self.bind_window_function(kind, args, arg_list.ignore_nulls, filter, over);
        }

        // From here on the call is not a window function.
        reject_syntax!(
            arg_list.ignore_nulls,
            "`IGNORE NULLS` is not allowed in aggregate/scalar/table function call"
        );

        // Aggregate function call.
        if let Some(agg_type) = agg_type {
            reject_syntax!(
                arg_list.variadic,
                "`VARIADIC` is not allowed in aggregate function call"
            );
            return self.bind_aggregate_function(
                agg_type,
                arg_list.distinct,
                args,
                arg_list.order_by,
                within_group,
                filter,
            );
        }

        // From here on the call is neither a window nor an aggregate function,
        // so aggregate-only modifiers are rejected.
        reject_syntax!(
            arg_list.distinct,
            "`DISTINCT` is not allowed in scalar/table function call"
        );
        reject_syntax!(
            !arg_list.order_by.is_empty(),
            "`ORDER BY` is not allowed in scalar/table function call"
        );
        reject_syntax!(
            within_group.is_some(),
            "`WITHIN GROUP` is not allowed in scalar/table function call"
        );
        reject_syntax!(
            filter.is_some(),
            "`FILTER` is not allowed in scalar/table function call"
        );

        // Table functions. Every branch rejects `VARIADIC` and checks that a
        // table function is allowed in the current clause before binding.
        {
            // Note: this name is compared case-insensitively, unlike the
            // exact comparisons used for the names below.
            if func_name.eq_ignore_ascii_case("file_scan") {
                reject_syntax!(
                    arg_list.variadic,
                    "`VARIADIC` is not allowed in table function call"
                );
                self.ensure_table_function_allowed()?;
                return Ok(TableFunction::new_file_scan(args)?.into());
            }
            if func_name.eq("postgres_query") {
                reject_syntax!(
                    arg_list.variadic,
                    "`VARIADIC` is not allowed in table function call"
                );
                self.ensure_table_function_allowed()?;
                return Ok(TableFunction::new_postgres_query(
                    &self.catalog,
                    &self.db_name,
                    self.bind_schema_path(schema_name.as_deref()),
                    args,
                )
                .context("postgres_query error")?
                .into());
            }
            if func_name.eq("mysql_query") {
                reject_syntax!(
                    arg_list.variadic,
                    "`VARIADIC` is not allowed in table function call"
                );
                self.ensure_table_function_allowed()?;
                return Ok(TableFunction::new_mysql_query(
                    &self.catalog,
                    &self.db_name,
                    self.bind_schema_path(schema_name.as_deref()),
                    args,
                )
                .context("mysql_query error")?
                .into());
            }
            if func_name.eq("internal_backfill_progress") {
                reject_syntax!(
                    arg_list.variadic,
                    "`VARIADIC` is not allowed in table function call"
                );
                self.ensure_table_function_allowed()?;
                return Ok(TableFunction::new_internal_backfill_progress().into());
            }
            if func_name.eq("internal_source_backfill_progress") {
                reject_syntax!(
                    arg_list.variadic,
                    "`VARIADIC` is not allowed in table function call"
                );
                self.ensure_table_function_allowed()?;
                return Ok(TableFunction::new_internal_source_backfill_progress().into());
            }
            // User-defined table function.
            if let Some(ref udf) = udf
                && udf.kind.is_table()
            {
                reject_syntax!(
                    arg_list.variadic,
                    "`VARIADIC` is not allowed in table function call"
                );
                self.ensure_table_function_allowed()?;
                if udf.language == "sql" {
                    return self.bind_sql_udf(udf.clone(), args);
                }
                return Ok(TableFunction::new_user_defined(udf.clone(), args).into());
            }
            // Builtin table function whose type parses from the name.
            if let Ok(function_type) = TableFunctionType::from_str(&func_name) {
                reject_syntax!(
                    arg_list.variadic,
                    "`VARIADIC` is not allowed in table function call"
                );
                self.ensure_table_function_allowed()?;
                return Ok(TableFunction::new(function_type, args)?.into());
            }
        }

        // Scalar user-defined function. Aggregate and table UDFs have already
        // returned above, so anything left is asserted to be scalar.
        if let Some(ref udf) = udf {
            assert!(udf.kind.is_scalar());
            reject_syntax!(
                arg_list.variadic,
                "`VARIADIC` is not allowed in user-defined function call"
            );
            if udf.language == "sql" {
                return self.bind_sql_udf(udf.clone(), args);
            }
            return Ok(UserDefinedFunction::new(udf.clone(), args).into());
        }

        // Fallback: builtin scalar function.
        self.bind_builtin_scalar_function(&func_name, args, arg_list.variadic)
    }
421
422 fn validate_and_bind_special_function_params(
423 &mut self,
424 func_name: &str,
425 scalar_as_agg: bool,
426 arg_list: FunctionArgList,
427 within_group: &Option<Box<OrderByExpr>>,
428 filter: &Option<Box<risingwave_sqlparser::ast::Expr>>,
429 over: &Option<Window>,
430 ) -> Result<ExprImpl> {
431 assert!(["array_transform", "map_filter"].contains(&func_name));
432
433 reject_syntax!(
434 scalar_as_agg,
435 "`AGGREGATE:` prefix is not allowed for `{}`",
436 func_name
437 );
438 reject_syntax!(
439 !arg_list.is_args_only(),
440 "keywords like `DISTINCT`, `ORDER BY` are not allowed in `{}` argument list",
441 func_name
442 );
443 reject_syntax!(
444 within_group.is_some(),
445 "`WITHIN GROUP` is not allowed in `{}` call",
446 func_name
447 );
448 reject_syntax!(
449 filter.is_some(),
450 "`FILTER` is not allowed in `{}` call",
451 func_name
452 );
453 reject_syntax!(
454 over.is_some(),
455 "`OVER` is not allowed in `{}` call",
456 func_name
457 );
458 if func_name == "array_transform" {
459 self.bind_array_transform(arg_list.args)
460 } else {
461 self.bind_map_filter(arg_list.args)
462 }
463 }
464
465 fn bind_array_transform(&mut self, args: Vec<FunctionArg>) -> Result<ExprImpl> {
466 let [array, lambda] = <[FunctionArg; 2]>::try_from(args).map_err(|args| -> RwError {
467 ErrorCode::BindError(format!(
468 "`array_transform` expect two inputs `array` and `lambda`, but {} were given",
469 args.len()
470 ))
471 .into()
472 })?;
473
474 let bound_array = self.bind_function_arg(array)?;
475 let [bound_array] = <[ExprImpl; 1]>::try_from(bound_array).map_err(|bound_array| -> RwError {
476 ErrorCode::BindError(format!("The `array` argument for `array_transform` should be bound to one argument, but {} were got", bound_array.len()))
477 .into()
478 })?;
479
480 let inner_ty = match bound_array.return_type() {
481 DataType::List(ty) => *ty,
482 real_type => return Err(ErrorCode::BindError(format!(
483 "The `array` argument for `array_transform` should be an array, but {} were got",
484 real_type
485 ))
486 .into()),
487 };
488
489 let ast::FunctionArgExpr::Expr(ast::Expr::LambdaFunction {
490 args: lambda_args,
491 body: lambda_body,
492 }) = lambda.get_expr()
493 else {
494 return Err(ErrorCode::BindError(
495 "The `lambda` argument for `array_transform` should be a lambda function"
496 .to_owned(),
497 )
498 .into());
499 };
500
501 let [lambda_arg] = <[Ident; 1]>::try_from(lambda_args).map_err(|args| -> RwError {
502 ErrorCode::BindError(format!(
503 "The `lambda` argument for `array_transform` should be a lambda function with one argument, but {} were given",
504 args.len()
505 ))
506 .into()
507 })?;
508
509 let bound_lambda = self.bind_unary_lambda_function(inner_ty, lambda_arg, *lambda_body)?;
510
511 let lambda_ret_type = bound_lambda.return_type();
512 let transform_ret_type = DataType::List(Box::new(lambda_ret_type));
513
514 Ok(ExprImpl::FunctionCallWithLambda(Box::new(
515 FunctionCallWithLambda::new_unchecked(
516 ExprType::ArrayTransform,
517 vec![bound_array],
518 bound_lambda,
519 transform_ret_type,
520 ),
521 )))
522 }
523
524 fn bind_unary_lambda_function(
525 &mut self,
526 input_ty: DataType,
527 arg: Ident,
528 body: ast::Expr,
529 ) -> Result<ExprImpl> {
530 let lambda_args = HashMap::from([(arg.real_value(), (0usize, input_ty))]);
531 let orig_lambda_args = self.context.lambda_args.replace(lambda_args);
532 let body = self.bind_expr_inner(body)?;
533 self.context.lambda_args = orig_lambda_args;
534
535 Ok(body)
536 }
537
538 fn bind_map_filter(&mut self, args: Vec<FunctionArg>) -> Result<ExprImpl> {
539 let [input, lambda] = <[FunctionArg; 2]>::try_from(args).map_err(|args| {
540 ErrorCode::BindError(format!(
541 "`map_filter` requires two arguments (input_map and lambda), got {}",
542 args.len()
543 ))
544 })?;
545
546 let bound_input = self.bind_function_arg(input)?;
547 let [bound_input] = <[ExprImpl; 1]>::try_from(bound_input).map_err(|e| {
548 ErrorCode::BindError(format!(
549 "Input argument should resolve to single expression, got {}",
550 e.len()
551 ))
552 })?;
553
554 let (key_type, value_type) = match bound_input.return_type() {
555 DataType::Map(map_type) => (map_type.key().clone(), map_type.value().clone()),
556 t => {
557 return Err(
558 ErrorCode::BindError(format!("Input must be Map type, got {}", t)).into(),
559 );
560 }
561 };
562
563 let ast::FunctionArgExpr::Expr(ast::Expr::LambdaFunction {
564 args: lambda_args,
565 body: lambda_body,
566 }) = lambda.get_expr()
567 else {
568 return Err(ErrorCode::BindError(
569 "Second argument must be a lambda function".to_owned(),
570 )
571 .into());
572 };
573
574 let [key_arg, value_arg] = <[Ident; 2]>::try_from(lambda_args).map_err(|args| {
575 ErrorCode::BindError(format!(
576 "Lambda must have exactly two parameters (key, value), got {}",
577 args.len()
578 ))
579 })?;
580
581 let bound_lambda = self.bind_binary_lambda_function(
582 key_arg,
583 key_type.clone(),
584 value_arg,
585 value_type.clone(),
586 *lambda_body,
587 )?;
588
589 let lambda_ret_type = bound_lambda.return_type();
590 if lambda_ret_type != DataType::Boolean {
591 return Err(ErrorCode::BindError(format!(
592 "Lambda must return Boolean type, got {}",
593 lambda_ret_type
594 ))
595 .into());
596 }
597
598 let map_type = MapType::from_kv(key_type, value_type);
599 let return_type = DataType::Map(map_type);
600
601 Ok(ExprImpl::FunctionCallWithLambda(Box::new(
602 FunctionCallWithLambda::new_unchecked(
603 ExprType::MapFilter,
604 vec![bound_input],
605 bound_lambda,
606 return_type,
607 ),
608 )))
609 }
610
611 fn bind_binary_lambda_function(
612 &mut self,
613 first_arg: Ident,
614 first_ty: DataType,
615 second_arg: Ident,
616 second_ty: DataType,
617 body: ast::Expr,
618 ) -> Result<ExprImpl> {
619 let lambda_args = HashMap::from([
620 (first_arg.real_value(), (0usize, first_ty)),
621 (second_arg.real_value(), (1usize, second_ty)),
622 ]);
623
624 let orig_ctx = self.context.lambda_args.replace(lambda_args);
625 let bound_body = self.bind_expr_inner(body)?;
626 self.context.lambda_args = orig_ctx;
627
628 Ok(bound_body)
629 }
630
631 fn ensure_table_function_allowed(&self) -> Result<()> {
632 if let Some(clause) = self.context.clause {
633 match clause {
634 Clause::JoinOn
635 | Clause::Where
636 | Clause::Having
637 | Clause::Filter
638 | Clause::Values
639 | Clause::Insert
640 | Clause::GeneratedColumn => {
641 return Err(ErrorCode::InvalidInputSyntax(format!(
642 "table functions are not allowed in {}",
643 clause
644 ))
645 .into());
646 }
647 Clause::GroupBy | Clause::From => {}
648 }
649 }
650 Ok(())
651 }
652
653 fn bind_sql_udf(
654 &mut self,
655 func: Arc<FunctionCatalog>,
656 args: Vec<ExprImpl>,
657 ) -> Result<ExprImpl> {
658 if func.body.is_none() {
659 return Err(
660 ErrorCode::InvalidInputSyntax("`body` must exist for sql udf".to_owned()).into(),
661 );
662 }
663
664 let parse_result =
666 risingwave_sqlparser::parser::Parser::parse_sql(func.body.as_ref().unwrap().as_str());
667 if let Err(ParserError::ParserError(err)) | Err(ParserError::TokenizerError(err)) =
668 parse_result
669 {
670 return Err(ErrorCode::InvalidInputSyntax(err).into());
672 }
673
674 debug_assert!(parse_result.is_ok());
675
676 let ast = parse_result.unwrap();
678
679 let stashed_udf_context = self.udf_context.get_context();
685
686 let mut udf_context = HashMap::new();
689 for (i, arg) in args.into_iter().enumerate() {
690 if func.arg_names[i].is_empty() {
691 udf_context.insert(format!("${}", i + 1), arg);
693 } else {
694 udf_context.insert(func.arg_names[i].clone(), arg);
696 }
697 }
698 self.udf_context.update_context(udf_context);
699
700 if self.udf_context.global_count() >= SQL_UDF_MAX_CALLING_DEPTH {
702 return Err(ErrorCode::BindError(format!(
703 "function {} calling stack depth limit exceeded",
704 func.name
705 ))
706 .into());
707 } else {
708 self.udf_context.incr_global_count();
710 }
711
712 if let Ok(expr) = UdfContext::extract_udf_expression(ast) {
713 let bind_result = self.bind_expr(expr);
714
715 self.udf_context.decr_global_count();
719
720 self.udf_context.update_context(stashed_udf_context);
722
723 return bind_result;
724 }
725
726 Err(ErrorCode::InvalidInputSyntax(
727 "failed to parse the input query and extract the udf expression,
728 please recheck the syntax"
729 .to_owned(),
730 )
731 .into())
732 }
733
734 pub(in crate::binder) fn bind_function_expr_arg(
735 &mut self,
736 arg_expr: FunctionArgExpr,
737 ) -> Result<Vec<ExprImpl>> {
738 match arg_expr {
739 FunctionArgExpr::Expr(expr) => Ok(vec![self.bind_expr_inner(expr)?]),
740 FunctionArgExpr::QualifiedWildcard(_, _)
741 | FunctionArgExpr::ExprQualifiedWildcard(_, _) => Err(ErrorCode::InvalidInputSyntax(
742 format!("unexpected wildcard {}", arg_expr),
743 )
744 .into()),
745 FunctionArgExpr::Wildcard(None) => Ok(vec![]),
746 FunctionArgExpr::Wildcard(Some(_)) => unreachable!(),
747 }
748 }
749
750 pub(in crate::binder) fn bind_function_arg(
751 &mut self,
752 arg: FunctionArg,
753 ) -> Result<Vec<ExprImpl>> {
754 match arg {
755 FunctionArg::Unnamed(expr) => self.bind_function_expr_arg(expr),
756 FunctionArg::Named { .. } => todo!(),
757 }
758 }
759}