1use std::collections::{HashMap, HashSet};
16use std::str::FromStr;
17use std::sync::Arc;
18
19use anyhow::Context;
20use itertools::Itertools;
21use risingwave_common::acl::AclMode;
22use risingwave_common::bail_not_implemented;
23use risingwave_common::catalog::{INFORMATION_SCHEMA_SCHEMA_NAME, PG_CATALOG_SCHEMA_NAME};
24use risingwave_common::types::DataType;
25use risingwave_expr::aggregate::AggType;
26use risingwave_expr::window_function::WindowFuncKind;
27use risingwave_pb::user::grant_privilege::PbObject;
28use risingwave_sqlparser::ast::{self, Function, FunctionArg, FunctionArgExpr, Ident};
29use risingwave_sqlparser::parser::ParserError;
30
31use crate::binder::bind_context::Clause;
32use crate::binder::{Binder, UdfContext};
33use crate::catalog::function_catalog::FunctionCatalog;
34use crate::error::{ErrorCode, Result, RwError};
35use crate::expr::{
36 Expr, ExprImpl, ExprType, FunctionCallWithLambda, InputRef, TableFunction, TableFunctionType,
37 UserDefinedFunction,
38};
39
40mod aggregate;
41mod builtin_scalar;
42mod window;
43
/// System functions that take no arguments.
///
/// [`is_sys_function_without_args`] matches an *unquoted* identifier against this list,
/// comparing the identifier's normalized (`real_value`) spelling.
const SYS_FUNCTION_WITHOUT_ARGS: &[&str] = &[
    "session_user",
    "user",
    "current_user",
    "current_role",
    "current_catalog",
    "current_schema",
    "current_timestamp",
];
54
55pub(super) fn is_sys_function_without_args(ident: &Ident) -> bool {
56 SYS_FUNCTION_WITHOUT_ARGS
57 .iter()
58 .any(|e| ident.real_value().as_str() == *e && ident.quote_style().is_none())
59}
60
/// Maximum nesting depth of SQL UDF calls while binding. `Binder::bind_sql_udf` fails with
/// a bind error once the binder's global UDF calling counter reaches this value.
const SQL_UDF_MAX_CALLING_DEPTH: u32 = 16;
66
/// Returns early from the enclosing function with an
/// `ErrorCode::InvalidInputSyntax($msg.to_string())` error (converted with `.into()`)
/// when `$pred` evaluates to `true`.
///
/// `$msg` may be any expression implementing `ToString`. A trailing comma is accepted,
/// so multi-line invocations formatted with one is valid.
macro_rules! reject_syntax {
    ($pred:expr, $msg:expr $(,)?) => {
        if $pred {
            return Err(ErrorCode::InvalidInputSyntax($msg.to_string()).into());
        }
    };
}
74
75impl Binder {
76 pub(in crate::binder) fn bind_function(
77 &mut self,
78 Function {
79 scalar_as_agg,
80 name,
81 arg_list,
82 within_group,
83 filter,
84 over,
85 }: Function,
86 ) -> Result<ExprImpl> {
87 let (schema_name, func_name) = match name.0.as_slice() {
88 [name] => (None, name.real_value()),
89 [schema, name] => {
90 let schema_name = schema.real_value();
91 let func_name = if schema_name == PG_CATALOG_SCHEMA_NAME {
92 name.real_value()
95 } else if schema_name == INFORMATION_SCHEMA_SCHEMA_NAME {
96 let function_name = name.real_value();
100 if function_name != "_pg_expandarray" {
101 bail_not_implemented!(
102 issue = 12422,
103 "Unsupported function name under schema: {}",
104 schema_name
105 );
106 }
107 function_name
108 } else {
109 bail_not_implemented!(
110 issue = 12422,
111 "Unsupported function name under schema: {}",
112 schema_name
113 );
114 };
115 (Some(schema_name), func_name)
116 }
117 _ => bail_not_implemented!(issue = 112, "qualified function {}", name),
118 };
119
120 if func_name == "obj_description" || func_name == "col_description" {
127 return Ok(ExprImpl::literal_varchar("".to_owned()));
128 }
129
130 if func_name == "array_transform" {
132 reject_syntax!(
134 scalar_as_agg,
135 "`AGGREGATE:` prefix is not allowed for `array_transform`"
136 );
137 reject_syntax!(
138 !arg_list.is_args_only(),
139 "keywords like `DISTINCT`, `ORDER BY` are not allowed in `array_transform` argument list"
140 );
141 reject_syntax!(
142 within_group.is_some(),
143 "`WITHIN GROUP` is not allowed in `array_transform` call"
144 );
145 reject_syntax!(
146 filter.is_some(),
147 "`FILTER` is not allowed in `array_transform` call"
148 );
149 reject_syntax!(
150 over.is_some(),
151 "`OVER` is not allowed in `array_transform` call"
152 );
153 return self.bind_array_transform(arg_list.args);
154 }
155
156 let mut args: Vec<_> = arg_list
157 .args
158 .iter()
159 .map(|arg| self.bind_function_arg(arg.clone()))
160 .flatten_ok()
161 .try_collect()?;
162
163 let mut referred_udfs = HashSet::new();
164
165 let wrapped_agg_type = if scalar_as_agg {
166 let mut array_args = args
169 .iter()
170 .enumerate()
171 .map(|(i, expr)| {
172 InputRef::new(i, DataType::List(Box::new(expr.return_type()))).into()
173 })
174 .collect_vec();
175 let schema_path = self.bind_schema_path(schema_name.as_deref());
176 let scalar_func_expr = if let Ok((func, _)) = self.catalog.get_function_by_name_inputs(
177 &self.db_name,
178 schema_path,
179 &func_name,
180 &mut array_args,
181 ) {
182 referred_udfs.insert(func.id);
184 self.check_privilege(
185 PbObject::FunctionId(func.id.function_id()),
186 self.database_id,
187 AclMode::Execute,
188 func.owner,
189 )?;
190
191 if !func.kind.is_scalar() {
192 return Err(ErrorCode::InvalidInputSyntax(
193 "expect a scalar function after `AGGREGATE:`".to_owned(),
194 )
195 .into());
196 }
197
198 if func.language == "sql" {
199 self.bind_sql_udf(func.clone(), array_args)?
200 } else {
201 UserDefinedFunction::new(func.clone(), array_args).into()
202 }
203 } else {
204 self.bind_builtin_scalar_function(&func_name, array_args, arg_list.variadic)?
205 };
206
207 Some(AggType::WrapScalar(scalar_func_expr.to_expr_proto()))
209 } else {
210 None
211 };
212
213 let schema_path = self.bind_schema_path(schema_name.as_deref());
214 let udf = if wrapped_agg_type.is_none()
215 && let Ok((func, _)) = self.catalog.get_function_by_name_inputs(
216 &self.db_name,
217 schema_path,
218 &func_name,
219 &mut args,
220 ) {
221 referred_udfs.insert(func.id);
223 self.check_privilege(
224 PbObject::FunctionId(func.id.function_id()),
225 self.database_id,
226 AclMode::Execute,
227 func.owner,
228 )?;
229 Some(func.clone())
230 } else {
231 None
232 };
233
234 self.included_udfs.extend(referred_udfs);
235
236 let agg_type = if wrapped_agg_type.is_some() {
237 wrapped_agg_type
238 } else if let Some(ref udf) = udf
239 && udf.kind.is_aggregate()
240 {
241 assert_ne!(udf.language, "sql", "SQL UDAF is not supported yet");
242 Some(AggType::UserDefined(udf.as_ref().into()))
243 } else {
244 AggType::from_str(&func_name).ok()
245 };
246
247 if let Some(over) = over {
249 reject_syntax!(
250 arg_list.distinct,
251 "`DISTINCT` is not allowed in window function call"
252 );
253 reject_syntax!(
254 arg_list.variadic,
255 "`VARIADIC` is not allowed in window function call"
256 );
257 reject_syntax!(
258 !arg_list.order_by.is_empty(),
259 "`ORDER BY` is not allowed in window function call argument list"
260 );
261 reject_syntax!(
262 within_group.is_some(),
263 "`WITHIN GROUP` is not allowed in window function call"
264 );
265
266 let kind = if let Some(agg_type) = agg_type {
267 WindowFuncKind::Aggregate(agg_type)
269 } else if let Ok(kind) = WindowFuncKind::from_str(&func_name) {
270 kind
271 } else {
272 bail_not_implemented!(issue = 8961, "Unrecognized window function: {}", func_name);
273 };
274 return self.bind_window_function(kind, args, arg_list.ignore_nulls, filter, over);
275 }
276
277 reject_syntax!(
279 arg_list.ignore_nulls,
280 "`IGNORE NULLS` is not allowed in aggregate/scalar/table function call"
281 );
282
283 if let Some(agg_type) = agg_type {
285 reject_syntax!(
286 arg_list.variadic,
287 "`VARIADIC` is not allowed in aggregate function call"
288 );
289 return self.bind_aggregate_function(
290 agg_type,
291 arg_list.distinct,
292 args,
293 arg_list.order_by,
294 within_group,
295 filter,
296 );
297 }
298
299 reject_syntax!(
301 arg_list.distinct,
302 "`DISTINCT` is not allowed in scalar/table function call"
303 );
304 reject_syntax!(
305 !arg_list.order_by.is_empty(),
306 "`ORDER BY` is not allowed in scalar/table function call"
307 );
308 reject_syntax!(
309 within_group.is_some(),
310 "`WITHIN GROUP` is not allowed in scalar/table function call"
311 );
312 reject_syntax!(
313 filter.is_some(),
314 "`FILTER` is not allowed in scalar/table function call"
315 );
316
317 {
319 if func_name.eq_ignore_ascii_case("file_scan") {
321 reject_syntax!(
322 arg_list.variadic,
323 "`VARIADIC` is not allowed in table function call"
324 );
325 self.ensure_table_function_allowed()?;
326 return Ok(TableFunction::new_file_scan(args)?.into());
327 }
328 if func_name.eq("postgres_query") {
330 reject_syntax!(
331 arg_list.variadic,
332 "`VARIADIC` is not allowed in table function call"
333 );
334 self.ensure_table_function_allowed()?;
335 return Ok(TableFunction::new_postgres_query(
336 &self.catalog,
337 &self.db_name,
338 self.bind_schema_path(schema_name.as_deref()),
339 args,
340 )
341 .context("postgres_query error")?
342 .into());
343 }
344 if func_name.eq("mysql_query") {
346 reject_syntax!(
347 arg_list.variadic,
348 "`VARIADIC` is not allowed in table function call"
349 );
350 self.ensure_table_function_allowed()?;
351 return Ok(TableFunction::new_mysql_query(
352 &self.catalog,
353 &self.db_name,
354 self.bind_schema_path(schema_name.as_deref()),
355 args,
356 )
357 .context("mysql_query error")?
358 .into());
359 }
360 if let Some(ref udf) = udf
362 && udf.kind.is_table()
363 {
364 reject_syntax!(
365 arg_list.variadic,
366 "`VARIADIC` is not allowed in table function call"
367 );
368 self.ensure_table_function_allowed()?;
369 if udf.language == "sql" {
370 return self.bind_sql_udf(udf.clone(), args);
371 }
372 return Ok(TableFunction::new_user_defined(udf.clone(), args).into());
373 }
374 if let Ok(function_type) = TableFunctionType::from_str(&func_name) {
376 reject_syntax!(
377 arg_list.variadic,
378 "`VARIADIC` is not allowed in table function call"
379 );
380 self.ensure_table_function_allowed()?;
381 return Ok(TableFunction::new(function_type, args)?.into());
382 }
383 }
384
385 if let Some(ref udf) = udf {
387 assert!(udf.kind.is_scalar());
388 reject_syntax!(
389 arg_list.variadic,
390 "`VARIADIC` is not allowed in user-defined function call"
391 );
392 if udf.language == "sql" {
393 return self.bind_sql_udf(udf.clone(), args);
394 }
395 return Ok(UserDefinedFunction::new(udf.clone(), args).into());
396 }
397
398 self.bind_builtin_scalar_function(&func_name, args, arg_list.variadic)
399 }
400
401 fn bind_array_transform(&mut self, args: Vec<FunctionArg>) -> Result<ExprImpl> {
402 let [array, lambda] = <[FunctionArg; 2]>::try_from(args).map_err(|args| -> RwError {
403 ErrorCode::BindError(format!(
404 "`array_transform` expect two inputs `array` and `lambda`, but {} were given",
405 args.len()
406 ))
407 .into()
408 })?;
409
410 let bound_array = self.bind_function_arg(array)?;
411 let [bound_array] = <[ExprImpl; 1]>::try_from(bound_array).map_err(|bound_array| -> RwError {
412 ErrorCode::BindError(format!("The `array` argument for `array_transform` should be bound to one argument, but {} were got", bound_array.len()))
413 .into()
414 })?;
415
416 let inner_ty = match bound_array.return_type() {
417 DataType::List(ty) => *ty,
418 real_type => return Err(ErrorCode::BindError(format!(
419 "The `array` argument for `array_transform` should be an array, but {} were got",
420 real_type
421 ))
422 .into()),
423 };
424
425 let ast::FunctionArgExpr::Expr(ast::Expr::LambdaFunction {
426 args: lambda_args,
427 body: lambda_body,
428 }) = lambda.get_expr()
429 else {
430 return Err(ErrorCode::BindError(
431 "The `lambda` argument for `array_transform` should be a lambda function"
432 .to_owned(),
433 )
434 .into());
435 };
436
437 let [lambda_arg] = <[Ident; 1]>::try_from(lambda_args).map_err(|args| -> RwError {
438 ErrorCode::BindError(format!(
439 "The `lambda` argument for `array_transform` should be a lambda function with one argument, but {} were given",
440 args.len()
441 ))
442 .into()
443 })?;
444
445 let bound_lambda = self.bind_unary_lambda_function(inner_ty, lambda_arg, *lambda_body)?;
446
447 let lambda_ret_type = bound_lambda.return_type();
448 let transform_ret_type = DataType::List(Box::new(lambda_ret_type));
449
450 Ok(ExprImpl::FunctionCallWithLambda(Box::new(
451 FunctionCallWithLambda::new_unchecked(
452 ExprType::ArrayTransform,
453 vec![bound_array],
454 bound_lambda,
455 transform_ret_type,
456 ),
457 )))
458 }
459
460 fn bind_unary_lambda_function(
461 &mut self,
462 input_ty: DataType,
463 arg: Ident,
464 body: ast::Expr,
465 ) -> Result<ExprImpl> {
466 let lambda_args = HashMap::from([(arg.real_value(), (0usize, input_ty))]);
467 let orig_lambda_args = self.context.lambda_args.replace(lambda_args);
468 let body = self.bind_expr_inner(body)?;
469 self.context.lambda_args = orig_lambda_args;
470
471 Ok(body)
472 }
473
474 fn ensure_table_function_allowed(&self) -> Result<()> {
475 if let Some(clause) = self.context.clause {
476 match clause {
477 Clause::JoinOn
478 | Clause::Where
479 | Clause::Having
480 | Clause::Filter
481 | Clause::Values
482 | Clause::Insert
483 | Clause::GeneratedColumn => {
484 return Err(ErrorCode::InvalidInputSyntax(format!(
485 "table functions are not allowed in {}",
486 clause
487 ))
488 .into());
489 }
490 Clause::GroupBy | Clause::From => {}
491 }
492 }
493 Ok(())
494 }
495
496 fn bind_sql_udf(
497 &mut self,
498 func: Arc<FunctionCatalog>,
499 args: Vec<ExprImpl>,
500 ) -> Result<ExprImpl> {
501 if func.body.is_none() {
502 return Err(
503 ErrorCode::InvalidInputSyntax("`body` must exist for sql udf".to_owned()).into(),
504 );
505 }
506
507 let parse_result =
509 risingwave_sqlparser::parser::Parser::parse_sql(func.body.as_ref().unwrap().as_str());
510 if let Err(ParserError::ParserError(err)) | Err(ParserError::TokenizerError(err)) =
511 parse_result
512 {
513 return Err(ErrorCode::InvalidInputSyntax(err).into());
515 }
516
517 debug_assert!(parse_result.is_ok());
518
519 let ast = parse_result.unwrap();
521
522 let stashed_udf_context = self.udf_context.get_context();
528
529 let mut udf_context = HashMap::new();
532 for (i, arg) in args.into_iter().enumerate() {
533 if func.arg_names[i].is_empty() {
534 udf_context.insert(format!("${}", i + 1), arg);
536 } else {
537 udf_context.insert(func.arg_names[i].clone(), arg);
539 }
540 }
541 self.udf_context.update_context(udf_context);
542
543 if self.udf_context.global_count() >= SQL_UDF_MAX_CALLING_DEPTH {
545 return Err(ErrorCode::BindError(format!(
546 "function {} calling stack depth limit exceeded",
547 func.name
548 ))
549 .into());
550 } else {
551 self.udf_context.incr_global_count();
553 }
554
555 if let Ok(expr) = UdfContext::extract_udf_expression(ast) {
556 let bind_result = self.bind_expr(expr);
557
558 self.udf_context.decr_global_count();
562
563 self.udf_context.update_context(stashed_udf_context);
565
566 return bind_result;
567 }
568
569 Err(ErrorCode::InvalidInputSyntax(
570 "failed to parse the input query and extract the udf expression,
571 please recheck the syntax"
572 .to_owned(),
573 )
574 .into())
575 }
576
577 pub(in crate::binder) fn bind_function_expr_arg(
578 &mut self,
579 arg_expr: FunctionArgExpr,
580 ) -> Result<Vec<ExprImpl>> {
581 match arg_expr {
582 FunctionArgExpr::Expr(expr) => Ok(vec![self.bind_expr_inner(expr)?]),
583 FunctionArgExpr::QualifiedWildcard(_, _)
584 | FunctionArgExpr::ExprQualifiedWildcard(_, _) => Err(ErrorCode::InvalidInputSyntax(
585 format!("unexpected wildcard {}", arg_expr),
586 )
587 .into()),
588 FunctionArgExpr::Wildcard(None) => Ok(vec![]),
589 FunctionArgExpr::Wildcard(Some(_)) => unreachable!(),
590 }
591 }
592
593 pub(in crate::binder) fn bind_function_arg(
594 &mut self,
595 arg: FunctionArg,
596 ) -> Result<Vec<ExprImpl>> {
597 match arg {
598 FunctionArg::Unnamed(expr) => self.bind_function_expr_arg(expr),
599 FunctionArg::Named { .. } => todo!(),
600 }
601 }
602}