risingwave_frontend/binder/expr/function/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::str::FromStr;
17use std::sync::Arc;
18
19use anyhow::Context;
20use itertools::Itertools;
21use risingwave_common::acl::AclMode;
22use risingwave_common::bail_not_implemented;
23use risingwave_common::catalog::{INFORMATION_SCHEMA_SCHEMA_NAME, PG_CATALOG_SCHEMA_NAME};
24use risingwave_common::types::DataType;
25use risingwave_expr::aggregate::AggType;
26use risingwave_expr::window_function::WindowFuncKind;
27use risingwave_pb::user::grant_privilege::PbObject;
28use risingwave_sqlparser::ast::{self, Function, FunctionArg, FunctionArgExpr, Ident};
29use risingwave_sqlparser::parser::ParserError;
30
31use crate::binder::bind_context::Clause;
32use crate::binder::{Binder, UdfContext};
33use crate::catalog::function_catalog::FunctionCatalog;
34use crate::error::{ErrorCode, Result, RwError};
35use crate::expr::{
36    Expr, ExprImpl, ExprType, FunctionCallWithLambda, InputRef, TableFunction, TableFunctionType,
37    UserDefinedFunction,
38};
39
40mod aggregate;
41mod builtin_scalar;
42mod window;
43
/// System functions that are invoked without any arguments.
///
/// Ref: <https://www.postgresql.org/docs/current/functions-info.html>
const SYS_FUNCTION_WITHOUT_ARGS: &[&str] = &[
    "session_user",
    "user",
    "current_user",
    "current_role",
    "current_catalog",
    "current_schema",
    "current_timestamp",
];
54
55pub(super) fn is_sys_function_without_args(ident: &Ident) -> bool {
56    SYS_FUNCTION_WITHOUT_ARGS
57        .iter()
58        .any(|e| ident.real_value().as_str() == *e && ident.quote_style().is_none())
59}
60
/// Maximum nesting depth of SQL UDF calls while binding, enforced through the
/// global counter kept in `udf_context`.
///
/// The limit is kept small (`16`) to reduce the chance that a deep calling
/// stack gets the running RisingWave thread killed by the OS.
const SQL_UDF_MAX_CALLING_DEPTH: u32 = 16;
66
/// Returns early from the enclosing function with an
/// `ErrorCode::InvalidInputSyntax` error carrying `$msg` whenever `$pred`
/// evaluates to `true`.
macro_rules! reject_syntax {
    ($pred:expr, $msg:expr) => {
        if $pred {
            let message = $msg.to_string();
            return Err(ErrorCode::InvalidInputSyntax(message).into());
        }
    };
}
74
75impl Binder {
76    pub(in crate::binder) fn bind_function(
77        &mut self,
78        Function {
79            scalar_as_agg,
80            name,
81            arg_list,
82            within_group,
83            filter,
84            over,
85        }: Function,
86    ) -> Result<ExprImpl> {
87        let (schema_name, func_name) = match name.0.as_slice() {
88            [name] => (None, name.real_value()),
89            [schema, name] => {
90                let schema_name = schema.real_value();
91                let func_name = if schema_name == PG_CATALOG_SCHEMA_NAME {
92                    // pg_catalog is always effectively part of the search path, so we can always bind the function.
93                    // Ref: https://www.postgresql.org/docs/current/ddl-schemas.html#DDL-SCHEMAS-CATALOG
94                    name.real_value()
95                } else if schema_name == INFORMATION_SCHEMA_SCHEMA_NAME {
96                    // definition of information_schema: https://github.com/postgres/postgres/blob/e0b2eed047df9045664da6f724cb42c10f8b12f0/src/backend/catalog/information_schema.sql
97                    //
98                    // FIXME: handle schema correctly, so that the functions are hidden if the schema is not in the search path.
99                    let function_name = name.real_value();
100                    if function_name != "_pg_expandarray" {
101                        bail_not_implemented!(
102                            issue = 12422,
103                            "Unsupported function name under schema: {}",
104                            schema_name
105                        );
106                    }
107                    function_name
108                } else {
109                    bail_not_implemented!(
110                        issue = 12422,
111                        "Unsupported function name under schema: {}",
112                        schema_name
113                    );
114                };
115                (Some(schema_name), func_name)
116            }
117            _ => bail_not_implemented!(issue = 112, "qualified function {}", name),
118        };
119
120        // FIXME: This is a hack to support [Bytebase queries](https://github.com/TennyZhuang/bytebase/blob/4a26f7c62b80e86e58ad2f77063138dc2f420623/backend/plugin/db/pg/sync.go#L549).
121        // Bytebase widely used the pattern like `obj_description(format('%s.%s',
122        // quote_ident(idx.schemaname), quote_ident(idx.indexname))::regclass) AS comment` to
123        // retrieve object comment, however we don't support casting a non-literal expression to
124        // regclass. We just hack the `obj_description` and `col_description` here, to disable it to
125        // bind its arguments.
126        if func_name == "obj_description" || func_name == "col_description" {
127            return Ok(ExprImpl::literal_varchar("".to_owned()));
128        }
129
130        // special binding logic for `array_transform`
131        if func_name == "array_transform" {
132            // For type inference, we need to bind the array type first.
133            reject_syntax!(
134                scalar_as_agg,
135                "`AGGREGATE:` prefix is not allowed for `array_transform`"
136            );
137            reject_syntax!(
138                !arg_list.is_args_only(),
139                "keywords like `DISTINCT`, `ORDER BY` are not allowed in `array_transform` argument list"
140            );
141            reject_syntax!(
142                within_group.is_some(),
143                "`WITHIN GROUP` is not allowed in `array_transform` call"
144            );
145            reject_syntax!(
146                filter.is_some(),
147                "`FILTER` is not allowed in `array_transform` call"
148            );
149            reject_syntax!(
150                over.is_some(),
151                "`OVER` is not allowed in `array_transform` call"
152            );
153            return self.bind_array_transform(arg_list.args);
154        }
155
156        let mut args: Vec<_> = arg_list
157            .args
158            .iter()
159            .map(|arg| self.bind_function_arg(arg.clone()))
160            .flatten_ok()
161            .try_collect()?;
162
163        let mut referred_udfs = HashSet::new();
164
165        let wrapped_agg_type = if scalar_as_agg {
166            // Let's firstly try to apply the `AGGREGATE:` prefix.
167            // We will reject functions that are not able to be wrapped as aggregate function.
168            let mut array_args = args
169                .iter()
170                .enumerate()
171                .map(|(i, expr)| {
172                    InputRef::new(i, DataType::List(Box::new(expr.return_type()))).into()
173                })
174                .collect_vec();
175            let schema_path = self.bind_schema_path(schema_name.as_deref());
176            let scalar_func_expr = if let Ok((func, _)) = self.catalog.get_function_by_name_inputs(
177                &self.db_name,
178                schema_path,
179                &func_name,
180                &mut array_args,
181            ) {
182                // record the dependency upon the UDF
183                referred_udfs.insert(func.id);
184                self.check_privilege(
185                    PbObject::FunctionId(func.id.function_id()),
186                    self.database_id,
187                    AclMode::Execute,
188                    func.owner,
189                )?;
190
191                if !func.kind.is_scalar() {
192                    return Err(ErrorCode::InvalidInputSyntax(
193                        "expect a scalar function after `AGGREGATE:`".to_owned(),
194                    )
195                    .into());
196                }
197
198                if func.language == "sql" {
199                    self.bind_sql_udf(func.clone(), array_args)?
200                } else {
201                    UserDefinedFunction::new(func.clone(), array_args).into()
202                }
203            } else {
204                self.bind_builtin_scalar_function(&func_name, array_args, arg_list.variadic)?
205            };
206
207            // now this is either an aggregate/window function call
208            Some(AggType::WrapScalar(scalar_func_expr.to_expr_proto()))
209        } else {
210            None
211        };
212
213        let schema_path = self.bind_schema_path(schema_name.as_deref());
214        let udf = if wrapped_agg_type.is_none()
215            && let Ok((func, _)) = self.catalog.get_function_by_name_inputs(
216                &self.db_name,
217                schema_path,
218                &func_name,
219                &mut args,
220            ) {
221            // record the dependency upon the UDF
222            referred_udfs.insert(func.id);
223            self.check_privilege(
224                PbObject::FunctionId(func.id.function_id()),
225                self.database_id,
226                AclMode::Execute,
227                func.owner,
228            )?;
229            Some(func.clone())
230        } else {
231            None
232        };
233
234        self.included_udfs.extend(referred_udfs);
235
236        let agg_type = if wrapped_agg_type.is_some() {
237            wrapped_agg_type
238        } else if let Some(ref udf) = udf
239            && udf.kind.is_aggregate()
240        {
241            assert_ne!(udf.language, "sql", "SQL UDAF is not supported yet");
242            Some(AggType::UserDefined(udf.as_ref().into()))
243        } else {
244            AggType::from_str(&func_name).ok()
245        };
246
247        // try to bind it as a window function call
248        if let Some(over) = over {
249            reject_syntax!(
250                arg_list.distinct,
251                "`DISTINCT` is not allowed in window function call"
252            );
253            reject_syntax!(
254                arg_list.variadic,
255                "`VARIADIC` is not allowed in window function call"
256            );
257            reject_syntax!(
258                !arg_list.order_by.is_empty(),
259                "`ORDER BY` is not allowed in window function call argument list"
260            );
261            reject_syntax!(
262                within_group.is_some(),
263                "`WITHIN GROUP` is not allowed in window function call"
264            );
265
266            let kind = if let Some(agg_type) = agg_type {
267                // aggregate as window function
268                WindowFuncKind::Aggregate(agg_type)
269            } else if let Ok(kind) = WindowFuncKind::from_str(&func_name) {
270                kind
271            } else {
272                bail_not_implemented!(issue = 8961, "Unrecognized window function: {}", func_name);
273            };
274            return self.bind_window_function(kind, args, arg_list.ignore_nulls, filter, over);
275        }
276
277        // now it's an aggregate/scalar/table function call
278        reject_syntax!(
279            arg_list.ignore_nulls,
280            "`IGNORE NULLS` is not allowed in aggregate/scalar/table function call"
281        );
282
283        // try to bind it as an aggregate function call
284        if let Some(agg_type) = agg_type {
285            reject_syntax!(
286                arg_list.variadic,
287                "`VARIADIC` is not allowed in aggregate function call"
288            );
289            return self.bind_aggregate_function(
290                agg_type,
291                arg_list.distinct,
292                args,
293                arg_list.order_by,
294                within_group,
295                filter,
296            );
297        }
298
299        // now it's a scalar/table function call
300        reject_syntax!(
301            arg_list.distinct,
302            "`DISTINCT` is not allowed in scalar/table function call"
303        );
304        reject_syntax!(
305            !arg_list.order_by.is_empty(),
306            "`ORDER BY` is not allowed in scalar/table function call"
307        );
308        reject_syntax!(
309            within_group.is_some(),
310            "`WITHIN GROUP` is not allowed in scalar/table function call"
311        );
312        reject_syntax!(
313            filter.is_some(),
314            "`FILTER` is not allowed in scalar/table function call"
315        );
316
317        // try to bind it as a table function call
318        {
319            // `file_scan` table function
320            if func_name.eq_ignore_ascii_case("file_scan") {
321                reject_syntax!(
322                    arg_list.variadic,
323                    "`VARIADIC` is not allowed in table function call"
324                );
325                self.ensure_table_function_allowed()?;
326                return Ok(TableFunction::new_file_scan(args)?.into());
327            }
328            // `postgres_query` table function
329            if func_name.eq("postgres_query") {
330                reject_syntax!(
331                    arg_list.variadic,
332                    "`VARIADIC` is not allowed in table function call"
333                );
334                self.ensure_table_function_allowed()?;
335                return Ok(TableFunction::new_postgres_query(
336                    &self.catalog,
337                    &self.db_name,
338                    self.bind_schema_path(schema_name.as_deref()),
339                    args,
340                )
341                .context("postgres_query error")?
342                .into());
343            }
344            // `mysql_query` table function
345            if func_name.eq("mysql_query") {
346                reject_syntax!(
347                    arg_list.variadic,
348                    "`VARIADIC` is not allowed in table function call"
349                );
350                self.ensure_table_function_allowed()?;
351                return Ok(TableFunction::new_mysql_query(
352                    &self.catalog,
353                    &self.db_name,
354                    self.bind_schema_path(schema_name.as_deref()),
355                    args,
356                )
357                .context("mysql_query error")?
358                .into());
359            }
360            // UDTF
361            if let Some(ref udf) = udf
362                && udf.kind.is_table()
363            {
364                reject_syntax!(
365                    arg_list.variadic,
366                    "`VARIADIC` is not allowed in table function call"
367                );
368                self.ensure_table_function_allowed()?;
369                if udf.language == "sql" {
370                    return self.bind_sql_udf(udf.clone(), args);
371                }
372                return Ok(TableFunction::new_user_defined(udf.clone(), args).into());
373            }
374            // builtin table function
375            if let Ok(function_type) = TableFunctionType::from_str(&func_name) {
376                reject_syntax!(
377                    arg_list.variadic,
378                    "`VARIADIC` is not allowed in table function call"
379                );
380                self.ensure_table_function_allowed()?;
381                return Ok(TableFunction::new(function_type, args)?.into());
382            }
383        }
384
385        // try to bind it as a scalar function call
386        if let Some(ref udf) = udf {
387            assert!(udf.kind.is_scalar());
388            reject_syntax!(
389                arg_list.variadic,
390                "`VARIADIC` is not allowed in user-defined function call"
391            );
392            if udf.language == "sql" {
393                return self.bind_sql_udf(udf.clone(), args);
394            }
395            return Ok(UserDefinedFunction::new(udf.clone(), args).into());
396        }
397
398        self.bind_builtin_scalar_function(&func_name, args, arg_list.variadic)
399    }
400
401    fn bind_array_transform(&mut self, args: Vec<FunctionArg>) -> Result<ExprImpl> {
402        let [array, lambda] = <[FunctionArg; 2]>::try_from(args).map_err(|args| -> RwError {
403            ErrorCode::BindError(format!(
404                "`array_transform` expect two inputs `array` and `lambda`, but {} were given",
405                args.len()
406            ))
407            .into()
408        })?;
409
410        let bound_array = self.bind_function_arg(array)?;
411        let [bound_array] = <[ExprImpl; 1]>::try_from(bound_array).map_err(|bound_array| -> RwError {
412            ErrorCode::BindError(format!("The `array` argument for `array_transform` should be bound to one argument, but {} were got", bound_array.len()))
413                .into()
414        })?;
415
416        let inner_ty = match bound_array.return_type() {
417            DataType::List(ty) => *ty,
418            real_type => return Err(ErrorCode::BindError(format!(
419                "The `array` argument for `array_transform` should be an array, but {} were got",
420                real_type
421            ))
422            .into()),
423        };
424
425        let ast::FunctionArgExpr::Expr(ast::Expr::LambdaFunction {
426            args: lambda_args,
427            body: lambda_body,
428        }) = lambda.get_expr()
429        else {
430            return Err(ErrorCode::BindError(
431                "The `lambda` argument for `array_transform` should be a lambda function"
432                    .to_owned(),
433            )
434            .into());
435        };
436
437        let [lambda_arg] = <[Ident; 1]>::try_from(lambda_args).map_err(|args| -> RwError {
438            ErrorCode::BindError(format!(
439                "The `lambda` argument for `array_transform` should be a lambda function with one argument, but {} were given",
440                args.len()
441            ))
442            .into()
443        })?;
444
445        let bound_lambda = self.bind_unary_lambda_function(inner_ty, lambda_arg, *lambda_body)?;
446
447        let lambda_ret_type = bound_lambda.return_type();
448        let transform_ret_type = DataType::List(Box::new(lambda_ret_type));
449
450        Ok(ExprImpl::FunctionCallWithLambda(Box::new(
451            FunctionCallWithLambda::new_unchecked(
452                ExprType::ArrayTransform,
453                vec![bound_array],
454                bound_lambda,
455                transform_ret_type,
456            ),
457        )))
458    }
459
460    fn bind_unary_lambda_function(
461        &mut self,
462        input_ty: DataType,
463        arg: Ident,
464        body: ast::Expr,
465    ) -> Result<ExprImpl> {
466        let lambda_args = HashMap::from([(arg.real_value(), (0usize, input_ty))]);
467        let orig_lambda_args = self.context.lambda_args.replace(lambda_args);
468        let body = self.bind_expr_inner(body)?;
469        self.context.lambda_args = orig_lambda_args;
470
471        Ok(body)
472    }
473
474    fn ensure_table_function_allowed(&self) -> Result<()> {
475        if let Some(clause) = self.context.clause {
476            match clause {
477                Clause::JoinOn
478                | Clause::Where
479                | Clause::Having
480                | Clause::Filter
481                | Clause::Values
482                | Clause::Insert
483                | Clause::GeneratedColumn => {
484                    return Err(ErrorCode::InvalidInputSyntax(format!(
485                        "table functions are not allowed in {}",
486                        clause
487                    ))
488                    .into());
489                }
490                Clause::GroupBy | Clause::From => {}
491            }
492        }
493        Ok(())
494    }
495
496    fn bind_sql_udf(
497        &mut self,
498        func: Arc<FunctionCatalog>,
499        args: Vec<ExprImpl>,
500    ) -> Result<ExprImpl> {
501        if func.body.is_none() {
502            return Err(
503                ErrorCode::InvalidInputSyntax("`body` must exist for sql udf".to_owned()).into(),
504            );
505        }
506
507        // This represents the current user defined function is `language sql`
508        let parse_result =
509            risingwave_sqlparser::parser::Parser::parse_sql(func.body.as_ref().unwrap().as_str());
510        if let Err(ParserError::ParserError(err)) | Err(ParserError::TokenizerError(err)) =
511            parse_result
512        {
513            // Here we just return the original parse error message
514            return Err(ErrorCode::InvalidInputSyntax(err).into());
515        }
516
517        debug_assert!(parse_result.is_ok());
518
519        // We can safely unwrap here
520        let ast = parse_result.unwrap();
521
522        // Stash the current `udf_context`
523        // Note that the `udf_context` may be empty,
524        // if the current binding is the root (top-most) sql udf.
525        // In this case the empty context will be stashed
526        // and restored later, no need to maintain other flags.
527        let stashed_udf_context = self.udf_context.get_context();
528
529        // The actual inline logic for sql udf
530        // Note that we will always create new udf context for each sql udf
531        let mut udf_context = HashMap::new();
532        for (i, arg) in args.into_iter().enumerate() {
533            if func.arg_names[i].is_empty() {
534                // unnamed argument, use `$1`, `$2` as the name
535                udf_context.insert(format!("${}", i + 1), arg);
536            } else {
537                // named argument
538                udf_context.insert(func.arg_names[i].clone(), arg);
539            }
540        }
541        self.udf_context.update_context(udf_context);
542
543        // Check for potential recursive calling
544        if self.udf_context.global_count() >= SQL_UDF_MAX_CALLING_DEPTH {
545            return Err(ErrorCode::BindError(format!(
546                "function {} calling stack depth limit exceeded",
547                func.name
548            ))
549            .into());
550        } else {
551            // Update the status for the global counter
552            self.udf_context.incr_global_count();
553        }
554
555        if let Ok(expr) = UdfContext::extract_udf_expression(ast) {
556            let bind_result = self.bind_expr(expr);
557
558            // We should properly decrement global count after a successful binding
559            // Since the subsequent probe operation in `bind_column` or
560            // `bind_parameter` relies on global counting
561            self.udf_context.decr_global_count();
562
563            // Restore context information for subsequent binding
564            self.udf_context.update_context(stashed_udf_context);
565
566            return bind_result;
567        }
568
569        Err(ErrorCode::InvalidInputSyntax(
570            "failed to parse the input query and extract the udf expression,
571                please recheck the syntax"
572                .to_owned(),
573        )
574        .into())
575    }
576
577    pub(in crate::binder) fn bind_function_expr_arg(
578        &mut self,
579        arg_expr: FunctionArgExpr,
580    ) -> Result<Vec<ExprImpl>> {
581        match arg_expr {
582            FunctionArgExpr::Expr(expr) => Ok(vec![self.bind_expr_inner(expr)?]),
583            FunctionArgExpr::QualifiedWildcard(_, _)
584            | FunctionArgExpr::ExprQualifiedWildcard(_, _) => Err(ErrorCode::InvalidInputSyntax(
585                format!("unexpected wildcard {}", arg_expr),
586            )
587            .into()),
588            FunctionArgExpr::Wildcard(None) => Ok(vec![]),
589            FunctionArgExpr::Wildcard(Some(_)) => unreachable!(),
590        }
591    }
592
593    pub(in crate::binder) fn bind_function_arg(
594        &mut self,
595        arg: FunctionArg,
596    ) -> Result<Vec<ExprImpl>> {
597        match arg {
598            FunctionArg::Unnamed(expr) => self.bind_function_expr_arg(expr),
599            FunctionArg::Named { .. } => todo!(),
600        }
601    }
602}