risingwave_frontend/binder/expr/function/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::str::FromStr;
17use std::sync::Arc;
18
19use anyhow::Context;
20use itertools::Itertools;
21use risingwave_common::acl::AclMode;
22use risingwave_common::bail_not_implemented;
23use risingwave_common::catalog::INFORMATION_SCHEMA_SCHEMA_NAME;
24use risingwave_common::types::{DataType, MapType};
25use risingwave_expr::aggregate::AggType;
26use risingwave_expr::window_function::WindowFuncKind;
27use risingwave_pb::user::grant_privilege::PbObject;
28use risingwave_sqlparser::ast::{
29    self, Expr as AstExpr, Function, FunctionArg, FunctionArgExpr, FunctionArgList, Ident,
30    OrderByExpr, Statement, Window,
31};
32use risingwave_sqlparser::parser::Parser;
33
34use crate::binder::Binder;
35use crate::binder::bind_context::Clause;
36use crate::catalog::function_catalog::FunctionCatalog;
37use crate::error::{ErrorCode, Result, RwError};
38use crate::expr::{
39    Expr, ExprImpl, ExprType, FunctionCallWithLambda, InputRef, TableFunction, TableFunctionType,
40    UserDefinedFunction,
41};
42use crate::handler::privilege::ObjectCheckItem;
43
44mod aggregate;
45mod builtin_scalar;
46mod window;
47
48// Defines system functions that without args, ref: https://www.postgresql.org/docs/current/functions-info.html
49const SYS_FUNCTION_WITHOUT_ARGS: &[&str] = &[
50    "session_user",
51    "user",
52    "current_user",
53    "current_role",
54    "current_catalog",
55    "current_schema",
56    "current_timestamp",
57];
58
59pub(super) fn is_sys_function_without_args(ident: &Ident) -> bool {
60    SYS_FUNCTION_WITHOUT_ARGS
61        .iter()
62        .any(|e| ident.real_value().as_str() == *e && ident.quote_style().is_none())
63}
64
65macro_rules! reject_syntax {
66    ($pred:expr, $msg:expr) => {
67        if $pred {
68            return Err(ErrorCode::InvalidInputSyntax($msg.to_string()).into());
69        }
70    };
71
72    ($pred:expr, $fmt:expr, $($arg:tt)*) => {
73        if $pred {
74            return Err(ErrorCode::InvalidInputSyntax(
75                format!($fmt, $($arg)*)
76            ).into());
77        }
78    };
79}
80
81impl Binder {
82    pub(in crate::binder) fn bind_function(
83        &mut self,
84        Function {
85            scalar_as_agg,
86            name,
87            arg_list,
88            within_group,
89            filter,
90            over,
91        }: &Function,
92    ) -> Result<ExprImpl> {
93        let (schema_name, func_name) = match name.0.as_slice() {
94            [name] => (None, name.real_value()),
95            [schema, name] => {
96                let schema_name = schema.real_value();
97                let func_name = if schema_name == INFORMATION_SCHEMA_SCHEMA_NAME {
98                    // definition of information_schema: https://github.com/postgres/postgres/blob/e0b2eed047df9045664da6f724cb42c10f8b12f0/src/backend/catalog/information_schema.sql
99                    //
100                    // FIXME: handle schema correctly, so that the functions are hidden if the schema is not in the search path.
101                    let function_name = name.real_value();
102                    if function_name != "_pg_expandarray" {
103                        bail_not_implemented!(
104                            issue = 12422,
105                            "Unsupported function name under schema: {}",
106                            schema_name
107                        );
108                    }
109                    function_name
110                } else {
111                    name.real_value()
112                };
113                (Some(schema_name), func_name)
114            }
115            _ => bail_not_implemented!(issue = 112, "qualified function {}", name),
116        };
117
118        // FIXME: This is a hack to support [Bytebase queries](https://github.com/TennyZhuang/bytebase/blob/4a26f7c62b80e86e58ad2f77063138dc2f420623/backend/plugin/db/pg/sync.go#L549).
119        // Bytebase widely used the pattern like `obj_description(format('%s.%s',
120        // quote_ident(idx.schemaname), quote_ident(idx.indexname))::regclass) AS comment` to
121        // retrieve object comment, however we don't support casting a non-literal expression to
122        // regclass. We just hack the `obj_description` and `col_description` here, to disable it to
123        // bind its arguments.
124        if func_name == "obj_description" || func_name == "col_description" {
125            return Ok(ExprImpl::literal_varchar("".to_owned()));
126        }
127
128        // special binding logic for `array_transform` and `map_filter`
129        if func_name == "array_transform" || func_name == "map_filter" {
130            return self.validate_and_bind_special_function_params(
131                &func_name,
132                *scalar_as_agg,
133                arg_list,
134                within_group.as_deref(),
135                filter.as_deref(),
136                over.as_ref(),
137            );
138        }
139
140        let mut args: Vec<_> = arg_list
141            .args
142            .iter()
143            .map(|arg| self.bind_function_arg(arg))
144            .flatten_ok()
145            .try_collect()?;
146
147        let mut referred_udfs = HashSet::new();
148
149        let wrapped_agg_type = if *scalar_as_agg {
150            // Let's firstly try to apply the `AGGREGATE:` prefix.
151            // We will reject functions that are not able to be wrapped as aggregate function.
152            let mut array_args = args
153                .iter()
154                .enumerate()
155                .map(|(i, expr)| {
156                    InputRef::new(i, DataType::List(Box::new(expr.return_type()))).into()
157                })
158                .collect_vec();
159            let schema_path = self.bind_schema_path(schema_name.as_deref());
160            let scalar_func_expr = if let Ok((func, _)) = self.catalog.get_function_by_name_inputs(
161                &self.db_name,
162                schema_path,
163                &func_name,
164                &mut array_args,
165            ) {
166                // record the dependency upon the UDF
167                referred_udfs.insert(func.id);
168                self.check_privilege(
169                    ObjectCheckItem::new(
170                        func.owner,
171                        AclMode::Execute,
172                        func.name.clone(),
173                        PbObject::FunctionId(func.id.function_id()),
174                    ),
175                    self.database_id,
176                )?;
177
178                if !func.kind.is_scalar() {
179                    return Err(ErrorCode::InvalidInputSyntax(
180                        "expect a scalar function after `AGGREGATE:`".to_owned(),
181                    )
182                    .into());
183                }
184
185                if func.language == "sql" {
186                    self.bind_sql_udf(func.clone(), array_args)?
187                } else {
188                    UserDefinedFunction::new(func.clone(), array_args).into()
189                }
190            } else {
191                self.bind_builtin_scalar_function(&func_name, array_args, arg_list.variadic)?
192            };
193
194            // `AggType::WrapScalar` requires the inner expression to be convertible to `ExprNode`
195            // and directly executable. If there's any subquery in the expression, this will fail.
196            let expr_node = match scalar_func_expr.try_to_expr_proto() {
197                Ok(expr_node) => expr_node,
198                Err(e) => {
199                    return Err(ErrorCode::InvalidInputSyntax(format!(
200                        "function {func_name} cannot be used after `AGGREGATE:`: {e}",
201                    ))
202                    .into());
203                }
204            };
205
206            // now this is either an aggregate/window function call
207            Some(AggType::WrapScalar(expr_node))
208        } else {
209            None
210        };
211
212        let schema_path = self.bind_schema_path(schema_name.as_deref());
213        let udf = if wrapped_agg_type.is_none()
214            && let Ok((func, _)) = self.catalog.get_function_by_name_inputs(
215                &self.db_name,
216                schema_path,
217                &func_name,
218                &mut args,
219            ) {
220            // record the dependency upon the UDF
221            referred_udfs.insert(func.id);
222            self.check_privilege(
223                ObjectCheckItem::new(
224                    func.owner,
225                    AclMode::Execute,
226                    func.name.clone(),
227                    PbObject::FunctionId(func.id.function_id()),
228                ),
229                self.database_id,
230            )?;
231            Some(func.clone())
232        } else {
233            None
234        };
235
236        self.included_udfs.extend(referred_udfs);
237
238        let agg_type = if wrapped_agg_type.is_some() {
239            wrapped_agg_type
240        } else if let Some(ref udf) = udf
241            && udf.kind.is_aggregate()
242        {
243            assert_ne!(udf.language, "sql", "SQL UDAF is not supported yet");
244            Some(AggType::UserDefined(udf.as_ref().into()))
245        } else {
246            AggType::from_str(&func_name).ok()
247        };
248
249        // try to bind it as a window function call
250        if let Some(over) = over {
251            reject_syntax!(
252                arg_list.distinct,
253                "`DISTINCT` is not allowed in window function call"
254            );
255            reject_syntax!(
256                arg_list.variadic,
257                "`VARIADIC` is not allowed in window function call"
258            );
259            reject_syntax!(
260                !arg_list.order_by.is_empty(),
261                "`ORDER BY` is not allowed in window function call argument list"
262            );
263            reject_syntax!(
264                within_group.is_some(),
265                "`WITHIN GROUP` is not allowed in window function call"
266            );
267
268            let kind = if let Some(agg_type) = agg_type {
269                // aggregate as window function
270                WindowFuncKind::Aggregate(agg_type)
271            } else if let Ok(kind) = WindowFuncKind::from_str(&func_name) {
272                kind
273            } else {
274                bail_not_implemented!(issue = 8961, "Unrecognized window function: {}", func_name);
275            };
276            return self.bind_window_function(
277                kind,
278                args,
279                arg_list.ignore_nulls,
280                filter.as_deref(),
281                over,
282            );
283        }
284
285        // now it's an aggregate/scalar/table function call
286        reject_syntax!(
287            arg_list.ignore_nulls,
288            "`IGNORE NULLS` is not allowed in aggregate/scalar/table function call"
289        );
290
291        // try to bind it as an aggregate function call
292        if let Some(agg_type) = agg_type {
293            reject_syntax!(
294                arg_list.variadic,
295                "`VARIADIC` is not allowed in aggregate function call"
296            );
297            return self.bind_aggregate_function(
298                agg_type,
299                arg_list.distinct,
300                args,
301                &arg_list.order_by,
302                within_group.as_deref(),
303                filter.as_deref(),
304            );
305        }
306
307        // now it's a scalar/table function call
308        reject_syntax!(
309            arg_list.distinct,
310            "`DISTINCT` is not allowed in scalar/table function call"
311        );
312        reject_syntax!(
313            !arg_list.order_by.is_empty(),
314            "`ORDER BY` is not allowed in scalar/table function call"
315        );
316        reject_syntax!(
317            within_group.is_some(),
318            "`WITHIN GROUP` is not allowed in scalar/table function call"
319        );
320        reject_syntax!(
321            filter.is_some(),
322            "`FILTER` is not allowed in scalar/table function call"
323        );
324
325        // try to bind it as a table function call
326        {
327            // `file_scan` table function
328            if func_name.eq_ignore_ascii_case("file_scan") {
329                reject_syntax!(
330                    arg_list.variadic,
331                    "`VARIADIC` is not allowed in table function call"
332                );
333                self.ensure_table_function_allowed()?;
334                return Ok(TableFunction::new_file_scan(args)?.into());
335            }
336            // `postgres_query` table function
337            if func_name.eq("postgres_query") {
338                reject_syntax!(
339                    arg_list.variadic,
340                    "`VARIADIC` is not allowed in table function call"
341                );
342                self.ensure_table_function_allowed()?;
343                return Ok(TableFunction::new_postgres_query(
344                    &self.catalog,
345                    &self.db_name,
346                    self.bind_schema_path(schema_name.as_deref()),
347                    args,
348                )
349                .context("postgres_query error")?
350                .into());
351            }
352            // `mysql_query` table function
353            if func_name.eq("mysql_query") {
354                reject_syntax!(
355                    arg_list.variadic,
356                    "`VARIADIC` is not allowed in table function call"
357                );
358                self.ensure_table_function_allowed()?;
359                return Ok(TableFunction::new_mysql_query(
360                    &self.catalog,
361                    &self.db_name,
362                    self.bind_schema_path(schema_name.as_deref()),
363                    args,
364                )
365                .context("mysql_query error")?
366                .into());
367            }
368            // `internal_backfill_progress` table function
369            if func_name.eq("internal_backfill_progress") {
370                reject_syntax!(
371                    arg_list.variadic,
372                    "`VARIADIC` is not allowed in table function call"
373                );
374                self.ensure_table_function_allowed()?;
375                return Ok(TableFunction::new_internal_backfill_progress().into());
376            }
377            // `internal_source_backfill_progress` table function
378            if func_name.eq("internal_source_backfill_progress") {
379                reject_syntax!(
380                    arg_list.variadic,
381                    "`VARIADIC` is not allowed in table function call"
382                );
383                self.ensure_table_function_allowed()?;
384                return Ok(TableFunction::new_internal_source_backfill_progress().into());
385            }
386            // UDTF
387            if let Some(ref udf) = udf
388                && udf.kind.is_table()
389            {
390                reject_syntax!(
391                    arg_list.variadic,
392                    "`VARIADIC` is not allowed in table function call"
393                );
394                self.ensure_table_function_allowed()?;
395                if udf.language == "sql" {
396                    return self.bind_sql_udf(udf.clone(), args);
397                }
398                return Ok(TableFunction::new_user_defined(udf.clone(), args).into());
399            }
400            // builtin table function
401            if let Ok(function_type) = TableFunctionType::from_str(&func_name) {
402                reject_syntax!(
403                    arg_list.variadic,
404                    "`VARIADIC` is not allowed in table function call"
405                );
406                self.ensure_table_function_allowed()?;
407                return Ok(TableFunction::new(function_type, args)?.into());
408            }
409        }
410
411        // try to bind it as a scalar function call
412        if let Some(ref udf) = udf {
413            assert!(udf.kind.is_scalar());
414            reject_syntax!(
415                arg_list.variadic,
416                "`VARIADIC` is not allowed in user-defined function call"
417            );
418            if udf.language == "sql" {
419                return self.bind_sql_udf(udf.clone(), args);
420            }
421            return Ok(UserDefinedFunction::new(udf.clone(), args).into());
422        }
423
424        self.bind_builtin_scalar_function(&func_name, args, arg_list.variadic)
425    }
426
427    fn validate_and_bind_special_function_params(
428        &mut self,
429        func_name: &str,
430        scalar_as_agg: bool,
431        arg_list: &FunctionArgList,
432        within_group: Option<&OrderByExpr>,
433        filter: Option<&risingwave_sqlparser::ast::Expr>,
434        over: Option<&Window>,
435    ) -> Result<ExprImpl> {
436        assert!(["array_transform", "map_filter"].contains(&func_name));
437
438        reject_syntax!(
439            scalar_as_agg,
440            "`AGGREGATE:` prefix is not allowed for `{}`",
441            func_name
442        );
443        reject_syntax!(
444            !arg_list.is_args_only(),
445            "keywords like `DISTINCT`, `ORDER BY` are not allowed in `{}` argument list",
446            func_name
447        );
448        reject_syntax!(
449            within_group.is_some(),
450            "`WITHIN GROUP` is not allowed in `{}` call",
451            func_name
452        );
453        reject_syntax!(
454            filter.is_some(),
455            "`FILTER` is not allowed in `{}` call",
456            func_name
457        );
458        reject_syntax!(
459            over.is_some(),
460            "`OVER` is not allowed in `{}` call",
461            func_name
462        );
463        if func_name == "array_transform" {
464            self.bind_array_transform(&arg_list.args)
465        } else {
466            self.bind_map_filter(&arg_list.args)
467        }
468    }
469
470    fn bind_array_transform(&mut self, args: &[FunctionArg]) -> Result<ExprImpl> {
471        let [array, lambda] = args else {
472            return Err(ErrorCode::BindError(format!(
473                "`array_transform` expect two inputs `array` and `lambda`, but {} were given",
474                args.len()
475            ))
476            .into());
477        };
478
479        let bound_array = self.bind_function_arg(array)?;
480        let [bound_array] = <[ExprImpl; 1]>::try_from(bound_array).map_err(|bound_array| -> RwError {
481            ErrorCode::BindError(format!("The `array` argument for `array_transform` should be bound to one argument, but {} were got", bound_array.len()))
482                .into()
483        })?;
484
485        let inner_ty = match bound_array.return_type() {
486            DataType::List(ty) => *ty,
487            real_type => return Err(ErrorCode::BindError(format!(
488                "The `array` argument for `array_transform` should be an array, but {} were got",
489                real_type
490            ))
491            .into()),
492        };
493
494        let ast::FunctionArgExpr::Expr(ast::Expr::LambdaFunction {
495            args: lambda_args,
496            body: lambda_body,
497        }) = lambda.get_expr()
498        else {
499            return Err(ErrorCode::BindError(
500                "The `lambda` argument for `array_transform` should be a lambda function"
501                    .to_owned(),
502            )
503            .into());
504        };
505
506        let [lambda_arg] = <[Ident; 1]>::try_from(lambda_args).map_err(|args| -> RwError {
507            ErrorCode::BindError(format!(
508                "The `lambda` argument for `array_transform` should be a lambda function with one argument, but {} were given",
509                args.len()
510            ))
511            .into()
512        })?;
513
514        let bound_lambda = self.bind_unary_lambda_function(inner_ty, lambda_arg, *lambda_body)?;
515
516        let lambda_ret_type = bound_lambda.return_type();
517        let transform_ret_type = DataType::List(Box::new(lambda_ret_type));
518
519        Ok(ExprImpl::FunctionCallWithLambda(Box::new(
520            FunctionCallWithLambda::new_unchecked(
521                ExprType::ArrayTransform,
522                vec![bound_array],
523                bound_lambda,
524                transform_ret_type,
525            ),
526        )))
527    }
528
529    fn bind_unary_lambda_function(
530        &mut self,
531        input_ty: DataType,
532        arg: Ident,
533        body: ast::Expr,
534    ) -> Result<ExprImpl> {
535        let lambda_args = HashMap::from([(arg.real_value(), (0usize, input_ty))]);
536        let orig_lambda_args = self.context.lambda_args.replace(lambda_args);
537        let body = self.bind_expr_inner(&body)?;
538        self.context.lambda_args = orig_lambda_args;
539
540        Ok(body)
541    }
542
543    fn bind_map_filter(&mut self, args: &[FunctionArg]) -> Result<ExprImpl> {
544        let [input, lambda] = args else {
545            return Err(ErrorCode::BindError(format!(
546                "`map_filter` requires two arguments (input_map and lambda), got {}",
547                args.len()
548            ))
549            .into());
550        };
551
552        let bound_input = self.bind_function_arg(input)?;
553        let [bound_input] = <[ExprImpl; 1]>::try_from(bound_input).map_err(|e| {
554            ErrorCode::BindError(format!(
555                "Input argument should resolve to single expression, got {}",
556                e.len()
557            ))
558        })?;
559
560        let (key_type, value_type) = match bound_input.return_type() {
561            DataType::Map(map_type) => (map_type.key().clone(), map_type.value().clone()),
562            t => {
563                return Err(
564                    ErrorCode::BindError(format!("Input must be Map type, got {}", t)).into(),
565                );
566            }
567        };
568
569        let ast::FunctionArgExpr::Expr(ast::Expr::LambdaFunction {
570            args: lambda_args,
571            body: lambda_body,
572        }) = lambda.get_expr()
573        else {
574            return Err(ErrorCode::BindError(
575                "Second argument must be a lambda function".to_owned(),
576            )
577            .into());
578        };
579
580        let [key_arg, value_arg] = <[Ident; 2]>::try_from(lambda_args).map_err(|args| {
581            ErrorCode::BindError(format!(
582                "Lambda must have exactly two parameters (key, value), got {}",
583                args.len()
584            ))
585        })?;
586
587        let bound_lambda = self.bind_binary_lambda_function(
588            key_arg,
589            key_type.clone(),
590            value_arg,
591            value_type.clone(),
592            *lambda_body,
593        )?;
594
595        let lambda_ret_type = bound_lambda.return_type();
596        if lambda_ret_type != DataType::Boolean {
597            return Err(ErrorCode::BindError(format!(
598                "Lambda must return Boolean type, got {}",
599                lambda_ret_type
600            ))
601            .into());
602        }
603
604        let map_type = MapType::from_kv(key_type, value_type);
605        let return_type = DataType::Map(map_type);
606
607        Ok(ExprImpl::FunctionCallWithLambda(Box::new(
608            FunctionCallWithLambda::new_unchecked(
609                ExprType::MapFilter,
610                vec![bound_input],
611                bound_lambda,
612                return_type,
613            ),
614        )))
615    }
616
617    fn bind_binary_lambda_function(
618        &mut self,
619        first_arg: Ident,
620        first_ty: DataType,
621        second_arg: Ident,
622        second_ty: DataType,
623        body: ast::Expr,
624    ) -> Result<ExprImpl> {
625        let lambda_args = HashMap::from([
626            (first_arg.real_value(), (0usize, first_ty)),
627            (second_arg.real_value(), (1usize, second_ty)),
628        ]);
629
630        let orig_ctx = self.context.lambda_args.replace(lambda_args);
631        let bound_body = self.bind_expr_inner(&body)?;
632        self.context.lambda_args = orig_ctx;
633
634        Ok(bound_body)
635    }
636
637    fn ensure_table_function_allowed(&self) -> Result<()> {
638        if let Some(clause) = self.context.clause {
639            match clause {
640                Clause::JoinOn
641                | Clause::Where
642                | Clause::Having
643                | Clause::Filter
644                | Clause::Values
645                | Clause::Insert
646                | Clause::GeneratedColumn => {
647                    return Err(ErrorCode::InvalidInputSyntax(format!(
648                        "table functions are not allowed in {}",
649                        clause
650                    ))
651                    .into());
652                }
653                Clause::GroupBy | Clause::From => {}
654            }
655        }
656        Ok(())
657    }
658
659    /// A common utility function to extract sql udf expression out from the input `ast`.
660    pub(crate) fn extract_udf_expr(ast: Vec<Statement>) -> Result<AstExpr> {
661        if ast.len() != 1 {
662            return Err(ErrorCode::InvalidInputSyntax(
663                "the query for sql udf should contain only one statement".to_owned(),
664            )
665            .into());
666        }
667
668        // Extract the expression out
669        let Statement::Query(query) = ast.into_iter().next().unwrap() else {
670            return Err(ErrorCode::InvalidInputSyntax(
671                "invalid function definition, please recheck the syntax".to_owned(),
672            )
673            .into());
674        };
675
676        if let Some(expr) = query.as_single_select_item() {
677            // Inline SQL UDF.
678            Ok(expr.clone())
679        } else {
680            // Subquery SQL UDF.
681            Ok(AstExpr::Subquery(query))
682        }
683    }
684
685    pub fn bind_sql_udf_inner(
686        &mut self,
687        body: &str,
688        arg_names: &[String],
689        args: Vec<ExprImpl>,
690    ) -> Result<ExprImpl> {
691        // This represents the current user defined function is `language sql`
692        let ast = Parser::parse_sql(body)?;
693
694        // Stash the current arguments.
695        // For subquery SQL UDF, as we always push a new context, there should be no arguments to stash.
696        // For inline SQL UDF, we need to stash the arguments in case of nesting.
697        let stashed_arguments = self.context.sql_udf_arguments.take();
698
699        // The actual inline logic for sql udf.
700        let mut arguments = HashMap::new();
701        for (i, arg) in args.into_iter().enumerate() {
702            if arg_names[i].is_empty() {
703                // unnamed argument, use `$1`, `$2` as the name
704                arguments.insert(format!("${}", i + 1), arg);
705            } else {
706                // named argument
707                arguments.insert(arg_names[i].clone(), arg);
708            }
709        }
710        self.context.sql_udf_arguments = Some(arguments);
711
712        let Ok(expr) = Self::extract_udf_expr(ast) else {
713            return Err(ErrorCode::InvalidInputSyntax(
714                "failed to parse the input query and extract the udf expression, \
715                please recheck the syntax"
716                    .to_owned(),
717            )
718            .into());
719        };
720
721        let bind_result = self.bind_expr(&expr);
722        // Restore arguments information for subsequent binding.
723        self.context.sql_udf_arguments = stashed_arguments;
724
725        bind_result
726    }
727
728    fn bind_sql_udf(
729        &mut self,
730        func: Arc<FunctionCatalog>,
731        args: Vec<ExprImpl>,
732    ) -> Result<ExprImpl> {
733        let Some(body) = &func.body else {
734            return Err(
735                ErrorCode::InvalidInputSyntax("`body` must exist for sql udf".to_owned()).into(),
736            );
737        };
738
739        self.bind_sql_udf_inner(body, &func.arg_names, args)
740    }
741
742    pub(in crate::binder) fn bind_function_expr_arg(
743        &mut self,
744        arg_expr: &FunctionArgExpr,
745    ) -> Result<Vec<ExprImpl>> {
746        match arg_expr {
747            FunctionArgExpr::Expr(expr) => Ok(vec![self.bind_expr_inner(expr)?]),
748            FunctionArgExpr::QualifiedWildcard(_, _)
749            | FunctionArgExpr::ExprQualifiedWildcard(_, _) => Err(ErrorCode::InvalidInputSyntax(
750                format!("unexpected wildcard {}", arg_expr),
751            )
752            .into()),
753            FunctionArgExpr::Wildcard(None) => Ok(vec![]),
754            FunctionArgExpr::Wildcard(Some(_)) => unreachable!(),
755        }
756    }
757
758    pub(in crate::binder) fn bind_function_arg(
759        &mut self,
760        arg: &FunctionArg,
761    ) -> Result<Vec<ExprImpl>> {
762        match arg {
763            FunctionArg::Unnamed(expr) => self.bind_function_expr_arg(expr),
764            FunctionArg::Named { .. } => todo!(),
765        }
766    }
767}