risingwave_frontend/binder/expr/function/
mod.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::str::FromStr;
17use std::sync::Arc;
18
19use anyhow::Context;
20use itertools::Itertools;
21use risingwave_common::acl::AclMode;
22use risingwave_common::bail_not_implemented;
23use risingwave_common::catalog::INFORMATION_SCHEMA_SCHEMA_NAME;
24use risingwave_common::types::{DataType, MapType, StructType};
25use risingwave_common::util::iter_util::ZipEqFast;
26use risingwave_expr::aggregate::AggType;
27use risingwave_expr::window_function::WindowFuncKind;
28use risingwave_sqlparser::ast::{
29    self, Expr as AstExpr, Function, FunctionArg, FunctionArgExpr, FunctionArgList, Ident,
30    OrderByExpr, SecretRefAsType, Statement, Window,
31};
32use risingwave_sqlparser::parser::Parser;
33
34use crate::binder::Binder;
35use crate::binder::bind_context::Clause;
36use crate::catalog::OwnedByUserCatalog;
37use crate::catalog::function_catalog::FunctionCatalog;
38use crate::error::{ErrorCode, Result, RwError};
39use crate::expr::{
40    Expr, ExprImpl, ExprType, FunctionCall, FunctionCallWithLambda, InputRef, TableFunction,
41    TableFunctionType, UserDefinedFunction,
42};
43use crate::handler::privilege::ObjectCheckItem;
44
45mod aggregate;
46mod builtin_scalar;
47mod window;
48
/// Names of the system functions that take no arguments.
/// Ref: https://www.postgresql.org/docs/current/functions-info.html
///
/// Consulted by `is_sys_function_without_args`, which only matches unquoted identifiers.
const SYS_FUNCTION_WITHOUT_ARGS: &[&str] = &[
    "session_user",
    "user",
    "current_user",
    "current_role",
    "current_catalog",
    "current_schema",
    "current_timestamp",
];
59
60pub(super) fn is_sys_function_without_args(ident: &Ident) -> bool {
61    SYS_FUNCTION_WITHOUT_ARGS
62        .iter()
63        .any(|e| ident.real_value().as_str() == *e && ident.quote_style().is_none())
64}
65
/// Returns early from the enclosing function with `ErrorCode::InvalidInputSyntax` when `$pred`
/// evaluates to `true`; does nothing otherwise.
///
/// Two forms are accepted:
/// - `reject_syntax!(pred, msg)`: `msg` is converted with `to_string()` and used as-is
///   (it is not treated as a format string).
/// - `reject_syntax!(pred, fmt, args...)`: the message is built with `format!(fmt, args...)`.
macro_rules! reject_syntax {
    // Fixed message.
    ($pred:expr, $msg:expr) => {
        if $pred {
            return Err(ErrorCode::InvalidInputSyntax($msg.to_string()).into());
        }
    };

    // Formatted message.
    ($pred:expr, $fmt:expr, $($arg:tt)*) => {
        if $pred {
            return Err(ErrorCode::InvalidInputSyntax(
                format!($fmt, $($arg)*)
            ).into());
        }
    };
}
81
82impl Binder {
83    pub(in crate::binder) fn bind_function(
84        &mut self,
85        Function {
86            scalar_as_agg,
87            name,
88            arg_list,
89            within_group,
90            filter,
91            over,
92        }: &Function,
93    ) -> Result<ExprImpl> {
94        let (schema_name, func_name) = match name.0.as_slice() {
95            [name] => (None, name.real_value()),
96            [schema, name] => {
97                let schema_name = schema.real_value();
98                let func_name = if schema_name == INFORMATION_SCHEMA_SCHEMA_NAME {
99                    // definition of information_schema: https://github.com/postgres/postgres/blob/e0b2eed047df9045664da6f724cb42c10f8b12f0/src/backend/catalog/information_schema.sql
100                    //
101                    // FIXME: handle schema correctly, so that the functions are hidden if the schema is not in the search path.
102                    let function_name = name.real_value();
103                    if function_name != "_pg_expandarray" {
104                        bail_not_implemented!(
105                            issue = 12422,
106                            "Unsupported function name under schema: {}",
107                            schema_name
108                        );
109                    }
110                    function_name
111                } else {
112                    name.real_value()
113                };
114                (Some(schema_name), func_name)
115            }
116            [database, schema, name] => {
117                // Support database.schema.function qualified names when database matches current database
118                let database_name = database.real_value();
119                if database_name != self.db_name {
120                    return Err(ErrorCode::BindError(format!(
121                        "Cross-database function call is not supported: {}",
122                        name
123                    ))
124                    .into());
125                }
126                let schema_name = schema.real_value();
127                let func_name = name.real_value();
128                (Some(schema_name), func_name)
129            }
130            _ => bail_not_implemented!(issue = 112, "qualified function {}", name),
131        };
132
133        // FIXME: This is a hack to support [Bytebase queries](https://github.com/TennyZhuang/bytebase/blob/4a26f7c62b80e86e58ad2f77063138dc2f420623/backend/plugin/db/pg/sync.go#L549).
134        // Bytebase widely used the pattern like `obj_description(format('%s.%s',
135        // quote_ident(idx.schemaname), quote_ident(idx.indexname))::regclass) AS comment` to
136        // retrieve object comment, however we don't support casting a non-literal expression to
137        // regclass. We just hack the `obj_description` and `col_description` here, to disable it to
138        // bind its arguments.
139        if func_name == "obj_description" || func_name == "col_description" {
140            return Ok(ExprImpl::literal_varchar("".to_owned()));
141        }
142
143        // special binding logic for `array_transform` and `map_filter`
144        if func_name == "array_transform" || func_name == "map_filter" {
145            return self.validate_and_bind_special_function_params(
146                &func_name,
147                *scalar_as_agg,
148                arg_list,
149                within_group.as_deref(),
150                filter.as_deref(),
151                over.as_ref(),
152            );
153        }
154
155        // Bind function arguments. Secret references are bound as ExprImpl::SecretRef first,
156        // and then restricted to UDF calls below.
157        let has_secret_ref_arg = arg_list.args.iter().any(|arg| {
158            matches!(
159                arg,
160                FunctionArg::Unnamed(FunctionArgExpr::SecretRef(_))
161                    | FunctionArg::Named {
162                        arg: FunctionArgExpr::SecretRef(_),
163                        ..
164                    }
165            )
166        });
167
168        // Return a clear usage error before secret lookup when there is no UDF candidate by name.
169        // This avoids leaking "not found" for secret refs used in clearly invalid contexts.
170        if has_secret_ref_arg {
171            let has_udf_candidate = self
172                .catalog
173                .get_functions_by_name(
174                    &self.db_name,
175                    self.bind_schema_path(schema_name.as_deref()),
176                    &func_name,
177                )
178                .map(|(funcs, _)| !funcs.is_empty())
179                .unwrap_or(false);
180            if !has_udf_candidate {
181                return Err(ErrorCode::InvalidInputSyntax(
182                    "secret reference is only allowed in user-defined function arguments"
183                        .to_owned(),
184                )
185                .into());
186            }
187        }
188
189        let bind_arg = if func_name.eq_ignore_ascii_case("jsonb_agg") {
190            Self::bind_jsonb_agg_arg
191        } else {
192            Self::bind_function_arg
193        };
194        let mut args: Vec<ExprImpl> = arg_list
195            .args
196            .iter()
197            .map(|arg| bind_arg(self, arg))
198            .flatten_ok()
199            .try_collect()?;
200
201        let mut referred_udfs = HashSet::new();
202        let mut is_udf_call = false;
203
204        let wrapped_agg_type = if *scalar_as_agg {
205            // Let's firstly try to apply the `AGGREGATE:` prefix.
206            // We will reject functions that are not able to be wrapped as aggregate function.
207            let mut array_args = args
208                .iter()
209                .enumerate()
210                .map(|(i, expr)| InputRef::new(i, DataType::list(expr.return_type())).into())
211                .collect_vec();
212            let schema_path = self.bind_schema_path(schema_name.as_deref());
213            let scalar_func_expr = if let Ok((func, _)) = self.catalog.get_function_by_name_inputs(
214                &self.db_name,
215                schema_path,
216                &func_name,
217                &mut array_args,
218            ) {
219                // record the dependency upon the UDF
220                referred_udfs.insert(func.id);
221                is_udf_call = true;
222                self.check_privilege(
223                    ObjectCheckItem::new(func.owner, AclMode::Execute, func.name.clone(), func.id),
224                    self.database_id,
225                )?;
226
227                if !func.kind.is_scalar() {
228                    return Err(ErrorCode::InvalidInputSyntax(
229                        "expect a scalar function after `AGGREGATE:`".to_owned(),
230                    )
231                    .into());
232                }
233
234                if func.language == "sql" {
235                    self.bind_sql_udf(func.clone(), array_args)?
236                } else {
237                    UserDefinedFunction::new(func.clone(), array_args).into()
238                }
239            } else {
240                self.bind_builtin_scalar_function(&func_name, array_args, arg_list.variadic)?
241            };
242
243            // `AggType::WrapScalar` requires the inner expression to be convertible to `ExprNode`
244            // and directly executable. If there's any subquery in the expression, this will fail.
245            let expr_node = match scalar_func_expr.try_to_expr_proto() {
246                Ok(expr_node) => expr_node,
247                Err(e) => {
248                    return Err(ErrorCode::InvalidInputSyntax(format!(
249                        "function {func_name} cannot be used after `AGGREGATE:`: {e}",
250                    ))
251                    .into());
252                }
253            };
254
255            // now this is either an aggregate/window function call
256            Some(AggType::WrapScalar(expr_node))
257        } else {
258            None
259        };
260
261        let schema_path = self.bind_schema_path(schema_name.as_deref());
262        let udf = if wrapped_agg_type.is_none()
263            && let Ok((func, _)) = self.catalog.get_function_by_name_inputs(
264                &self.db_name,
265                schema_path,
266                &func_name,
267                &mut args,
268            ) {
269            // record the dependency upon the UDF
270            referred_udfs.insert(func.id);
271            is_udf_call = true;
272            self.check_privilege(
273                ObjectCheckItem::new(func.owner, AclMode::Execute, func.name.clone(), func.id),
274                self.database_id,
275            )?;
276            Some(func.clone())
277        } else {
278            None
279        };
280
281        if has_secret_ref_arg && !is_udf_call {
282            return Err(ErrorCode::InvalidInputSyntax(
283                "secret reference is only allowed in user-defined function arguments".to_owned(),
284            )
285            .into());
286        }
287
288        self.included_udfs.extend(referred_udfs);
289
290        let agg_type = if wrapped_agg_type.is_some() {
291            wrapped_agg_type
292        } else if let Some(ref udf) = udf
293            && udf.kind.is_aggregate()
294        {
295            assert_ne!(udf.language, "sql", "SQL UDAF is not supported yet");
296            Some(AggType::UserDefined(udf.as_ref().into()))
297        } else {
298            AggType::from_str(&func_name).ok()
299        };
300
301        // try to bind it as a window function call
302        if let Some(over) = over {
303            reject_syntax!(
304                arg_list.distinct,
305                "`DISTINCT` is not allowed in window function call"
306            );
307            reject_syntax!(
308                arg_list.variadic,
309                "`VARIADIC` is not allowed in window function call"
310            );
311            reject_syntax!(
312                !arg_list.order_by.is_empty(),
313                "`ORDER BY` is not allowed in window function call argument list"
314            );
315            reject_syntax!(
316                within_group.is_some(),
317                "`WITHIN GROUP` is not allowed in window function call"
318            );
319
320            let kind = if let Some(agg_type) = agg_type {
321                // aggregate as window function
322                WindowFuncKind::Aggregate(agg_type)
323            } else if let Ok(kind) = WindowFuncKind::from_str(&func_name) {
324                kind
325            } else {
326                bail_not_implemented!(issue = 8961, "Unrecognized window function: {}", func_name);
327            };
328            return self.bind_window_function(
329                kind,
330                args,
331                arg_list.ignore_nulls,
332                filter.as_deref(),
333                over,
334            );
335        }
336
337        // now it's an aggregate/scalar/table function call
338        reject_syntax!(
339            arg_list.ignore_nulls,
340            "`IGNORE NULLS` is not allowed in aggregate/scalar/table function call"
341        );
342
343        // try to bind it as an aggregate function call
344        if let Some(agg_type) = agg_type {
345            reject_syntax!(
346                arg_list.variadic,
347                "`VARIADIC` is not allowed in aggregate function call"
348            );
349            return self.bind_aggregate_function(
350                agg_type,
351                arg_list.distinct,
352                args,
353                &arg_list.order_by,
354                within_group.as_deref(),
355                filter.as_deref(),
356            );
357        }
358
359        // now it's a scalar/table function call
360        reject_syntax!(
361            arg_list.distinct,
362            "`DISTINCT` is not allowed in scalar/table function call"
363        );
364        reject_syntax!(
365            !arg_list.order_by.is_empty(),
366            "`ORDER BY` is not allowed in scalar/table function call"
367        );
368        reject_syntax!(
369            within_group.is_some(),
370            "`WITHIN GROUP` is not allowed in scalar/table function call"
371        );
372        reject_syntax!(
373            filter.is_some(),
374            "`FILTER` is not allowed in scalar/table function call"
375        );
376
377        // try to bind it as a table function call
378        {
379            // `file_scan` table function
380            if func_name.eq_ignore_ascii_case("file_scan") {
381                reject_syntax!(
382                    arg_list.variadic,
383                    "`VARIADIC` is not allowed in table function call"
384                );
385                self.ensure_table_function_allowed()?;
386                return Ok(TableFunction::new_file_scan(args)?.into());
387            }
388            // `postgres_query` table function
389            if func_name.eq("postgres_query") {
390                reject_syntax!(
391                    arg_list.variadic,
392                    "`VARIADIC` is not allowed in table function call"
393                );
394                self.ensure_table_function_allowed()?;
395                return Ok(TableFunction::new_postgres_query(
396                    &self.catalog,
397                    &self.db_name,
398                    self.bind_schema_path(schema_name.as_deref()),
399                    args,
400                )
401                .context("postgres_query error")?
402                .into());
403            }
404            // `mysql_query` table function
405            if func_name.eq("mysql_query") {
406                reject_syntax!(
407                    arg_list.variadic,
408                    "`VARIADIC` is not allowed in table function call"
409                );
410                self.ensure_table_function_allowed()?;
411                return Ok(TableFunction::new_mysql_query(
412                    &self.catalog,
413                    &self.db_name,
414                    self.bind_schema_path(schema_name.as_deref()),
415                    args,
416                )
417                .context("mysql_query error")?
418                .into());
419            }
420            // `internal_backfill_progress` table function
421            if func_name.eq("internal_backfill_progress") {
422                reject_syntax!(
423                    arg_list.variadic,
424                    "`VARIADIC` is not allowed in table function call"
425                );
426                self.ensure_table_function_allowed()?;
427                return Ok(TableFunction::new_internal_backfill_progress().into());
428            }
429            // `internal_source_backfill_progress` table function
430            if func_name.eq("internal_source_backfill_progress") {
431                reject_syntax!(
432                    arg_list.variadic,
433                    "`VARIADIC` is not allowed in table function call"
434                );
435                self.ensure_table_function_allowed()?;
436                return Ok(TableFunction::new_internal_source_backfill_progress().into());
437            }
438            // `internal_get_channel_delta_stats` table function
439            if func_name.eq("internal_get_channel_delta_stats") {
440                reject_syntax!(
441                    arg_list.variadic,
442                    "`VARIADIC` is not allowed in table function call"
443                );
444                self.ensure_table_function_allowed()?;
445
446                return Ok(TableFunction::new_internal_get_channel_delta_stats(args).into());
447            }
448            // UDTF
449            if let Some(ref udf) = udf
450                && udf.kind.is_table()
451            {
452                reject_syntax!(
453                    arg_list.variadic,
454                    "`VARIADIC` is not allowed in table function call"
455                );
456                self.ensure_table_function_allowed()?;
457                if udf.language == "sql" {
458                    return self.bind_sql_udf(udf.clone(), args);
459                }
460                return Ok(TableFunction::new_user_defined(udf.clone(), args).into());
461            }
462            // builtin table function
463            if let Ok(function_type) = TableFunctionType::from_str(&func_name) {
464                reject_syntax!(
465                    arg_list.variadic,
466                    "`VARIADIC` is not allowed in table function call"
467                );
468                self.ensure_table_function_allowed()?;
469                return Ok(TableFunction::new(function_type, args)?.into());
470            }
471        }
472
473        // try to bind it as a scalar function call
474        if let Some(ref udf) = udf {
475            assert!(udf.kind.is_scalar());
476            reject_syntax!(
477                arg_list.variadic,
478                "`VARIADIC` is not allowed in user-defined function call"
479            );
480            if udf.language == "sql" {
481                return self.bind_sql_udf(udf.clone(), args);
482            }
483            return Ok(UserDefinedFunction::new(udf.clone(), args).into());
484        }
485
486        self.bind_builtin_scalar_function(&func_name, args, arg_list.variadic)
487    }
488
489    fn validate_and_bind_special_function_params(
490        &mut self,
491        func_name: &str,
492        scalar_as_agg: bool,
493        arg_list: &FunctionArgList,
494        within_group: Option<&OrderByExpr>,
495        filter: Option<&risingwave_sqlparser::ast::Expr>,
496        over: Option<&Window>,
497    ) -> Result<ExprImpl> {
498        assert!(["array_transform", "map_filter"].contains(&func_name));
499
500        reject_syntax!(
501            scalar_as_agg,
502            "`AGGREGATE:` prefix is not allowed for `{}`",
503            func_name
504        );
505        reject_syntax!(
506            !arg_list.is_args_only(),
507            "keywords like `DISTINCT`, `ORDER BY` are not allowed in `{}` argument list",
508            func_name
509        );
510        reject_syntax!(
511            within_group.is_some(),
512            "`WITHIN GROUP` is not allowed in `{}` call",
513            func_name
514        );
515        reject_syntax!(
516            filter.is_some(),
517            "`FILTER` is not allowed in `{}` call",
518            func_name
519        );
520        reject_syntax!(
521            over.is_some(),
522            "`OVER` is not allowed in `{}` call",
523            func_name
524        );
525        if func_name == "array_transform" {
526            self.bind_array_transform(&arg_list.args)
527        } else {
528            self.bind_map_filter(&arg_list.args)
529        }
530    }
531
532    fn bind_array_transform(&mut self, args: &[FunctionArg]) -> Result<ExprImpl> {
533        let [array, lambda] = args else {
534            return Err(ErrorCode::BindError(format!(
535                "`array_transform` expect two inputs `array` and `lambda`, but {} were given",
536                args.len()
537            ))
538            .into());
539        };
540
541        let bound_array = self.bind_function_arg(array)?;
542        let [bound_array] = <[ExprImpl; 1]>::try_from(bound_array).map_err(|bound_array| -> RwError {
543            ErrorCode::BindError(format!("The `array` argument for `array_transform` should be bound to one argument, but {} were got", bound_array.len()))
544                .into()
545        })?;
546
547        let inner_ty = match bound_array.return_type() {
548            DataType::List(ty) => ty.into_elem(),
549            real_type => return Err(ErrorCode::BindError(format!(
550                "The `array` argument for `array_transform` should be an array, but {} were got",
551                real_type
552            ))
553            .into()),
554        };
555
556        let ast::FunctionArgExpr::Expr(ast::Expr::LambdaFunction {
557            args: lambda_args,
558            body: lambda_body,
559        }) = lambda.get_expr()
560        else {
561            return Err(ErrorCode::BindError(
562                "The `lambda` argument for `array_transform` should be a lambda function"
563                    .to_owned(),
564            )
565            .into());
566        };
567
568        let [lambda_arg] = <[Ident; 1]>::try_from(lambda_args).map_err(|args| -> RwError {
569            ErrorCode::BindError(format!(
570                "The `lambda` argument for `array_transform` should be a lambda function with one argument, but {} were given",
571                args.len()
572            ))
573            .into()
574        })?;
575
576        let bound_lambda = self.bind_unary_lambda_function(inner_ty, lambda_arg, *lambda_body)?;
577
578        let lambda_ret_type = bound_lambda.return_type();
579        let transform_ret_type = DataType::list(lambda_ret_type);
580
581        Ok(ExprImpl::FunctionCallWithLambda(Box::new(
582            FunctionCallWithLambda::new_unchecked(
583                ExprType::ArrayTransform,
584                vec![bound_array],
585                bound_lambda,
586                transform_ret_type,
587            ),
588        )))
589    }
590
591    fn bind_unary_lambda_function(
592        &mut self,
593        input_ty: DataType,
594        arg: Ident,
595        body: ast::Expr,
596    ) -> Result<ExprImpl> {
597        let lambda_args = HashMap::from([(arg.real_value(), (0usize, input_ty))]);
598        let orig_lambda_args = self.context.lambda_args.replace(lambda_args);
599        let body = self.bind_expr_inner(&body)?;
600        self.context.lambda_args = orig_lambda_args;
601
602        Ok(body)
603    }
604
605    fn bind_map_filter(&mut self, args: &[FunctionArg]) -> Result<ExprImpl> {
606        let [input, lambda] = args else {
607            return Err(ErrorCode::BindError(format!(
608                "`map_filter` requires two arguments (input_map and lambda), got {}",
609                args.len()
610            ))
611            .into());
612        };
613
614        let bound_input = self.bind_function_arg(input)?;
615        let [bound_input] = <[ExprImpl; 1]>::try_from(bound_input).map_err(|e| {
616            ErrorCode::BindError(format!(
617                "Input argument should resolve to single expression, got {}",
618                e.len()
619            ))
620        })?;
621
622        let (key_type, value_type) = match bound_input.return_type() {
623            DataType::Map(map_type) => (map_type.key().clone(), map_type.value().clone()),
624            t => {
625                return Err(
626                    ErrorCode::BindError(format!("Input must be Map type, got {}", t)).into(),
627                );
628            }
629        };
630
631        let ast::FunctionArgExpr::Expr(ast::Expr::LambdaFunction {
632            args: lambda_args,
633            body: lambda_body,
634        }) = lambda.get_expr()
635        else {
636            return Err(ErrorCode::BindError(
637                "Second argument must be a lambda function".to_owned(),
638            )
639            .into());
640        };
641
642        let [key_arg, value_arg] = <[Ident; 2]>::try_from(lambda_args).map_err(|args| {
643            ErrorCode::BindError(format!(
644                "Lambda must have exactly two parameters (key, value), got {}",
645                args.len()
646            ))
647        })?;
648
649        let bound_lambda = self.bind_binary_lambda_function(
650            key_arg,
651            key_type.clone(),
652            value_arg,
653            value_type.clone(),
654            *lambda_body,
655        )?;
656
657        let lambda_ret_type = bound_lambda.return_type();
658        if lambda_ret_type != DataType::Boolean {
659            return Err(ErrorCode::BindError(format!(
660                "Lambda must return Boolean type, got {}",
661                lambda_ret_type
662            ))
663            .into());
664        }
665
666        let map_type = MapType::from_kv(key_type, value_type);
667        let return_type = DataType::Map(map_type);
668
669        Ok(ExprImpl::FunctionCallWithLambda(Box::new(
670            FunctionCallWithLambda::new_unchecked(
671                ExprType::MapFilter,
672                vec![bound_input],
673                bound_lambda,
674                return_type,
675            ),
676        )))
677    }
678
679    fn bind_binary_lambda_function(
680        &mut self,
681        first_arg: Ident,
682        first_ty: DataType,
683        second_arg: Ident,
684        second_ty: DataType,
685        body: ast::Expr,
686    ) -> Result<ExprImpl> {
687        let lambda_args = HashMap::from([
688            (first_arg.real_value(), (0usize, first_ty)),
689            (second_arg.real_value(), (1usize, second_ty)),
690        ]);
691
692        let orig_ctx = self.context.lambda_args.replace(lambda_args);
693        let bound_body = self.bind_expr_inner(&body)?;
694        self.context.lambda_args = orig_ctx;
695
696        Ok(bound_body)
697    }
698
699    fn ensure_table_function_allowed(&self) -> Result<()> {
700        if let Some(clause) = self.context.clause {
701            match clause {
702                Clause::JoinOn
703                | Clause::Where
704                | Clause::Having
705                | Clause::Filter
706                | Clause::Values
707                | Clause::Insert
708                | Clause::GeneratedColumn => {
709                    return Err(ErrorCode::InvalidInputSyntax(format!(
710                        "table functions are not allowed in {}",
711                        clause
712                    ))
713                    .into());
714                }
715                Clause::GroupBy | Clause::From => {}
716            }
717        }
718        Ok(())
719    }
720
721    /// A common utility function to extract sql udf expression out from the input `ast`.
722    pub(crate) fn extract_udf_expr(ast: Vec<Statement>) -> Result<AstExpr> {
723        if ast.len() != 1 {
724            return Err(ErrorCode::InvalidInputSyntax(
725                "the query for sql udf should contain only one statement".to_owned(),
726            )
727            .into());
728        }
729
730        // Extract the expression out
731        let Statement::Query(query) = ast.into_iter().next().unwrap() else {
732            return Err(ErrorCode::InvalidInputSyntax(
733                "invalid function definition, please recheck the syntax".to_owned(),
734            )
735            .into());
736        };
737
738        if let Some(expr) = query.as_single_select_item() {
739            // Inline SQL UDF.
740            Ok(expr.clone())
741        } else {
742            // Subquery SQL UDF.
743            Ok(AstExpr::Subquery(query))
744        }
745    }
746
747    pub fn bind_sql_udf_inner(
748        &mut self,
749        body: &str,
750        arg_names: &[String],
751        args: Vec<ExprImpl>,
752    ) -> Result<ExprImpl> {
753        // This represents the current user defined function is `language sql`
754        let ast = Parser::parse_sql(body)?;
755
756        // Stash the current arguments.
757        // For subquery SQL UDF, as we always push a new context, there should be no arguments to stash.
758        // For inline SQL UDF, we need to stash the arguments in case of nesting.
759        let stashed_arguments = self.context.sql_udf_arguments.take();
760
761        // The actual inline logic for sql udf.
762        let mut arguments = HashMap::new();
763        for (i, arg) in args.into_iter().enumerate() {
764            if arg_names[i].is_empty() {
765                // unnamed argument, use `$1`, `$2` as the name
766                arguments.insert(format!("${}", i + 1), arg);
767            } else {
768                // named argument
769                arguments.insert(arg_names[i].clone(), arg);
770            }
771        }
772        self.context.sql_udf_arguments = Some(arguments);
773
774        let Ok(expr) = Self::extract_udf_expr(ast) else {
775            return Err(ErrorCode::InvalidInputSyntax(
776                "failed to parse the input query and extract the udf expression, \
777                please recheck the syntax"
778                    .to_owned(),
779            )
780            .into());
781        };
782
783        let bind_result = self.bind_expr(&expr);
784        // Restore arguments information for subsequent binding.
785        self.context.sql_udf_arguments = stashed_arguments;
786
787        bind_result
788    }
789
790    fn bind_sql_udf(
791        &mut self,
792        func: Arc<FunctionCatalog>,
793        args: Vec<ExprImpl>,
794    ) -> Result<ExprImpl> {
795        let Some(body) = &func.body else {
796            return Err(
797                ErrorCode::InvalidInputSyntax("`body` must exist for sql udf".to_owned()).into(),
798            );
799        };
800
801        self.bind_sql_udf_inner(body, &func.arg_names, args)
802    }
803
804    pub(in crate::binder) fn bind_function_expr_arg(
805        &mut self,
806        arg_expr: &FunctionArgExpr,
807    ) -> Result<Vec<ExprImpl>> {
808        match arg_expr {
809            FunctionArgExpr::Expr(expr) => Ok(vec![self.bind_expr_inner(expr)?]),
810            FunctionArgExpr::QualifiedWildcard(_, _)
811            | FunctionArgExpr::ExprQualifiedWildcard(_, _) => Err(ErrorCode::InvalidInputSyntax(
812                format!("unexpected wildcard {}", arg_expr),
813            )
814            .into()),
815            FunctionArgExpr::Wildcard(None) => Ok(vec![]),
816            FunctionArgExpr::Wildcard(Some(_)) => unreachable!(),
817            FunctionArgExpr::SecretRef(secret_ref_value) => {
818                let (schema_name, secret_name) = Binder::resolve_schema_qualified_name(
819                    &self.db_name,
820                    &secret_ref_value.secret_name,
821                )?;
822                let schema_path = self.bind_schema_path(schema_name.as_deref());
823                let (secret_catalog, _) =
824                    self.catalog
825                        .get_secret_by_name(&self.db_name, schema_path, &secret_name)?;
826
827                self.check_privilege(
828                    ObjectCheckItem::new(
829                        secret_catalog.owner(),
830                        AclMode::Usage,
831                        secret_catalog.name.clone(),
832                        secret_catalog.id,
833                    ),
834                    self.database_id,
835                )?;
836
837                self.included_secrets.insert(secret_catalog.id);
838
839                let ref_as = match secret_ref_value.ref_as {
840                    SecretRefAsType::Text => risingwave_pb::secret::secret_ref::RefAsType::Text,
841                    SecretRefAsType::File => risingwave_pb::secret::secret_ref::RefAsType::File,
842                };
843
844                Ok(vec![
845                    crate::expr::SecretRef {
846                        secret_id: secret_catalog.id,
847                        ref_as,
848                        secret_name: secret_catalog.name.clone(),
849                    }
850                    .into(),
851                ])
852            }
853        }
854    }
855
856    pub(in crate::binder) fn bind_function_arg(
857        &mut self,
858        arg: &FunctionArg,
859    ) -> Result<Vec<ExprImpl>> {
860        match arg {
861            FunctionArg::Unnamed(expr) => self.bind_function_expr_arg(expr),
862            FunctionArg::Named { .. } => Err(ErrorCode::InvalidInputSyntax(
863                "named function arguments are not supported yet".to_owned(),
864            )
865            .into()),
866        }
867    }
868
869    fn bind_jsonb_agg_arg(&mut self, arg: &FunctionArg) -> Result<Vec<ExprImpl>> {
870        match arg {
871            FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(prefix, except)) => {
872                let relation_name = prefix.to_string();
873                let (schema_name, table_name) =
874                    Binder::resolve_schema_qualified_name(&self.db_name, prefix)?;
875                let except_indices = self.generate_except_indices(except.as_deref())?;
876                let (begin, end) = self
877                    .context
878                    .range_of
879                    .get(&(schema_name, table_name))
880                    .ok_or_else(|| {
881                        ErrorCode::ItemNotFound(format!("relation \"{}\"", relation_name))
882                    })?;
883                let (exprs, names) = Self::iter_bound_columns(
884                    self.context.columns[*begin..*end]
885                        .iter()
886                        .filter(|c| !c.is_hidden && !except_indices.contains(&c.index)),
887                );
888                self.wrap_wildcard_exprs_as_named_row(exprs, names)
889            }
890            FunctionArg::Unnamed(FunctionArgExpr::ExprQualifiedWildcard(expr, prefix)) => {
891                let (exprs, names) = self.bind_wildcard_field_column(expr, prefix)?;
892                self.wrap_wildcard_exprs_as_named_row(exprs, names)
893            }
894            _ => self.bind_function_arg(arg),
895        }
896    }
897
898    fn wrap_wildcard_exprs_as_named_row(
899        &self,
900        exprs: Vec<ExprImpl>,
901        names: Vec<Option<String>>,
902    ) -> Result<Vec<ExprImpl>> {
903        let return_type =
904            DataType::Struct(StructType::new(
905                names.into_iter().zip_eq_fast(exprs.iter()).enumerate().map(
906                    |(idx, (name, expr))| {
907                        (
908                            name.unwrap_or_else(|| format!("f{}", idx + 1)),
909                            expr.return_type(),
910                        )
911                    },
912                ),
913            ));
914        Ok(vec![
915            FunctionCall::new_unchecked(ExprType::Row, exprs, return_type).into(),
916        ])
917    }
918}