1use std::collections::{HashMap, HashSet};
16use std::str::FromStr;
17use std::sync::Arc;
18
19use anyhow::Context;
20use itertools::Itertools;
21use risingwave_common::acl::AclMode;
22use risingwave_common::bail_not_implemented;
23use risingwave_common::catalog::INFORMATION_SCHEMA_SCHEMA_NAME;
24use risingwave_common::types::{DataType, MapType, StructType};
25use risingwave_common::util::iter_util::ZipEqFast;
26use risingwave_expr::aggregate::AggType;
27use risingwave_expr::window_function::WindowFuncKind;
28use risingwave_sqlparser::ast::{
29 self, Expr as AstExpr, Function, FunctionArg, FunctionArgExpr, FunctionArgList, Ident,
30 OrderByExpr, SecretRefAsType, Statement, Window,
31};
32use risingwave_sqlparser::parser::Parser;
33
34use crate::binder::Binder;
35use crate::binder::bind_context::Clause;
36use crate::catalog::OwnedByUserCatalog;
37use crate::catalog::function_catalog::FunctionCatalog;
38use crate::error::{ErrorCode, Result, RwError};
39use crate::expr::{
40 Expr, ExprImpl, ExprType, FunctionCall, FunctionCallWithLambda, InputRef, TableFunction,
41 TableFunctionType, UserDefinedFunction,
42};
43use crate::handler::privilege::ObjectCheckItem;
44
45mod aggregate;
46mod builtin_scalar;
47mod window;
48
49const SYS_FUNCTION_WITHOUT_ARGS: &[&str] = &[
51 "session_user",
52 "user",
53 "current_user",
54 "current_role",
55 "current_catalog",
56 "current_schema",
57 "current_timestamp",
58];
59
60pub(super) fn is_sys_function_without_args(ident: &Ident) -> bool {
61 SYS_FUNCTION_WITHOUT_ARGS
62 .iter()
63 .any(|e| ident.real_value().as_str() == *e && ident.quote_style().is_none())
64}
65
66macro_rules! reject_syntax {
67 ($pred:expr, $msg:expr) => {
68 if $pred {
69 return Err(ErrorCode::InvalidInputSyntax($msg.to_string()).into());
70 }
71 };
72
73 ($pred:expr, $fmt:expr, $($arg:tt)*) => {
74 if $pred {
75 return Err(ErrorCode::InvalidInputSyntax(
76 format!($fmt, $($arg)*)
77 ).into());
78 }
79 };
80}
81
82impl Binder {
83 pub(in crate::binder) fn bind_function(
84 &mut self,
85 Function {
86 scalar_as_agg,
87 name,
88 arg_list,
89 within_group,
90 filter,
91 over,
92 }: &Function,
93 ) -> Result<ExprImpl> {
94 let (schema_name, func_name) = match name.0.as_slice() {
95 [name] => (None, name.real_value()),
96 [schema, name] => {
97 let schema_name = schema.real_value();
98 let func_name = if schema_name == INFORMATION_SCHEMA_SCHEMA_NAME {
99 let function_name = name.real_value();
103 if function_name != "_pg_expandarray" {
104 bail_not_implemented!(
105 issue = 12422,
106 "Unsupported function name under schema: {}",
107 schema_name
108 );
109 }
110 function_name
111 } else {
112 name.real_value()
113 };
114 (Some(schema_name), func_name)
115 }
116 [database, schema, name] => {
117 let database_name = database.real_value();
119 if database_name != self.db_name {
120 return Err(ErrorCode::BindError(format!(
121 "Cross-database function call is not supported: {}",
122 name
123 ))
124 .into());
125 }
126 let schema_name = schema.real_value();
127 let func_name = name.real_value();
128 (Some(schema_name), func_name)
129 }
130 _ => bail_not_implemented!(issue = 112, "qualified function {}", name),
131 };
132
133 if func_name == "obj_description" || func_name == "col_description" {
140 return Ok(ExprImpl::literal_varchar("".to_owned()));
141 }
142
143 if func_name == "array_transform" || func_name == "map_filter" {
145 return self.validate_and_bind_special_function_params(
146 &func_name,
147 *scalar_as_agg,
148 arg_list,
149 within_group.as_deref(),
150 filter.as_deref(),
151 over.as_ref(),
152 );
153 }
154
155 let has_secret_ref_arg = arg_list.args.iter().any(|arg| {
158 matches!(
159 arg,
160 FunctionArg::Unnamed(FunctionArgExpr::SecretRef(_))
161 | FunctionArg::Named {
162 arg: FunctionArgExpr::SecretRef(_),
163 ..
164 }
165 )
166 });
167
168 if has_secret_ref_arg {
171 let has_udf_candidate = self
172 .catalog
173 .get_functions_by_name(
174 &self.db_name,
175 self.bind_schema_path(schema_name.as_deref()),
176 &func_name,
177 )
178 .map(|(funcs, _)| !funcs.is_empty())
179 .unwrap_or(false);
180 if !has_udf_candidate {
181 return Err(ErrorCode::InvalidInputSyntax(
182 "secret reference is only allowed in user-defined function arguments"
183 .to_owned(),
184 )
185 .into());
186 }
187 }
188
189 let bind_arg = if func_name.eq_ignore_ascii_case("jsonb_agg") {
190 Self::bind_jsonb_agg_arg
191 } else {
192 Self::bind_function_arg
193 };
194 let mut args: Vec<ExprImpl> = arg_list
195 .args
196 .iter()
197 .map(|arg| bind_arg(self, arg))
198 .flatten_ok()
199 .try_collect()?;
200
201 let mut referred_udfs = HashSet::new();
202 let mut is_udf_call = false;
203
204 let wrapped_agg_type = if *scalar_as_agg {
205 let mut array_args = args
208 .iter()
209 .enumerate()
210 .map(|(i, expr)| InputRef::new(i, DataType::list(expr.return_type())).into())
211 .collect_vec();
212 let schema_path = self.bind_schema_path(schema_name.as_deref());
213 let scalar_func_expr = if let Ok((func, _)) = self.catalog.get_function_by_name_inputs(
214 &self.db_name,
215 schema_path,
216 &func_name,
217 &mut array_args,
218 ) {
219 referred_udfs.insert(func.id);
221 is_udf_call = true;
222 self.check_privilege(
223 ObjectCheckItem::new(func.owner, AclMode::Execute, func.name.clone(), func.id),
224 self.database_id,
225 )?;
226
227 if !func.kind.is_scalar() {
228 return Err(ErrorCode::InvalidInputSyntax(
229 "expect a scalar function after `AGGREGATE:`".to_owned(),
230 )
231 .into());
232 }
233
234 if func.language == "sql" {
235 self.bind_sql_udf(func.clone(), array_args)?
236 } else {
237 UserDefinedFunction::new(func.clone(), array_args).into()
238 }
239 } else {
240 self.bind_builtin_scalar_function(&func_name, array_args, arg_list.variadic)?
241 };
242
243 let expr_node = match scalar_func_expr.try_to_expr_proto() {
246 Ok(expr_node) => expr_node,
247 Err(e) => {
248 return Err(ErrorCode::InvalidInputSyntax(format!(
249 "function {func_name} cannot be used after `AGGREGATE:`: {e}",
250 ))
251 .into());
252 }
253 };
254
255 Some(AggType::WrapScalar(expr_node))
257 } else {
258 None
259 };
260
261 let schema_path = self.bind_schema_path(schema_name.as_deref());
262 let udf = if wrapped_agg_type.is_none()
263 && let Ok((func, _)) = self.catalog.get_function_by_name_inputs(
264 &self.db_name,
265 schema_path,
266 &func_name,
267 &mut args,
268 ) {
269 referred_udfs.insert(func.id);
271 is_udf_call = true;
272 self.check_privilege(
273 ObjectCheckItem::new(func.owner, AclMode::Execute, func.name.clone(), func.id),
274 self.database_id,
275 )?;
276 Some(func.clone())
277 } else {
278 None
279 };
280
281 if has_secret_ref_arg && !is_udf_call {
282 return Err(ErrorCode::InvalidInputSyntax(
283 "secret reference is only allowed in user-defined function arguments".to_owned(),
284 )
285 .into());
286 }
287
288 self.included_udfs.extend(referred_udfs);
289
290 let agg_type = if wrapped_agg_type.is_some() {
291 wrapped_agg_type
292 } else if let Some(ref udf) = udf
293 && udf.kind.is_aggregate()
294 {
295 assert_ne!(udf.language, "sql", "SQL UDAF is not supported yet");
296 Some(AggType::UserDefined(udf.as_ref().into()))
297 } else {
298 AggType::from_str(&func_name).ok()
299 };
300
301 if let Some(over) = over {
303 reject_syntax!(
304 arg_list.distinct,
305 "`DISTINCT` is not allowed in window function call"
306 );
307 reject_syntax!(
308 arg_list.variadic,
309 "`VARIADIC` is not allowed in window function call"
310 );
311 reject_syntax!(
312 !arg_list.order_by.is_empty(),
313 "`ORDER BY` is not allowed in window function call argument list"
314 );
315 reject_syntax!(
316 within_group.is_some(),
317 "`WITHIN GROUP` is not allowed in window function call"
318 );
319
320 let kind = if let Some(agg_type) = agg_type {
321 WindowFuncKind::Aggregate(agg_type)
323 } else if let Ok(kind) = WindowFuncKind::from_str(&func_name) {
324 kind
325 } else {
326 bail_not_implemented!(issue = 8961, "Unrecognized window function: {}", func_name);
327 };
328 return self.bind_window_function(
329 kind,
330 args,
331 arg_list.ignore_nulls,
332 filter.as_deref(),
333 over,
334 );
335 }
336
337 reject_syntax!(
339 arg_list.ignore_nulls,
340 "`IGNORE NULLS` is not allowed in aggregate/scalar/table function call"
341 );
342
343 if let Some(agg_type) = agg_type {
345 reject_syntax!(
346 arg_list.variadic,
347 "`VARIADIC` is not allowed in aggregate function call"
348 );
349 return self.bind_aggregate_function(
350 agg_type,
351 arg_list.distinct,
352 args,
353 &arg_list.order_by,
354 within_group.as_deref(),
355 filter.as_deref(),
356 );
357 }
358
359 reject_syntax!(
361 arg_list.distinct,
362 "`DISTINCT` is not allowed in scalar/table function call"
363 );
364 reject_syntax!(
365 !arg_list.order_by.is_empty(),
366 "`ORDER BY` is not allowed in scalar/table function call"
367 );
368 reject_syntax!(
369 within_group.is_some(),
370 "`WITHIN GROUP` is not allowed in scalar/table function call"
371 );
372 reject_syntax!(
373 filter.is_some(),
374 "`FILTER` is not allowed in scalar/table function call"
375 );
376
377 {
379 if func_name.eq_ignore_ascii_case("file_scan") {
381 reject_syntax!(
382 arg_list.variadic,
383 "`VARIADIC` is not allowed in table function call"
384 );
385 self.ensure_table_function_allowed()?;
386 return Ok(TableFunction::new_file_scan(args)?.into());
387 }
388 if func_name.eq("postgres_query") {
390 reject_syntax!(
391 arg_list.variadic,
392 "`VARIADIC` is not allowed in table function call"
393 );
394 self.ensure_table_function_allowed()?;
395 return Ok(TableFunction::new_postgres_query(
396 &self.catalog,
397 &self.db_name,
398 self.bind_schema_path(schema_name.as_deref()),
399 args,
400 )
401 .context("postgres_query error")?
402 .into());
403 }
404 if func_name.eq("mysql_query") {
406 reject_syntax!(
407 arg_list.variadic,
408 "`VARIADIC` is not allowed in table function call"
409 );
410 self.ensure_table_function_allowed()?;
411 return Ok(TableFunction::new_mysql_query(
412 &self.catalog,
413 &self.db_name,
414 self.bind_schema_path(schema_name.as_deref()),
415 args,
416 )
417 .context("mysql_query error")?
418 .into());
419 }
420 if func_name.eq("internal_backfill_progress") {
422 reject_syntax!(
423 arg_list.variadic,
424 "`VARIADIC` is not allowed in table function call"
425 );
426 self.ensure_table_function_allowed()?;
427 return Ok(TableFunction::new_internal_backfill_progress().into());
428 }
429 if func_name.eq("internal_source_backfill_progress") {
431 reject_syntax!(
432 arg_list.variadic,
433 "`VARIADIC` is not allowed in table function call"
434 );
435 self.ensure_table_function_allowed()?;
436 return Ok(TableFunction::new_internal_source_backfill_progress().into());
437 }
438 if func_name.eq("internal_get_channel_delta_stats") {
440 reject_syntax!(
441 arg_list.variadic,
442 "`VARIADIC` is not allowed in table function call"
443 );
444 self.ensure_table_function_allowed()?;
445
446 return Ok(TableFunction::new_internal_get_channel_delta_stats(args).into());
447 }
448 if let Some(ref udf) = udf
450 && udf.kind.is_table()
451 {
452 reject_syntax!(
453 arg_list.variadic,
454 "`VARIADIC` is not allowed in table function call"
455 );
456 self.ensure_table_function_allowed()?;
457 if udf.language == "sql" {
458 return self.bind_sql_udf(udf.clone(), args);
459 }
460 return Ok(TableFunction::new_user_defined(udf.clone(), args).into());
461 }
462 if let Ok(function_type) = TableFunctionType::from_str(&func_name) {
464 reject_syntax!(
465 arg_list.variadic,
466 "`VARIADIC` is not allowed in table function call"
467 );
468 self.ensure_table_function_allowed()?;
469 return Ok(TableFunction::new(function_type, args)?.into());
470 }
471 }
472
473 if let Some(ref udf) = udf {
475 assert!(udf.kind.is_scalar());
476 reject_syntax!(
477 arg_list.variadic,
478 "`VARIADIC` is not allowed in user-defined function call"
479 );
480 if udf.language == "sql" {
481 return self.bind_sql_udf(udf.clone(), args);
482 }
483 return Ok(UserDefinedFunction::new(udf.clone(), args).into());
484 }
485
486 self.bind_builtin_scalar_function(&func_name, args, arg_list.variadic)
487 }
488
489 fn validate_and_bind_special_function_params(
490 &mut self,
491 func_name: &str,
492 scalar_as_agg: bool,
493 arg_list: &FunctionArgList,
494 within_group: Option<&OrderByExpr>,
495 filter: Option<&risingwave_sqlparser::ast::Expr>,
496 over: Option<&Window>,
497 ) -> Result<ExprImpl> {
498 assert!(["array_transform", "map_filter"].contains(&func_name));
499
500 reject_syntax!(
501 scalar_as_agg,
502 "`AGGREGATE:` prefix is not allowed for `{}`",
503 func_name
504 );
505 reject_syntax!(
506 !arg_list.is_args_only(),
507 "keywords like `DISTINCT`, `ORDER BY` are not allowed in `{}` argument list",
508 func_name
509 );
510 reject_syntax!(
511 within_group.is_some(),
512 "`WITHIN GROUP` is not allowed in `{}` call",
513 func_name
514 );
515 reject_syntax!(
516 filter.is_some(),
517 "`FILTER` is not allowed in `{}` call",
518 func_name
519 );
520 reject_syntax!(
521 over.is_some(),
522 "`OVER` is not allowed in `{}` call",
523 func_name
524 );
525 if func_name == "array_transform" {
526 self.bind_array_transform(&arg_list.args)
527 } else {
528 self.bind_map_filter(&arg_list.args)
529 }
530 }
531
532 fn bind_array_transform(&mut self, args: &[FunctionArg]) -> Result<ExprImpl> {
533 let [array, lambda] = args else {
534 return Err(ErrorCode::BindError(format!(
535 "`array_transform` expect two inputs `array` and `lambda`, but {} were given",
536 args.len()
537 ))
538 .into());
539 };
540
541 let bound_array = self.bind_function_arg(array)?;
542 let [bound_array] = <[ExprImpl; 1]>::try_from(bound_array).map_err(|bound_array| -> RwError {
543 ErrorCode::BindError(format!("The `array` argument for `array_transform` should be bound to one argument, but {} were got", bound_array.len()))
544 .into()
545 })?;
546
547 let inner_ty = match bound_array.return_type() {
548 DataType::List(ty) => ty.into_elem(),
549 real_type => return Err(ErrorCode::BindError(format!(
550 "The `array` argument for `array_transform` should be an array, but {} were got",
551 real_type
552 ))
553 .into()),
554 };
555
556 let ast::FunctionArgExpr::Expr(ast::Expr::LambdaFunction {
557 args: lambda_args,
558 body: lambda_body,
559 }) = lambda.get_expr()
560 else {
561 return Err(ErrorCode::BindError(
562 "The `lambda` argument for `array_transform` should be a lambda function"
563 .to_owned(),
564 )
565 .into());
566 };
567
568 let [lambda_arg] = <[Ident; 1]>::try_from(lambda_args).map_err(|args| -> RwError {
569 ErrorCode::BindError(format!(
570 "The `lambda` argument for `array_transform` should be a lambda function with one argument, but {} were given",
571 args.len()
572 ))
573 .into()
574 })?;
575
576 let bound_lambda = self.bind_unary_lambda_function(inner_ty, lambda_arg, *lambda_body)?;
577
578 let lambda_ret_type = bound_lambda.return_type();
579 let transform_ret_type = DataType::list(lambda_ret_type);
580
581 Ok(ExprImpl::FunctionCallWithLambda(Box::new(
582 FunctionCallWithLambda::new_unchecked(
583 ExprType::ArrayTransform,
584 vec![bound_array],
585 bound_lambda,
586 transform_ret_type,
587 ),
588 )))
589 }
590
591 fn bind_unary_lambda_function(
592 &mut self,
593 input_ty: DataType,
594 arg: Ident,
595 body: ast::Expr,
596 ) -> Result<ExprImpl> {
597 let lambda_args = HashMap::from([(arg.real_value(), (0usize, input_ty))]);
598 let orig_lambda_args = self.context.lambda_args.replace(lambda_args);
599 let body = self.bind_expr_inner(&body)?;
600 self.context.lambda_args = orig_lambda_args;
601
602 Ok(body)
603 }
604
605 fn bind_map_filter(&mut self, args: &[FunctionArg]) -> Result<ExprImpl> {
606 let [input, lambda] = args else {
607 return Err(ErrorCode::BindError(format!(
608 "`map_filter` requires two arguments (input_map and lambda), got {}",
609 args.len()
610 ))
611 .into());
612 };
613
614 let bound_input = self.bind_function_arg(input)?;
615 let [bound_input] = <[ExprImpl; 1]>::try_from(bound_input).map_err(|e| {
616 ErrorCode::BindError(format!(
617 "Input argument should resolve to single expression, got {}",
618 e.len()
619 ))
620 })?;
621
622 let (key_type, value_type) = match bound_input.return_type() {
623 DataType::Map(map_type) => (map_type.key().clone(), map_type.value().clone()),
624 t => {
625 return Err(
626 ErrorCode::BindError(format!("Input must be Map type, got {}", t)).into(),
627 );
628 }
629 };
630
631 let ast::FunctionArgExpr::Expr(ast::Expr::LambdaFunction {
632 args: lambda_args,
633 body: lambda_body,
634 }) = lambda.get_expr()
635 else {
636 return Err(ErrorCode::BindError(
637 "Second argument must be a lambda function".to_owned(),
638 )
639 .into());
640 };
641
642 let [key_arg, value_arg] = <[Ident; 2]>::try_from(lambda_args).map_err(|args| {
643 ErrorCode::BindError(format!(
644 "Lambda must have exactly two parameters (key, value), got {}",
645 args.len()
646 ))
647 })?;
648
649 let bound_lambda = self.bind_binary_lambda_function(
650 key_arg,
651 key_type.clone(),
652 value_arg,
653 value_type.clone(),
654 *lambda_body,
655 )?;
656
657 let lambda_ret_type = bound_lambda.return_type();
658 if lambda_ret_type != DataType::Boolean {
659 return Err(ErrorCode::BindError(format!(
660 "Lambda must return Boolean type, got {}",
661 lambda_ret_type
662 ))
663 .into());
664 }
665
666 let map_type = MapType::from_kv(key_type, value_type);
667 let return_type = DataType::Map(map_type);
668
669 Ok(ExprImpl::FunctionCallWithLambda(Box::new(
670 FunctionCallWithLambda::new_unchecked(
671 ExprType::MapFilter,
672 vec![bound_input],
673 bound_lambda,
674 return_type,
675 ),
676 )))
677 }
678
679 fn bind_binary_lambda_function(
680 &mut self,
681 first_arg: Ident,
682 first_ty: DataType,
683 second_arg: Ident,
684 second_ty: DataType,
685 body: ast::Expr,
686 ) -> Result<ExprImpl> {
687 let lambda_args = HashMap::from([
688 (first_arg.real_value(), (0usize, first_ty)),
689 (second_arg.real_value(), (1usize, second_ty)),
690 ]);
691
692 let orig_ctx = self.context.lambda_args.replace(lambda_args);
693 let bound_body = self.bind_expr_inner(&body)?;
694 self.context.lambda_args = orig_ctx;
695
696 Ok(bound_body)
697 }
698
699 fn ensure_table_function_allowed(&self) -> Result<()> {
700 if let Some(clause) = self.context.clause {
701 match clause {
702 Clause::JoinOn
703 | Clause::Where
704 | Clause::Having
705 | Clause::Filter
706 | Clause::Values
707 | Clause::Insert
708 | Clause::GeneratedColumn => {
709 return Err(ErrorCode::InvalidInputSyntax(format!(
710 "table functions are not allowed in {}",
711 clause
712 ))
713 .into());
714 }
715 Clause::GroupBy | Clause::From => {}
716 }
717 }
718 Ok(())
719 }
720
721 pub(crate) fn extract_udf_expr(ast: Vec<Statement>) -> Result<AstExpr> {
723 if ast.len() != 1 {
724 return Err(ErrorCode::InvalidInputSyntax(
725 "the query for sql udf should contain only one statement".to_owned(),
726 )
727 .into());
728 }
729
730 let Statement::Query(query) = ast.into_iter().next().unwrap() else {
732 return Err(ErrorCode::InvalidInputSyntax(
733 "invalid function definition, please recheck the syntax".to_owned(),
734 )
735 .into());
736 };
737
738 if let Some(expr) = query.as_single_select_item() {
739 Ok(expr.clone())
741 } else {
742 Ok(AstExpr::Subquery(query))
744 }
745 }
746
747 pub fn bind_sql_udf_inner(
748 &mut self,
749 body: &str,
750 arg_names: &[String],
751 args: Vec<ExprImpl>,
752 ) -> Result<ExprImpl> {
753 let ast = Parser::parse_sql(body)?;
755
756 let stashed_arguments = self.context.sql_udf_arguments.take();
760
761 let mut arguments = HashMap::new();
763 for (i, arg) in args.into_iter().enumerate() {
764 if arg_names[i].is_empty() {
765 arguments.insert(format!("${}", i + 1), arg);
767 } else {
768 arguments.insert(arg_names[i].clone(), arg);
770 }
771 }
772 self.context.sql_udf_arguments = Some(arguments);
773
774 let Ok(expr) = Self::extract_udf_expr(ast) else {
775 return Err(ErrorCode::InvalidInputSyntax(
776 "failed to parse the input query and extract the udf expression, \
777 please recheck the syntax"
778 .to_owned(),
779 )
780 .into());
781 };
782
783 let bind_result = self.bind_expr(&expr);
784 self.context.sql_udf_arguments = stashed_arguments;
786
787 bind_result
788 }
789
790 fn bind_sql_udf(
791 &mut self,
792 func: Arc<FunctionCatalog>,
793 args: Vec<ExprImpl>,
794 ) -> Result<ExprImpl> {
795 let Some(body) = &func.body else {
796 return Err(
797 ErrorCode::InvalidInputSyntax("`body` must exist for sql udf".to_owned()).into(),
798 );
799 };
800
801 self.bind_sql_udf_inner(body, &func.arg_names, args)
802 }
803
804 pub(in crate::binder) fn bind_function_expr_arg(
805 &mut self,
806 arg_expr: &FunctionArgExpr,
807 ) -> Result<Vec<ExprImpl>> {
808 match arg_expr {
809 FunctionArgExpr::Expr(expr) => Ok(vec![self.bind_expr_inner(expr)?]),
810 FunctionArgExpr::QualifiedWildcard(_, _)
811 | FunctionArgExpr::ExprQualifiedWildcard(_, _) => Err(ErrorCode::InvalidInputSyntax(
812 format!("unexpected wildcard {}", arg_expr),
813 )
814 .into()),
815 FunctionArgExpr::Wildcard(None) => Ok(vec![]),
816 FunctionArgExpr::Wildcard(Some(_)) => unreachable!(),
817 FunctionArgExpr::SecretRef(secret_ref_value) => {
818 let (schema_name, secret_name) = Binder::resolve_schema_qualified_name(
819 &self.db_name,
820 &secret_ref_value.secret_name,
821 )?;
822 let schema_path = self.bind_schema_path(schema_name.as_deref());
823 let (secret_catalog, _) =
824 self.catalog
825 .get_secret_by_name(&self.db_name, schema_path, &secret_name)?;
826
827 self.check_privilege(
828 ObjectCheckItem::new(
829 secret_catalog.owner(),
830 AclMode::Usage,
831 secret_catalog.name.clone(),
832 secret_catalog.id,
833 ),
834 self.database_id,
835 )?;
836
837 self.included_secrets.insert(secret_catalog.id);
838
839 let ref_as = match secret_ref_value.ref_as {
840 SecretRefAsType::Text => risingwave_pb::secret::secret_ref::RefAsType::Text,
841 SecretRefAsType::File => risingwave_pb::secret::secret_ref::RefAsType::File,
842 };
843
844 Ok(vec![
845 crate::expr::SecretRef {
846 secret_id: secret_catalog.id,
847 ref_as,
848 secret_name: secret_catalog.name.clone(),
849 }
850 .into(),
851 ])
852 }
853 }
854 }
855
856 pub(in crate::binder) fn bind_function_arg(
857 &mut self,
858 arg: &FunctionArg,
859 ) -> Result<Vec<ExprImpl>> {
860 match arg {
861 FunctionArg::Unnamed(expr) => self.bind_function_expr_arg(expr),
862 FunctionArg::Named { .. } => Err(ErrorCode::InvalidInputSyntax(
863 "named function arguments are not supported yet".to_owned(),
864 )
865 .into()),
866 }
867 }
868
869 fn bind_jsonb_agg_arg(&mut self, arg: &FunctionArg) -> Result<Vec<ExprImpl>> {
870 match arg {
871 FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(prefix, except)) => {
872 let relation_name = prefix.to_string();
873 let (schema_name, table_name) =
874 Binder::resolve_schema_qualified_name(&self.db_name, prefix)?;
875 let except_indices = self.generate_except_indices(except.as_deref())?;
876 let (begin, end) = self
877 .context
878 .range_of
879 .get(&(schema_name, table_name))
880 .ok_or_else(|| {
881 ErrorCode::ItemNotFound(format!("relation \"{}\"", relation_name))
882 })?;
883 let (exprs, names) = Self::iter_bound_columns(
884 self.context.columns[*begin..*end]
885 .iter()
886 .filter(|c| !c.is_hidden && !except_indices.contains(&c.index)),
887 );
888 self.wrap_wildcard_exprs_as_named_row(exprs, names)
889 }
890 FunctionArg::Unnamed(FunctionArgExpr::ExprQualifiedWildcard(expr, prefix)) => {
891 let (exprs, names) = self.bind_wildcard_field_column(expr, prefix)?;
892 self.wrap_wildcard_exprs_as_named_row(exprs, names)
893 }
894 _ => self.bind_function_arg(arg),
895 }
896 }
897
898 fn wrap_wildcard_exprs_as_named_row(
899 &self,
900 exprs: Vec<ExprImpl>,
901 names: Vec<Option<String>>,
902 ) -> Result<Vec<ExprImpl>> {
903 let return_type =
904 DataType::Struct(StructType::new(
905 names.into_iter().zip_eq_fast(exprs.iter()).enumerate().map(
906 |(idx, (name, expr))| {
907 (
908 name.unwrap_or_else(|| format!("f{}", idx + 1)),
909 expr.return_type(),
910 )
911 },
912 ),
913 ));
914 Ok(vec![
915 FunctionCall::new_unchecked(ExprType::Row, exprs, return_type).into(),
916 ])
917 }
918}