risingwave_frontend/binder/expr/function/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::str::FromStr;
17use std::sync::Arc;
18
19use anyhow::Context;
20use itertools::Itertools;
21use risingwave_common::acl::AclMode;
22use risingwave_common::bail_not_implemented;
23use risingwave_common::catalog::INFORMATION_SCHEMA_SCHEMA_NAME;
24use risingwave_common::types::{DataType, MapType};
25use risingwave_expr::aggregate::AggType;
26use risingwave_expr::window_function::WindowFuncKind;
27use risingwave_sqlparser::ast::{
28    self, Expr as AstExpr, Function, FunctionArg, FunctionArgExpr, FunctionArgList, Ident,
29    OrderByExpr, Statement, Window,
30};
31use risingwave_sqlparser::parser::Parser;
32
33use crate::binder::Binder;
34use crate::binder::bind_context::Clause;
35use crate::catalog::function_catalog::FunctionCatalog;
36use crate::error::{ErrorCode, Result, RwError};
37use crate::expr::{
38    Expr, ExprImpl, ExprType, FunctionCallWithLambda, InputRef, TableFunction, TableFunctionType,
39    UserDefinedFunction,
40};
41use crate::handler::privilege::ObjectCheckItem;
42
43mod aggregate;
44mod builtin_scalar;
45mod window;
46
/// Names of the system functions that take no arguments.
///
/// Reference: <https://www.postgresql.org/docs/current/functions-info.html>
const SYS_FUNCTION_WITHOUT_ARGS: &[&str] = &[
    "session_user",
    "user",
    "current_user",
    "current_role",
    "current_catalog",
    "current_schema",
    "current_timestamp",
];
57
58pub(super) fn is_sys_function_without_args(ident: &Ident) -> bool {
59    SYS_FUNCTION_WITHOUT_ARGS
60        .iter()
61        .any(|e| ident.real_value().as_str() == *e && ident.quote_style().is_none())
62}
63
/// Returns early from the enclosing function with `ErrorCode::InvalidInputSyntax` when the
/// predicate holds.
///
/// Two forms are accepted:
/// - `reject_syntax!(pred, msg)`: `msg` is converted with `to_string()`;
/// - `reject_syntax!(pred, fmt, args...)`: the message is built with `format!`.
macro_rules! reject_syntax {
    // Plain message form.
    ($pred:expr, $msg:expr) => {
        if $pred {
            let message = $msg.to_string();
            return Err(ErrorCode::InvalidInputSyntax(message).into());
        }
    };

    // Format-string form.
    ($pred:expr, $fmt:expr, $($arg:tt)*) => {
        if $pred {
            let message = format!($fmt, $($arg)*);
            return Err(ErrorCode::InvalidInputSyntax(message).into());
        }
    };
}
79
80impl Binder {
    /// Binds a function call from the AST into an [`ExprImpl`].
    ///
    /// The call is resolved in the following order:
    /// 1. The (optionally qualified) name is split into a schema name and a function name.
    /// 2. `obj_description` / `col_description` are short-circuited to an empty varchar literal,
    ///    and `array_transform` / `map_filter` are delegated to their dedicated binding logic.
    /// 3. The arguments are bound. With the `AGGREGATE:` prefix (`scalar_as_agg`), the scalar
    ///    function is wrapped into `AggType::WrapScalar`.
    /// 4. Otherwise the catalog is consulted for a matching user-defined function.
    /// 5. The call is dispatched, in order, as a window function (when `OVER` is present), an
    ///    aggregate function, a table function, a scalar UDF, or a builtin scalar function.
    ///
    /// # Errors
    ///
    /// Returns an error for unsupported qualified names, cross-database calls, clauses that are
    /// not allowed for the resolved kind of function (`DISTINCT`, `VARIADIC`, `ORDER BY`,
    /// `WITHIN GROUP`, `FILTER`, `IGNORE NULLS`), failed privilege checks, and any error raised
    /// while binding arguments or the resolved function itself.
    ///
    /// # Panics
    ///
    /// Panics if the resolved aggregate UDF has language `sql`, or if a UDF that is neither an
    /// aggregate nor a table function is not a scalar function.
    pub(in crate::binder) fn bind_function(
        &mut self,
        Function {
            scalar_as_agg,
            name,
            arg_list,
            within_group,
            filter,
            over,
        }: &Function,
    ) -> Result<ExprImpl> {
        // Split the possibly qualified name into an optional schema name and the function name.
        let (schema_name, func_name) = match name.0.as_slice() {
            [name] => (None, name.real_value()),
            [schema, name] => {
                let schema_name = schema.real_value();
                let func_name = if schema_name == INFORMATION_SCHEMA_SCHEMA_NAME {
                    // definition of information_schema: https://github.com/postgres/postgres/blob/e0b2eed047df9045664da6f724cb42c10f8b12f0/src/backend/catalog/information_schema.sql
                    //
                    // FIXME: handle schema correctly, so that the functions are hidden if the schema is not in the search path.
                    let function_name = name.real_value();
                    if function_name != "_pg_expandarray" {
                        bail_not_implemented!(
                            issue = 12422,
                            "Unsupported function name under schema: {}",
                            schema_name
                        );
                    }
                    function_name
                } else {
                    name.real_value()
                };
                (Some(schema_name), func_name)
            }
            [database, schema, name] => {
                // Support `database.schema.function` qualified names when the database matches
                // the current database.
                let database_name = database.real_value();
                if database_name != self.db_name {
                    // NOTE(review): `name` here is the pattern binding above, i.e. only the
                    // function identifier, not the full qualified name. Confirm this is intended.
                    return Err(ErrorCode::BindError(format!(
                        "Cross-database function call is not supported: {}",
                        name
                    ))
                    .into());
                }
                let schema_name = schema.real_value();
                let func_name = name.real_value();
                (Some(schema_name), func_name)
            }
            _ => bail_not_implemented!(issue = 112, "qualified function {}", name),
        };

        // FIXME: This is a hack to support [Bytebase queries](https://github.com/TennyZhuang/bytebase/blob/4a26f7c62b80e86e58ad2f77063138dc2f420623/backend/plugin/db/pg/sync.go#L549).
        // Bytebase widely uses the pattern `obj_description(format('%s.%s',
        // quote_ident(idx.schemaname), quote_ident(idx.indexname))::regclass) AS comment` to
        // retrieve object comments; however, we don't support casting a non-literal expression to
        // regclass. We special-case `obj_description` and `col_description` here so that their
        // arguments are never bound.
        if func_name == "obj_description" || func_name == "col_description" {
            return Ok(ExprImpl::literal_varchar("".to_owned()));
        }

        // Special binding logic for `array_transform` and `map_filter`: their arguments are
        // handed over unbound.
        if func_name == "array_transform" || func_name == "map_filter" {
            return self.validate_and_bind_special_function_params(
                &func_name,
                *scalar_as_agg,
                arg_list,
                within_group.as_deref(),
                filter.as_deref(),
                over.as_ref(),
            );
        }

        // Bind every argument; a single AST argument may expand to zero or more expressions.
        let mut args: Vec<_> = arg_list
            .args
            .iter()
            .map(|arg| self.bind_function_arg(arg))
            .flatten_ok()
            .try_collect()?;

        // Ids of the UDFs this call resolves to; merged into `self.included_udfs` below.
        let mut referred_udfs = HashSet::new();

        let wrapped_agg_type = if *scalar_as_agg {
            // Let's firstly try to apply the `AGGREGATE:` prefix.
            // We will reject functions that are not able to be wrapped as aggregate function.
            // Each argument is replaced by an input reference whose type is a list of the
            // argument's type.
            let mut array_args = args
                .iter()
                .enumerate()
                .map(|(i, expr)| InputRef::new(i, DataType::list(expr.return_type())).into())
                .collect_vec();
            let schema_path = self.bind_schema_path(schema_name.as_deref());
            let scalar_func_expr = if let Ok((func, _)) = self.catalog.get_function_by_name_inputs(
                &self.db_name,
                schema_path,
                &func_name,
                &mut array_args,
            ) {
                // record the dependency upon the UDF
                referred_udfs.insert(func.id);
                self.check_privilege(
                    ObjectCheckItem::new(func.owner, AclMode::Execute, func.name.clone(), func.id),
                    self.database_id,
                )?;

                if !func.kind.is_scalar() {
                    return Err(ErrorCode::InvalidInputSyntax(
                        "expect a scalar function after `AGGREGATE:`".to_owned(),
                    )
                    .into());
                }

                if func.language == "sql" {
                    self.bind_sql_udf(func.clone(), array_args)?
                } else {
                    UserDefinedFunction::new(func.clone(), array_args).into()
                }
            } else {
                // No matching function in the catalog: fall back to a builtin scalar function.
                self.bind_builtin_scalar_function(&func_name, array_args, arg_list.variadic)?
            };

            // `AggType::WrapScalar` requires the inner expression to be convertible to `ExprNode`
            // and directly executable. If there's any subquery in the expression, this will fail.
            let expr_node = match scalar_func_expr.try_to_expr_proto() {
                Ok(expr_node) => expr_node,
                Err(e) => {
                    return Err(ErrorCode::InvalidInputSyntax(format!(
                        "function {func_name} cannot be used after `AGGREGATE:`: {e}",
                    ))
                    .into());
                }
            };

            // now this is either an aggregate/window function call
            Some(AggType::WrapScalar(expr_node))
        } else {
            None
        };

        // Look up a UDF with the bound arguments, unless the call was already wrapped as an
        // aggregate above.
        let schema_path = self.bind_schema_path(schema_name.as_deref());
        let udf = if wrapped_agg_type.is_none()
            && let Ok((func, _)) = self.catalog.get_function_by_name_inputs(
                &self.db_name,
                schema_path,
                &func_name,
                &mut args,
            ) {
            // record the dependency upon the UDF
            referred_udfs.insert(func.id);
            self.check_privilege(
                ObjectCheckItem::new(func.owner, AclMode::Execute, func.name.clone(), func.id),
                self.database_id,
            )?;
            Some(func.clone())
        } else {
            None
        };

        self.included_udfs.extend(referred_udfs);

        // Decide whether the call is an aggregate: a wrapped scalar, an aggregate UDF, or a
        // builtin aggregate recognized by name.
        let agg_type = if wrapped_agg_type.is_some() {
            wrapped_agg_type
        } else if let Some(ref udf) = udf
            && udf.kind.is_aggregate()
        {
            assert_ne!(udf.language, "sql", "SQL UDAF is not supported yet");
            Some(AggType::UserDefined(udf.as_ref().into()))
        } else {
            AggType::from_str(&func_name).ok()
        };

        // try to bind it as a window function call
        if let Some(over) = over {
            reject_syntax!(
                arg_list.distinct,
                "`DISTINCT` is not allowed in window function call"
            );
            reject_syntax!(
                arg_list.variadic,
                "`VARIADIC` is not allowed in window function call"
            );
            reject_syntax!(
                !arg_list.order_by.is_empty(),
                "`ORDER BY` is not allowed in window function call argument list"
            );
            reject_syntax!(
                within_group.is_some(),
                "`WITHIN GROUP` is not allowed in window function call"
            );

            let kind = if let Some(agg_type) = agg_type {
                // aggregate as window function
                WindowFuncKind::Aggregate(agg_type)
            } else if let Ok(kind) = WindowFuncKind::from_str(&func_name) {
                kind
            } else {
                bail_not_implemented!(issue = 8961, "Unrecognized window function: {}", func_name);
            };
            return self.bind_window_function(
                kind,
                args,
                arg_list.ignore_nulls,
                filter.as_deref(),
                over,
            );
        }

        // now it's an aggregate/scalar/table function call
        reject_syntax!(
            arg_list.ignore_nulls,
            "`IGNORE NULLS` is not allowed in aggregate/scalar/table function call"
        );

        // try to bind it as an aggregate function call
        if let Some(agg_type) = agg_type {
            reject_syntax!(
                arg_list.variadic,
                "`VARIADIC` is not allowed in aggregate function call"
            );
            return self.bind_aggregate_function(
                agg_type,
                arg_list.distinct,
                args,
                &arg_list.order_by,
                within_group.as_deref(),
                filter.as_deref(),
            );
        }

        // now it's a scalar/table function call
        reject_syntax!(
            arg_list.distinct,
            "`DISTINCT` is not allowed in scalar/table function call"
        );
        reject_syntax!(
            !arg_list.order_by.is_empty(),
            "`ORDER BY` is not allowed in scalar/table function call"
        );
        reject_syntax!(
            within_group.is_some(),
            "`WITHIN GROUP` is not allowed in scalar/table function call"
        );
        reject_syntax!(
            filter.is_some(),
            "`FILTER` is not allowed in scalar/table function call"
        );

        // try to bind it as a table function call
        {
            // `file_scan` table function
            // NOTE(review): this is the only name compared case-insensitively; the names below
            // use exact comparison. Confirm whether the difference is intentional.
            if func_name.eq_ignore_ascii_case("file_scan") {
                reject_syntax!(
                    arg_list.variadic,
                    "`VARIADIC` is not allowed in table function call"
                );
                self.ensure_table_function_allowed()?;
                return Ok(TableFunction::new_file_scan(args)?.into());
            }
            // `postgres_query` table function
            if func_name.eq("postgres_query") {
                reject_syntax!(
                    arg_list.variadic,
                    "`VARIADIC` is not allowed in table function call"
                );
                self.ensure_table_function_allowed()?;
                return Ok(TableFunction::new_postgres_query(
                    &self.catalog,
                    &self.db_name,
                    self.bind_schema_path(schema_name.as_deref()),
                    args,
                )
                .context("postgres_query error")?
                .into());
            }
            // `mysql_query` table function
            if func_name.eq("mysql_query") {
                reject_syntax!(
                    arg_list.variadic,
                    "`VARIADIC` is not allowed in table function call"
                );
                self.ensure_table_function_allowed()?;
                return Ok(TableFunction::new_mysql_query(
                    &self.catalog,
                    &self.db_name,
                    self.bind_schema_path(schema_name.as_deref()),
                    args,
                )
                .context("mysql_query error")?
                .into());
            }
            // `internal_backfill_progress` table function
            if func_name.eq("internal_backfill_progress") {
                reject_syntax!(
                    arg_list.variadic,
                    "`VARIADIC` is not allowed in table function call"
                );
                self.ensure_table_function_allowed()?;
                return Ok(TableFunction::new_internal_backfill_progress().into());
            }
            // `internal_source_backfill_progress` table function
            if func_name.eq("internal_source_backfill_progress") {
                reject_syntax!(
                    arg_list.variadic,
                    "`VARIADIC` is not allowed in table function call"
                );
                self.ensure_table_function_allowed()?;
                return Ok(TableFunction::new_internal_source_backfill_progress().into());
            }
            // `internal_get_channel_delta_stats` table function
            if func_name.eq("internal_get_channel_delta_stats") {
                reject_syntax!(
                    arg_list.variadic,
                    "`VARIADIC` is not allowed in table function call"
                );
                self.ensure_table_function_allowed()?;

                return Ok(TableFunction::new_internal_get_channel_delta_stats(args).into());
            }
            // UDTF
            if let Some(ref udf) = udf
                && udf.kind.is_table()
            {
                reject_syntax!(
                    arg_list.variadic,
                    "`VARIADIC` is not allowed in table function call"
                );
                self.ensure_table_function_allowed()?;
                if udf.language == "sql" {
                    return self.bind_sql_udf(udf.clone(), args);
                }
                return Ok(TableFunction::new_user_defined(udf.clone(), args).into());
            }
            // builtin table function
            if let Ok(function_type) = TableFunctionType::from_str(&func_name) {
                reject_syntax!(
                    arg_list.variadic,
                    "`VARIADIC` is not allowed in table function call"
                );
                self.ensure_table_function_allowed()?;
                return Ok(TableFunction::new(function_type, args)?.into());
            }
        }

        // try to bind it as a scalar function call
        if let Some(ref udf) = udf {
            // Aggregate and table UDFs were handled above, so only scalar UDFs may remain.
            assert!(udf.kind.is_scalar());
            reject_syntax!(
                arg_list.variadic,
                "`VARIADIC` is not allowed in user-defined function call"
            );
            if udf.language == "sql" {
                return self.bind_sql_udf(udf.clone(), args);
            }
            return Ok(UserDefinedFunction::new(udf.clone(), args).into());
        }

        // Nothing else matched: treat it as a builtin scalar function.
        self.bind_builtin_scalar_function(&func_name, args, arg_list.variadic)
    }
437
438    fn validate_and_bind_special_function_params(
439        &mut self,
440        func_name: &str,
441        scalar_as_agg: bool,
442        arg_list: &FunctionArgList,
443        within_group: Option<&OrderByExpr>,
444        filter: Option<&risingwave_sqlparser::ast::Expr>,
445        over: Option<&Window>,
446    ) -> Result<ExprImpl> {
447        assert!(["array_transform", "map_filter"].contains(&func_name));
448
449        reject_syntax!(
450            scalar_as_agg,
451            "`AGGREGATE:` prefix is not allowed for `{}`",
452            func_name
453        );
454        reject_syntax!(
455            !arg_list.is_args_only(),
456            "keywords like `DISTINCT`, `ORDER BY` are not allowed in `{}` argument list",
457            func_name
458        );
459        reject_syntax!(
460            within_group.is_some(),
461            "`WITHIN GROUP` is not allowed in `{}` call",
462            func_name
463        );
464        reject_syntax!(
465            filter.is_some(),
466            "`FILTER` is not allowed in `{}` call",
467            func_name
468        );
469        reject_syntax!(
470            over.is_some(),
471            "`OVER` is not allowed in `{}` call",
472            func_name
473        );
474        if func_name == "array_transform" {
475            self.bind_array_transform(&arg_list.args)
476        } else {
477            self.bind_map_filter(&arg_list.args)
478        }
479    }
480
481    fn bind_array_transform(&mut self, args: &[FunctionArg]) -> Result<ExprImpl> {
482        let [array, lambda] = args else {
483            return Err(ErrorCode::BindError(format!(
484                "`array_transform` expect two inputs `array` and `lambda`, but {} were given",
485                args.len()
486            ))
487            .into());
488        };
489
490        let bound_array = self.bind_function_arg(array)?;
491        let [bound_array] = <[ExprImpl; 1]>::try_from(bound_array).map_err(|bound_array| -> RwError {
492            ErrorCode::BindError(format!("The `array` argument for `array_transform` should be bound to one argument, but {} were got", bound_array.len()))
493                .into()
494        })?;
495
496        let inner_ty = match bound_array.return_type() {
497            DataType::List(ty) => ty.into_elem(),
498            real_type => return Err(ErrorCode::BindError(format!(
499                "The `array` argument for `array_transform` should be an array, but {} were got",
500                real_type
501            ))
502            .into()),
503        };
504
505        let ast::FunctionArgExpr::Expr(ast::Expr::LambdaFunction {
506            args: lambda_args,
507            body: lambda_body,
508        }) = lambda.get_expr()
509        else {
510            return Err(ErrorCode::BindError(
511                "The `lambda` argument for `array_transform` should be a lambda function"
512                    .to_owned(),
513            )
514            .into());
515        };
516
517        let [lambda_arg] = <[Ident; 1]>::try_from(lambda_args).map_err(|args| -> RwError {
518            ErrorCode::BindError(format!(
519                "The `lambda` argument for `array_transform` should be a lambda function with one argument, but {} were given",
520                args.len()
521            ))
522            .into()
523        })?;
524
525        let bound_lambda = self.bind_unary_lambda_function(inner_ty, lambda_arg, *lambda_body)?;
526
527        let lambda_ret_type = bound_lambda.return_type();
528        let transform_ret_type = DataType::list(lambda_ret_type);
529
530        Ok(ExprImpl::FunctionCallWithLambda(Box::new(
531            FunctionCallWithLambda::new_unchecked(
532                ExprType::ArrayTransform,
533                vec![bound_array],
534                bound_lambda,
535                transform_ret_type,
536            ),
537        )))
538    }
539
540    fn bind_unary_lambda_function(
541        &mut self,
542        input_ty: DataType,
543        arg: Ident,
544        body: ast::Expr,
545    ) -> Result<ExprImpl> {
546        let lambda_args = HashMap::from([(arg.real_value(), (0usize, input_ty))]);
547        let orig_lambda_args = self.context.lambda_args.replace(lambda_args);
548        let body = self.bind_expr_inner(&body)?;
549        self.context.lambda_args = orig_lambda_args;
550
551        Ok(body)
552    }
553
554    fn bind_map_filter(&mut self, args: &[FunctionArg]) -> Result<ExprImpl> {
555        let [input, lambda] = args else {
556            return Err(ErrorCode::BindError(format!(
557                "`map_filter` requires two arguments (input_map and lambda), got {}",
558                args.len()
559            ))
560            .into());
561        };
562
563        let bound_input = self.bind_function_arg(input)?;
564        let [bound_input] = <[ExprImpl; 1]>::try_from(bound_input).map_err(|e| {
565            ErrorCode::BindError(format!(
566                "Input argument should resolve to single expression, got {}",
567                e.len()
568            ))
569        })?;
570
571        let (key_type, value_type) = match bound_input.return_type() {
572            DataType::Map(map_type) => (map_type.key().clone(), map_type.value().clone()),
573            t => {
574                return Err(
575                    ErrorCode::BindError(format!("Input must be Map type, got {}", t)).into(),
576                );
577            }
578        };
579
580        let ast::FunctionArgExpr::Expr(ast::Expr::LambdaFunction {
581            args: lambda_args,
582            body: lambda_body,
583        }) = lambda.get_expr()
584        else {
585            return Err(ErrorCode::BindError(
586                "Second argument must be a lambda function".to_owned(),
587            )
588            .into());
589        };
590
591        let [key_arg, value_arg] = <[Ident; 2]>::try_from(lambda_args).map_err(|args| {
592            ErrorCode::BindError(format!(
593                "Lambda must have exactly two parameters (key, value), got {}",
594                args.len()
595            ))
596        })?;
597
598        let bound_lambda = self.bind_binary_lambda_function(
599            key_arg,
600            key_type.clone(),
601            value_arg,
602            value_type.clone(),
603            *lambda_body,
604        )?;
605
606        let lambda_ret_type = bound_lambda.return_type();
607        if lambda_ret_type != DataType::Boolean {
608            return Err(ErrorCode::BindError(format!(
609                "Lambda must return Boolean type, got {}",
610                lambda_ret_type
611            ))
612            .into());
613        }
614
615        let map_type = MapType::from_kv(key_type, value_type);
616        let return_type = DataType::Map(map_type);
617
618        Ok(ExprImpl::FunctionCallWithLambda(Box::new(
619            FunctionCallWithLambda::new_unchecked(
620                ExprType::MapFilter,
621                vec![bound_input],
622                bound_lambda,
623                return_type,
624            ),
625        )))
626    }
627
628    fn bind_binary_lambda_function(
629        &mut self,
630        first_arg: Ident,
631        first_ty: DataType,
632        second_arg: Ident,
633        second_ty: DataType,
634        body: ast::Expr,
635    ) -> Result<ExprImpl> {
636        let lambda_args = HashMap::from([
637            (first_arg.real_value(), (0usize, first_ty)),
638            (second_arg.real_value(), (1usize, second_ty)),
639        ]);
640
641        let orig_ctx = self.context.lambda_args.replace(lambda_args);
642        let bound_body = self.bind_expr_inner(&body)?;
643        self.context.lambda_args = orig_ctx;
644
645        Ok(bound_body)
646    }
647
648    fn ensure_table_function_allowed(&self) -> Result<()> {
649        if let Some(clause) = self.context.clause {
650            match clause {
651                Clause::JoinOn
652                | Clause::Where
653                | Clause::Having
654                | Clause::Filter
655                | Clause::Values
656                | Clause::Insert
657                | Clause::GeneratedColumn => {
658                    return Err(ErrorCode::InvalidInputSyntax(format!(
659                        "table functions are not allowed in {}",
660                        clause
661                    ))
662                    .into());
663                }
664                Clause::GroupBy | Clause::From => {}
665            }
666        }
667        Ok(())
668    }
669
670    /// A common utility function to extract sql udf expression out from the input `ast`.
671    pub(crate) fn extract_udf_expr(ast: Vec<Statement>) -> Result<AstExpr> {
672        if ast.len() != 1 {
673            return Err(ErrorCode::InvalidInputSyntax(
674                "the query for sql udf should contain only one statement".to_owned(),
675            )
676            .into());
677        }
678
679        // Extract the expression out
680        let Statement::Query(query) = ast.into_iter().next().unwrap() else {
681            return Err(ErrorCode::InvalidInputSyntax(
682                "invalid function definition, please recheck the syntax".to_owned(),
683            )
684            .into());
685        };
686
687        if let Some(expr) = query.as_single_select_item() {
688            // Inline SQL UDF.
689            Ok(expr.clone())
690        } else {
691            // Subquery SQL UDF.
692            Ok(AstExpr::Subquery(query))
693        }
694    }
695
696    pub fn bind_sql_udf_inner(
697        &mut self,
698        body: &str,
699        arg_names: &[String],
700        args: Vec<ExprImpl>,
701    ) -> Result<ExprImpl> {
702        // This represents the current user defined function is `language sql`
703        let ast = Parser::parse_sql(body)?;
704
705        // Stash the current arguments.
706        // For subquery SQL UDF, as we always push a new context, there should be no arguments to stash.
707        // For inline SQL UDF, we need to stash the arguments in case of nesting.
708        let stashed_arguments = self.context.sql_udf_arguments.take();
709
710        // The actual inline logic for sql udf.
711        let mut arguments = HashMap::new();
712        for (i, arg) in args.into_iter().enumerate() {
713            if arg_names[i].is_empty() {
714                // unnamed argument, use `$1`, `$2` as the name
715                arguments.insert(format!("${}", i + 1), arg);
716            } else {
717                // named argument
718                arguments.insert(arg_names[i].clone(), arg);
719            }
720        }
721        self.context.sql_udf_arguments = Some(arguments);
722
723        let Ok(expr) = Self::extract_udf_expr(ast) else {
724            return Err(ErrorCode::InvalidInputSyntax(
725                "failed to parse the input query and extract the udf expression, \
726                please recheck the syntax"
727                    .to_owned(),
728            )
729            .into());
730        };
731
732        let bind_result = self.bind_expr(&expr);
733        // Restore arguments information for subsequent binding.
734        self.context.sql_udf_arguments = stashed_arguments;
735
736        bind_result
737    }
738
739    fn bind_sql_udf(
740        &mut self,
741        func: Arc<FunctionCatalog>,
742        args: Vec<ExprImpl>,
743    ) -> Result<ExprImpl> {
744        let Some(body) = &func.body else {
745            return Err(
746                ErrorCode::InvalidInputSyntax("`body` must exist for sql udf".to_owned()).into(),
747            );
748        };
749
750        self.bind_sql_udf_inner(body, &func.arg_names, args)
751    }
752
753    pub(in crate::binder) fn bind_function_expr_arg(
754        &mut self,
755        arg_expr: &FunctionArgExpr,
756    ) -> Result<Vec<ExprImpl>> {
757        match arg_expr {
758            FunctionArgExpr::Expr(expr) => Ok(vec![self.bind_expr_inner(expr)?]),
759            FunctionArgExpr::QualifiedWildcard(_, _)
760            | FunctionArgExpr::ExprQualifiedWildcard(_, _) => Err(ErrorCode::InvalidInputSyntax(
761                format!("unexpected wildcard {}", arg_expr),
762            )
763            .into()),
764            FunctionArgExpr::Wildcard(None) => Ok(vec![]),
765            FunctionArgExpr::Wildcard(Some(_)) => unreachable!(),
766        }
767    }
768
769    pub(in crate::binder) fn bind_function_arg(
770        &mut self,
771        arg: &FunctionArg,
772    ) -> Result<Vec<ExprImpl>> {
773        match arg {
774            FunctionArg::Unnamed(expr) => self.bind_function_expr_arg(expr),
775            FunctionArg::Named { .. } => todo!(),
776        }
777    }
778}