risingwave_frontend/binder/expr/function/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::str::FromStr;
17use std::sync::Arc;
18
19use anyhow::Context;
20use itertools::Itertools;
21use risingwave_common::acl::AclMode;
22use risingwave_common::bail_not_implemented;
23use risingwave_common::catalog::INFORMATION_SCHEMA_SCHEMA_NAME;
24use risingwave_common::types::{DataType, MapType};
25use risingwave_expr::aggregate::AggType;
26use risingwave_expr::window_function::WindowFuncKind;
27use risingwave_pb::user::grant_privilege::PbObject;
28use risingwave_sqlparser::ast::{
29    self, Expr as AstExpr, Function, FunctionArg, FunctionArgExpr, FunctionArgList, Ident,
30    OrderByExpr, Statement, Window,
31};
32use risingwave_sqlparser::parser::Parser;
33
34use crate::binder::Binder;
35use crate::binder::bind_context::Clause;
36use crate::catalog::function_catalog::FunctionCatalog;
37use crate::error::{ErrorCode, Result, RwError};
38use crate::expr::{
39    Expr, ExprImpl, ExprType, FunctionCallWithLambda, InputRef, TableFunction, TableFunctionType,
40    UserDefinedFunction,
41};
42use crate::handler::privilege::ObjectCheckItem;
43
44mod aggregate;
45mod builtin_scalar;
46mod window;
47
48// Defines system functions that without args, ref: https://www.postgresql.org/docs/current/functions-info.html
49const SYS_FUNCTION_WITHOUT_ARGS: &[&str] = &[
50    "session_user",
51    "user",
52    "current_user",
53    "current_role",
54    "current_catalog",
55    "current_schema",
56    "current_timestamp",
57];
58
59pub(super) fn is_sys_function_without_args(ident: &Ident) -> bool {
60    SYS_FUNCTION_WITHOUT_ARGS
61        .iter()
62        .any(|e| ident.real_value().as_str() == *e && ident.quote_style().is_none())
63}
64
65macro_rules! reject_syntax {
66    ($pred:expr, $msg:expr) => {
67        if $pred {
68            return Err(ErrorCode::InvalidInputSyntax($msg.to_string()).into());
69        }
70    };
71
72    ($pred:expr, $fmt:expr, $($arg:tt)*) => {
73        if $pred {
74            return Err(ErrorCode::InvalidInputSyntax(
75                format!($fmt, $($arg)*)
76            ).into());
77        }
78    };
79}
80
81impl Binder {
82    pub(in crate::binder) fn bind_function(
83        &mut self,
84        Function {
85            scalar_as_agg,
86            name,
87            arg_list,
88            within_group,
89            filter,
90            over,
91        }: &Function,
92    ) -> Result<ExprImpl> {
93        let (schema_name, func_name) = match name.0.as_slice() {
94            [name] => (None, name.real_value()),
95            [schema, name] => {
96                let schema_name = schema.real_value();
97                let func_name = if schema_name == INFORMATION_SCHEMA_SCHEMA_NAME {
98                    // definition of information_schema: https://github.com/postgres/postgres/blob/e0b2eed047df9045664da6f724cb42c10f8b12f0/src/backend/catalog/information_schema.sql
99                    //
100                    // FIXME: handle schema correctly, so that the functions are hidden if the schema is not in the search path.
101                    let function_name = name.real_value();
102                    if function_name != "_pg_expandarray" {
103                        bail_not_implemented!(
104                            issue = 12422,
105                            "Unsupported function name under schema: {}",
106                            schema_name
107                        );
108                    }
109                    function_name
110                } else {
111                    name.real_value()
112                };
113                (Some(schema_name), func_name)
114            }
115            [database, schema, name] => {
116                // Support database.schema.function qualified names when database matches current database
117                let database_name = database.real_value();
118                if database_name != self.db_name {
119                    return Err(ErrorCode::BindError(format!(
120                        "Cross-database function call is not supported: {}",
121                        name
122                    ))
123                    .into());
124                }
125                let schema_name = schema.real_value();
126                let func_name = name.real_value();
127                (Some(schema_name), func_name)
128            }
129            _ => bail_not_implemented!(issue = 112, "qualified function {}", name),
130        };
131
132        // FIXME: This is a hack to support [Bytebase queries](https://github.com/TennyZhuang/bytebase/blob/4a26f7c62b80e86e58ad2f77063138dc2f420623/backend/plugin/db/pg/sync.go#L549).
133        // Bytebase widely used the pattern like `obj_description(format('%s.%s',
134        // quote_ident(idx.schemaname), quote_ident(idx.indexname))::regclass) AS comment` to
135        // retrieve object comment, however we don't support casting a non-literal expression to
136        // regclass. We just hack the `obj_description` and `col_description` here, to disable it to
137        // bind its arguments.
138        if func_name == "obj_description" || func_name == "col_description" {
139            return Ok(ExprImpl::literal_varchar("".to_owned()));
140        }
141
142        // special binding logic for `array_transform` and `map_filter`
143        if func_name == "array_transform" || func_name == "map_filter" {
144            return self.validate_and_bind_special_function_params(
145                &func_name,
146                *scalar_as_agg,
147                arg_list,
148                within_group.as_deref(),
149                filter.as_deref(),
150                over.as_ref(),
151            );
152        }
153
154        let mut args: Vec<_> = arg_list
155            .args
156            .iter()
157            .map(|arg| self.bind_function_arg(arg))
158            .flatten_ok()
159            .try_collect()?;
160
161        let mut referred_udfs = HashSet::new();
162
163        let wrapped_agg_type = if *scalar_as_agg {
164            // Let's firstly try to apply the `AGGREGATE:` prefix.
165            // We will reject functions that are not able to be wrapped as aggregate function.
166            let mut array_args = args
167                .iter()
168                .enumerate()
169                .map(|(i, expr)| InputRef::new(i, DataType::list(expr.return_type())).into())
170                .collect_vec();
171            let schema_path = self.bind_schema_path(schema_name.as_deref());
172            let scalar_func_expr = if let Ok((func, _)) = self.catalog.get_function_by_name_inputs(
173                &self.db_name,
174                schema_path,
175                &func_name,
176                &mut array_args,
177            ) {
178                // record the dependency upon the UDF
179                referred_udfs.insert(func.id);
180                self.check_privilege(
181                    ObjectCheckItem::new(
182                        func.owner,
183                        AclMode::Execute,
184                        func.name.clone(),
185                        PbObject::FunctionId(func.id.function_id()),
186                    ),
187                    self.database_id,
188                )?;
189
190                if !func.kind.is_scalar() {
191                    return Err(ErrorCode::InvalidInputSyntax(
192                        "expect a scalar function after `AGGREGATE:`".to_owned(),
193                    )
194                    .into());
195                }
196
197                if func.language == "sql" {
198                    self.bind_sql_udf(func.clone(), array_args)?
199                } else {
200                    UserDefinedFunction::new(func.clone(), array_args).into()
201                }
202            } else {
203                self.bind_builtin_scalar_function(&func_name, array_args, arg_list.variadic)?
204            };
205
206            // `AggType::WrapScalar` requires the inner expression to be convertible to `ExprNode`
207            // and directly executable. If there's any subquery in the expression, this will fail.
208            let expr_node = match scalar_func_expr.try_to_expr_proto() {
209                Ok(expr_node) => expr_node,
210                Err(e) => {
211                    return Err(ErrorCode::InvalidInputSyntax(format!(
212                        "function {func_name} cannot be used after `AGGREGATE:`: {e}",
213                    ))
214                    .into());
215                }
216            };
217
218            // now this is either an aggregate/window function call
219            Some(AggType::WrapScalar(expr_node))
220        } else {
221            None
222        };
223
224        let schema_path = self.bind_schema_path(schema_name.as_deref());
225        let udf = if wrapped_agg_type.is_none()
226            && let Ok((func, _)) = self.catalog.get_function_by_name_inputs(
227                &self.db_name,
228                schema_path,
229                &func_name,
230                &mut args,
231            ) {
232            // record the dependency upon the UDF
233            referred_udfs.insert(func.id);
234            self.check_privilege(
235                ObjectCheckItem::new(
236                    func.owner,
237                    AclMode::Execute,
238                    func.name.clone(),
239                    PbObject::FunctionId(func.id.function_id()),
240                ),
241                self.database_id,
242            )?;
243            Some(func.clone())
244        } else {
245            None
246        };
247
248        self.included_udfs.extend(referred_udfs);
249
250        let agg_type = if wrapped_agg_type.is_some() {
251            wrapped_agg_type
252        } else if let Some(ref udf) = udf
253            && udf.kind.is_aggregate()
254        {
255            assert_ne!(udf.language, "sql", "SQL UDAF is not supported yet");
256            Some(AggType::UserDefined(udf.as_ref().into()))
257        } else {
258            AggType::from_str(&func_name).ok()
259        };
260
261        // try to bind it as a window function call
262        if let Some(over) = over {
263            reject_syntax!(
264                arg_list.distinct,
265                "`DISTINCT` is not allowed in window function call"
266            );
267            reject_syntax!(
268                arg_list.variadic,
269                "`VARIADIC` is not allowed in window function call"
270            );
271            reject_syntax!(
272                !arg_list.order_by.is_empty(),
273                "`ORDER BY` is not allowed in window function call argument list"
274            );
275            reject_syntax!(
276                within_group.is_some(),
277                "`WITHIN GROUP` is not allowed in window function call"
278            );
279
280            let kind = if let Some(agg_type) = agg_type {
281                // aggregate as window function
282                WindowFuncKind::Aggregate(agg_type)
283            } else if let Ok(kind) = WindowFuncKind::from_str(&func_name) {
284                kind
285            } else {
286                bail_not_implemented!(issue = 8961, "Unrecognized window function: {}", func_name);
287            };
288            return self.bind_window_function(
289                kind,
290                args,
291                arg_list.ignore_nulls,
292                filter.as_deref(),
293                over,
294            );
295        }
296
297        // now it's an aggregate/scalar/table function call
298        reject_syntax!(
299            arg_list.ignore_nulls,
300            "`IGNORE NULLS` is not allowed in aggregate/scalar/table function call"
301        );
302
303        // try to bind it as an aggregate function call
304        if let Some(agg_type) = agg_type {
305            reject_syntax!(
306                arg_list.variadic,
307                "`VARIADIC` is not allowed in aggregate function call"
308            );
309            return self.bind_aggregate_function(
310                agg_type,
311                arg_list.distinct,
312                args,
313                &arg_list.order_by,
314                within_group.as_deref(),
315                filter.as_deref(),
316            );
317        }
318
319        // now it's a scalar/table function call
320        reject_syntax!(
321            arg_list.distinct,
322            "`DISTINCT` is not allowed in scalar/table function call"
323        );
324        reject_syntax!(
325            !arg_list.order_by.is_empty(),
326            "`ORDER BY` is not allowed in scalar/table function call"
327        );
328        reject_syntax!(
329            within_group.is_some(),
330            "`WITHIN GROUP` is not allowed in scalar/table function call"
331        );
332        reject_syntax!(
333            filter.is_some(),
334            "`FILTER` is not allowed in scalar/table function call"
335        );
336
337        // try to bind it as a table function call
338        {
339            // `file_scan` table function
340            if func_name.eq_ignore_ascii_case("file_scan") {
341                reject_syntax!(
342                    arg_list.variadic,
343                    "`VARIADIC` is not allowed in table function call"
344                );
345                self.ensure_table_function_allowed()?;
346                return Ok(TableFunction::new_file_scan(args)?.into());
347            }
348            // `postgres_query` table function
349            if func_name.eq("postgres_query") {
350                reject_syntax!(
351                    arg_list.variadic,
352                    "`VARIADIC` is not allowed in table function call"
353                );
354                self.ensure_table_function_allowed()?;
355                return Ok(TableFunction::new_postgres_query(
356                    &self.catalog,
357                    &self.db_name,
358                    self.bind_schema_path(schema_name.as_deref()),
359                    args,
360                )
361                .context("postgres_query error")?
362                .into());
363            }
364            // `mysql_query` table function
365            if func_name.eq("mysql_query") {
366                reject_syntax!(
367                    arg_list.variadic,
368                    "`VARIADIC` is not allowed in table function call"
369                );
370                self.ensure_table_function_allowed()?;
371                return Ok(TableFunction::new_mysql_query(
372                    &self.catalog,
373                    &self.db_name,
374                    self.bind_schema_path(schema_name.as_deref()),
375                    args,
376                )
377                .context("mysql_query error")?
378                .into());
379            }
380            // `internal_backfill_progress` table function
381            if func_name.eq("internal_backfill_progress") {
382                reject_syntax!(
383                    arg_list.variadic,
384                    "`VARIADIC` is not allowed in table function call"
385                );
386                self.ensure_table_function_allowed()?;
387                return Ok(TableFunction::new_internal_backfill_progress().into());
388            }
389            // `internal_source_backfill_progress` table function
390            if func_name.eq("internal_source_backfill_progress") {
391                reject_syntax!(
392                    arg_list.variadic,
393                    "`VARIADIC` is not allowed in table function call"
394                );
395                self.ensure_table_function_allowed()?;
396                return Ok(TableFunction::new_internal_source_backfill_progress().into());
397            }
398            // `internal_get_channel_delta_stats` table function
399            if func_name.eq("internal_get_channel_delta_stats") {
400                reject_syntax!(
401                    arg_list.variadic,
402                    "`VARIADIC` is not allowed in table function call"
403                );
404                self.ensure_table_function_allowed()?;
405
406                return Ok(TableFunction::new_internal_get_channel_delta_stats(args).into());
407            }
408            // UDTF
409            if let Some(ref udf) = udf
410                && udf.kind.is_table()
411            {
412                reject_syntax!(
413                    arg_list.variadic,
414                    "`VARIADIC` is not allowed in table function call"
415                );
416                self.ensure_table_function_allowed()?;
417                if udf.language == "sql" {
418                    return self.bind_sql_udf(udf.clone(), args);
419                }
420                return Ok(TableFunction::new_user_defined(udf.clone(), args).into());
421            }
422            // builtin table function
423            if let Ok(function_type) = TableFunctionType::from_str(&func_name) {
424                reject_syntax!(
425                    arg_list.variadic,
426                    "`VARIADIC` is not allowed in table function call"
427                );
428                self.ensure_table_function_allowed()?;
429                return Ok(TableFunction::new(function_type, args)?.into());
430            }
431        }
432
433        // try to bind it as a scalar function call
434        if let Some(ref udf) = udf {
435            assert!(udf.kind.is_scalar());
436            reject_syntax!(
437                arg_list.variadic,
438                "`VARIADIC` is not allowed in user-defined function call"
439            );
440            if udf.language == "sql" {
441                return self.bind_sql_udf(udf.clone(), args);
442            }
443            return Ok(UserDefinedFunction::new(udf.clone(), args).into());
444        }
445
446        self.bind_builtin_scalar_function(&func_name, args, arg_list.variadic)
447    }
448
449    fn validate_and_bind_special_function_params(
450        &mut self,
451        func_name: &str,
452        scalar_as_agg: bool,
453        arg_list: &FunctionArgList,
454        within_group: Option<&OrderByExpr>,
455        filter: Option<&risingwave_sqlparser::ast::Expr>,
456        over: Option<&Window>,
457    ) -> Result<ExprImpl> {
458        assert!(["array_transform", "map_filter"].contains(&func_name));
459
460        reject_syntax!(
461            scalar_as_agg,
462            "`AGGREGATE:` prefix is not allowed for `{}`",
463            func_name
464        );
465        reject_syntax!(
466            !arg_list.is_args_only(),
467            "keywords like `DISTINCT`, `ORDER BY` are not allowed in `{}` argument list",
468            func_name
469        );
470        reject_syntax!(
471            within_group.is_some(),
472            "`WITHIN GROUP` is not allowed in `{}` call",
473            func_name
474        );
475        reject_syntax!(
476            filter.is_some(),
477            "`FILTER` is not allowed in `{}` call",
478            func_name
479        );
480        reject_syntax!(
481            over.is_some(),
482            "`OVER` is not allowed in `{}` call",
483            func_name
484        );
485        if func_name == "array_transform" {
486            self.bind_array_transform(&arg_list.args)
487        } else {
488            self.bind_map_filter(&arg_list.args)
489        }
490    }
491
492    fn bind_array_transform(&mut self, args: &[FunctionArg]) -> Result<ExprImpl> {
493        let [array, lambda] = args else {
494            return Err(ErrorCode::BindError(format!(
495                "`array_transform` expect two inputs `array` and `lambda`, but {} were given",
496                args.len()
497            ))
498            .into());
499        };
500
501        let bound_array = self.bind_function_arg(array)?;
502        let [bound_array] = <[ExprImpl; 1]>::try_from(bound_array).map_err(|bound_array| -> RwError {
503            ErrorCode::BindError(format!("The `array` argument for `array_transform` should be bound to one argument, but {} were got", bound_array.len()))
504                .into()
505        })?;
506
507        let inner_ty = match bound_array.return_type() {
508            DataType::List(ty) => ty.into_elem(),
509            real_type => return Err(ErrorCode::BindError(format!(
510                "The `array` argument for `array_transform` should be an array, but {} were got",
511                real_type
512            ))
513            .into()),
514        };
515
516        let ast::FunctionArgExpr::Expr(ast::Expr::LambdaFunction {
517            args: lambda_args,
518            body: lambda_body,
519        }) = lambda.get_expr()
520        else {
521            return Err(ErrorCode::BindError(
522                "The `lambda` argument for `array_transform` should be a lambda function"
523                    .to_owned(),
524            )
525            .into());
526        };
527
528        let [lambda_arg] = <[Ident; 1]>::try_from(lambda_args).map_err(|args| -> RwError {
529            ErrorCode::BindError(format!(
530                "The `lambda` argument for `array_transform` should be a lambda function with one argument, but {} were given",
531                args.len()
532            ))
533            .into()
534        })?;
535
536        let bound_lambda = self.bind_unary_lambda_function(inner_ty, lambda_arg, *lambda_body)?;
537
538        let lambda_ret_type = bound_lambda.return_type();
539        let transform_ret_type = DataType::list(lambda_ret_type);
540
541        Ok(ExprImpl::FunctionCallWithLambda(Box::new(
542            FunctionCallWithLambda::new_unchecked(
543                ExprType::ArrayTransform,
544                vec![bound_array],
545                bound_lambda,
546                transform_ret_type,
547            ),
548        )))
549    }
550
551    fn bind_unary_lambda_function(
552        &mut self,
553        input_ty: DataType,
554        arg: Ident,
555        body: ast::Expr,
556    ) -> Result<ExprImpl> {
557        let lambda_args = HashMap::from([(arg.real_value(), (0usize, input_ty))]);
558        let orig_lambda_args = self.context.lambda_args.replace(lambda_args);
559        let body = self.bind_expr_inner(&body)?;
560        self.context.lambda_args = orig_lambda_args;
561
562        Ok(body)
563    }
564
565    fn bind_map_filter(&mut self, args: &[FunctionArg]) -> Result<ExprImpl> {
566        let [input, lambda] = args else {
567            return Err(ErrorCode::BindError(format!(
568                "`map_filter` requires two arguments (input_map and lambda), got {}",
569                args.len()
570            ))
571            .into());
572        };
573
574        let bound_input = self.bind_function_arg(input)?;
575        let [bound_input] = <[ExprImpl; 1]>::try_from(bound_input).map_err(|e| {
576            ErrorCode::BindError(format!(
577                "Input argument should resolve to single expression, got {}",
578                e.len()
579            ))
580        })?;
581
582        let (key_type, value_type) = match bound_input.return_type() {
583            DataType::Map(map_type) => (map_type.key().clone(), map_type.value().clone()),
584            t => {
585                return Err(
586                    ErrorCode::BindError(format!("Input must be Map type, got {}", t)).into(),
587                );
588            }
589        };
590
591        let ast::FunctionArgExpr::Expr(ast::Expr::LambdaFunction {
592            args: lambda_args,
593            body: lambda_body,
594        }) = lambda.get_expr()
595        else {
596            return Err(ErrorCode::BindError(
597                "Second argument must be a lambda function".to_owned(),
598            )
599            .into());
600        };
601
602        let [key_arg, value_arg] = <[Ident; 2]>::try_from(lambda_args).map_err(|args| {
603            ErrorCode::BindError(format!(
604                "Lambda must have exactly two parameters (key, value), got {}",
605                args.len()
606            ))
607        })?;
608
609        let bound_lambda = self.bind_binary_lambda_function(
610            key_arg,
611            key_type.clone(),
612            value_arg,
613            value_type.clone(),
614            *lambda_body,
615        )?;
616
617        let lambda_ret_type = bound_lambda.return_type();
618        if lambda_ret_type != DataType::Boolean {
619            return Err(ErrorCode::BindError(format!(
620                "Lambda must return Boolean type, got {}",
621                lambda_ret_type
622            ))
623            .into());
624        }
625
626        let map_type = MapType::from_kv(key_type, value_type);
627        let return_type = DataType::Map(map_type);
628
629        Ok(ExprImpl::FunctionCallWithLambda(Box::new(
630            FunctionCallWithLambda::new_unchecked(
631                ExprType::MapFilter,
632                vec![bound_input],
633                bound_lambda,
634                return_type,
635            ),
636        )))
637    }
638
639    fn bind_binary_lambda_function(
640        &mut self,
641        first_arg: Ident,
642        first_ty: DataType,
643        second_arg: Ident,
644        second_ty: DataType,
645        body: ast::Expr,
646    ) -> Result<ExprImpl> {
647        let lambda_args = HashMap::from([
648            (first_arg.real_value(), (0usize, first_ty)),
649            (second_arg.real_value(), (1usize, second_ty)),
650        ]);
651
652        let orig_ctx = self.context.lambda_args.replace(lambda_args);
653        let bound_body = self.bind_expr_inner(&body)?;
654        self.context.lambda_args = orig_ctx;
655
656        Ok(bound_body)
657    }
658
659    fn ensure_table_function_allowed(&self) -> Result<()> {
660        if let Some(clause) = self.context.clause {
661            match clause {
662                Clause::JoinOn
663                | Clause::Where
664                | Clause::Having
665                | Clause::Filter
666                | Clause::Values
667                | Clause::Insert
668                | Clause::GeneratedColumn => {
669                    return Err(ErrorCode::InvalidInputSyntax(format!(
670                        "table functions are not allowed in {}",
671                        clause
672                    ))
673                    .into());
674                }
675                Clause::GroupBy | Clause::From => {}
676            }
677        }
678        Ok(())
679    }
680
681    /// A common utility function to extract sql udf expression out from the input `ast`.
682    pub(crate) fn extract_udf_expr(ast: Vec<Statement>) -> Result<AstExpr> {
683        if ast.len() != 1 {
684            return Err(ErrorCode::InvalidInputSyntax(
685                "the query for sql udf should contain only one statement".to_owned(),
686            )
687            .into());
688        }
689
690        // Extract the expression out
691        let Statement::Query(query) = ast.into_iter().next().unwrap() else {
692            return Err(ErrorCode::InvalidInputSyntax(
693                "invalid function definition, please recheck the syntax".to_owned(),
694            )
695            .into());
696        };
697
698        if let Some(expr) = query.as_single_select_item() {
699            // Inline SQL UDF.
700            Ok(expr.clone())
701        } else {
702            // Subquery SQL UDF.
703            Ok(AstExpr::Subquery(query))
704        }
705    }
706
707    pub fn bind_sql_udf_inner(
708        &mut self,
709        body: &str,
710        arg_names: &[String],
711        args: Vec<ExprImpl>,
712    ) -> Result<ExprImpl> {
713        // This represents the current user defined function is `language sql`
714        let ast = Parser::parse_sql(body)?;
715
716        // Stash the current arguments.
717        // For subquery SQL UDF, as we always push a new context, there should be no arguments to stash.
718        // For inline SQL UDF, we need to stash the arguments in case of nesting.
719        let stashed_arguments = self.context.sql_udf_arguments.take();
720
721        // The actual inline logic for sql udf.
722        let mut arguments = HashMap::new();
723        for (i, arg) in args.into_iter().enumerate() {
724            if arg_names[i].is_empty() {
725                // unnamed argument, use `$1`, `$2` as the name
726                arguments.insert(format!("${}", i + 1), arg);
727            } else {
728                // named argument
729                arguments.insert(arg_names[i].clone(), arg);
730            }
731        }
732        self.context.sql_udf_arguments = Some(arguments);
733
734        let Ok(expr) = Self::extract_udf_expr(ast) else {
735            return Err(ErrorCode::InvalidInputSyntax(
736                "failed to parse the input query and extract the udf expression, \
737                please recheck the syntax"
738                    .to_owned(),
739            )
740            .into());
741        };
742
743        let bind_result = self.bind_expr(&expr);
744        // Restore arguments information for subsequent binding.
745        self.context.sql_udf_arguments = stashed_arguments;
746
747        bind_result
748    }
749
750    fn bind_sql_udf(
751        &mut self,
752        func: Arc<FunctionCatalog>,
753        args: Vec<ExprImpl>,
754    ) -> Result<ExprImpl> {
755        let Some(body) = &func.body else {
756            return Err(
757                ErrorCode::InvalidInputSyntax("`body` must exist for sql udf".to_owned()).into(),
758            );
759        };
760
761        self.bind_sql_udf_inner(body, &func.arg_names, args)
762    }
763
764    pub(in crate::binder) fn bind_function_expr_arg(
765        &mut self,
766        arg_expr: &FunctionArgExpr,
767    ) -> Result<Vec<ExprImpl>> {
768        match arg_expr {
769            FunctionArgExpr::Expr(expr) => Ok(vec![self.bind_expr_inner(expr)?]),
770            FunctionArgExpr::QualifiedWildcard(_, _)
771            | FunctionArgExpr::ExprQualifiedWildcard(_, _) => Err(ErrorCode::InvalidInputSyntax(
772                format!("unexpected wildcard {}", arg_expr),
773            )
774            .into()),
775            FunctionArgExpr::Wildcard(None) => Ok(vec![]),
776            FunctionArgExpr::Wildcard(Some(_)) => unreachable!(),
777        }
778    }
779
780    pub(in crate::binder) fn bind_function_arg(
781        &mut self,
782        arg: &FunctionArg,
783    ) -> Result<Vec<ExprImpl>> {
784        match arg {
785            FunctionArg::Unnamed(expr) => self.bind_function_expr_arg(expr),
786            FunctionArg::Named { .. } => todo!(),
787        }
788    }
789}