risingwave_frontend/binder/expr/function/
builtin_scalar.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::LazyLock;
17
18use bk_tree::{BKTree, metrics};
19use itertools::Itertools;
20use risingwave_common::session_config::USER_NAME_WILD_CARD;
21use risingwave_common::types::{DataType, ListValue, ScalarImpl, Timestamptz};
22use risingwave_common::{bail_not_implemented, current_cluster_version, no_function};
23use thiserror_ext::AsReport;
24
25use crate::Binder;
26use crate::binder::Clause;
27use crate::error::{ErrorCode, Result};
28use crate::expr::{CastContext, Expr, ExprImpl, ExprType, FunctionCall, Literal, Now};
29
30impl Binder {
31    pub(super) fn bind_builtin_scalar_function(
32        &mut self,
33        function_name: &str,
34        inputs: Vec<ExprImpl>,
35        variadic: bool,
36    ) -> Result<ExprImpl> {
37        type Inputs = Vec<ExprImpl>;
38
39        type Handle = Box<dyn Fn(&mut Binder, Inputs) -> Result<ExprImpl> + Sync + Send>;
40
41        fn rewrite(r#type: ExprType, rewriter: fn(Inputs) -> Result<Inputs>) -> Handle {
42            Box::new(move |_binder, mut inputs| {
43                inputs = (rewriter)(inputs)?;
44                Ok(FunctionCall::new(r#type, inputs)?.into())
45            })
46        }
47
48        fn raw_call(r#type: ExprType) -> Handle {
49            rewrite(r#type, Ok)
50        }
51
52        fn guard_by_len(expected_len: usize, handle: Handle) -> Handle {
53            Box::new(move |binder, inputs| {
54                if inputs.len() == expected_len {
55                    handle(binder, inputs)
56                } else {
57                    Err(ErrorCode::ExprError("unexpected arguments number".into()).into())
58                }
59            })
60        }
61
62        fn raw<F: Fn(&mut Binder, Inputs) -> Result<ExprImpl> + Sync + Send + 'static>(
63            f: F,
64        ) -> Handle {
65            Box::new(f)
66        }
67
68        fn dispatch_by_len(mapping: Vec<(usize, Handle)>) -> Handle {
69            Box::new(move |binder, inputs| {
70                for (len, handle) in &mapping {
71                    if inputs.len() == *len {
72                        return handle(binder, inputs);
73                    }
74                }
75                Err(ErrorCode::ExprError("unexpected arguments number".into()).into())
76            })
77        }
78
79        fn raw_literal(literal: ExprImpl) -> Handle {
80            Box::new(move |_binder, _inputs| Ok(literal.clone()))
81        }
82
83        fn now() -> Handle {
84            guard_by_len(
85                0,
86                raw(move |binder, _inputs| {
87                    binder.ensure_now_function_allowed()?;
88                    // NOTE: this will be further transformed during optimization. See the
89                    // documentation of `Now`.
90                    Ok(Now.into())
91                }),
92            )
93        }
94
95        fn pi() -> Handle {
96            raw_literal(ExprImpl::literal_f64(std::f64::consts::PI))
97        }
98
99        fn proctime() -> Handle {
100            Box::new(move |binder, inputs| {
101                binder.ensure_proctime_function_allowed()?;
102                raw_call(ExprType::Proctime)(binder, inputs)
103            })
104        }
105
106        // `SESSION_USER` is the user name of the user that is connected to the database.
107        fn session_user() -> Handle {
108            guard_by_len(
109                0,
110                raw(|binder, _inputs| {
111                    Ok(ExprImpl::literal_varchar(
112                        binder.auth_context.user_name.clone(),
113                    ))
114                }),
115            )
116        }
117
118        // `CURRENT_USER` is the user name of the user that is executing the command,
119        // `CURRENT_ROLE`, `USER` are synonyms for `CURRENT_USER`. Since we don't support
120        // `SET ROLE xxx` for now, they will all returns session user name.
121        fn current_user() -> Handle {
122            guard_by_len(
123                0,
124                raw(|binder, _inputs| {
125                    Ok(ExprImpl::literal_varchar(
126                        binder.auth_context.user_name.clone(),
127                    ))
128                }),
129            )
130        }
131
132        // `CURRENT_DATABASE` is the name of the database you are currently connected to.
133        // `CURRENT_CATALOG` is a synonym for `CURRENT_DATABASE`.
134        fn current_database() -> Handle {
135            guard_by_len(
136                0,
137                raw(|binder, _inputs| Ok(ExprImpl::literal_varchar(binder.db_name.clone()))),
138            )
139        }
140
141        // XXX: can we unify this with FUNC_SIG_MAP?
142        // For raw_call here, it seems unnecessary to declare it again here.
143        // For some functions, we have validation logic here. Is it still useful now?
144        static HANDLES: LazyLock<HashMap<&'static str, Handle>> = LazyLock::new(|| {
145            [
146                (
147                    "booleq",
148                    rewrite(ExprType::Equal, rewrite_two_bool_inputs),
149                ),
150                (
151                    "boolne",
152                    rewrite(ExprType::NotEqual, rewrite_two_bool_inputs),
153                ),
154                ("coalesce", rewrite(ExprType::Coalesce, |inputs| {
155                    if inputs.iter().any(ExprImpl::has_table_function) {
156                        return Err(ErrorCode::BindError("table functions are not allowed in COALESCE".into()).into());
157                    }
158                    Ok(inputs)
159                })),
160                (
161                    "nullif",
162                    rewrite(ExprType::Case, rewrite_nullif_to_case_when),
163                ),
164                (
165                    "round",
166                    dispatch_by_len(vec![
167                        (2, raw_call(ExprType::RoundDigit)),
168                        (1, raw_call(ExprType::Round)),
169                    ]),
170                ),
171                ("pow", raw_call(ExprType::Pow)),
172                // "power" is the function name used in PG.
173                ("power", raw_call(ExprType::Pow)),
174                ("ceil", raw_call(ExprType::Ceil)),
175                ("ceiling", raw_call(ExprType::Ceil)),
176                ("floor", raw_call(ExprType::Floor)),
177                ("trunc", raw_call(ExprType::Trunc)),
178                ("abs", raw_call(ExprType::Abs)),
179                ("exp", raw_call(ExprType::Exp)),
180                ("ln", raw_call(ExprType::Ln)),
181                ("log", raw_call(ExprType::Log10)),
182                ("log10", raw_call(ExprType::Log10)),
183                ("mod", raw_call(ExprType::Modulus)),
184                ("sin", raw_call(ExprType::Sin)),
185                ("cos", raw_call(ExprType::Cos)),
186                ("tan", raw_call(ExprType::Tan)),
187                ("cot", raw_call(ExprType::Cot)),
188                ("asin", raw_call(ExprType::Asin)),
189                ("acos", raw_call(ExprType::Acos)),
190                ("atan", raw_call(ExprType::Atan)),
191                ("atan2", raw_call(ExprType::Atan2)),
192                ("sind", raw_call(ExprType::Sind)),
193                ("cosd", raw_call(ExprType::Cosd)),
194                ("cotd", raw_call(ExprType::Cotd)),
195                ("tand", raw_call(ExprType::Tand)),
196                ("sinh", raw_call(ExprType::Sinh)),
197                ("cosh", raw_call(ExprType::Cosh)),
198                ("tanh", raw_call(ExprType::Tanh)),
199                ("coth", raw_call(ExprType::Coth)),
200                ("asinh", raw_call(ExprType::Asinh)),
201                ("acosh", raw_call(ExprType::Acosh)),
202                ("atanh", raw_call(ExprType::Atanh)),
203                ("asind", raw_call(ExprType::Asind)),
204                ("acosd", raw_call(ExprType::Acosd)),
205                ("atand", raw_call(ExprType::Atand)),
206                ("atan2d", raw_call(ExprType::Atan2d)),
207                ("degrees", raw_call(ExprType::Degrees)),
208                ("radians", raw_call(ExprType::Radians)),
209                ("sqrt", raw_call(ExprType::Sqrt)),
210                ("cbrt", raw_call(ExprType::Cbrt)),
211                ("sign", raw_call(ExprType::Sign)),
212                ("scale", raw_call(ExprType::Scale)),
213                ("min_scale", raw_call(ExprType::MinScale)),
214                ("trim_scale", raw_call(ExprType::TrimScale)),
215                // date and time
216                (
217                    "to_timestamp",
218                    dispatch_by_len(vec![
219                        (1, raw_call(ExprType::SecToTimestamptz)),
220                        (2, raw_call(ExprType::CharToTimestamptz)),
221                    ]),
222                ),
223                ("date_trunc", raw_call(ExprType::DateTrunc)),
224                ("date_bin", raw_call(ExprType::DateBin)),
225                ("date_part", raw_call(ExprType::DatePart)),
226                ("make_date", raw_call(ExprType::MakeDate)),
227                ("make_time", raw_call(ExprType::MakeTime)),
228                ("make_timestamp", raw_call(ExprType::MakeTimestamp)),
229                ("make_timestamptz", raw_call(ExprType::MakeTimestamptz)),
230                ("timezone", rewrite(ExprType::AtTimeZone, |mut inputs| {
231                    if inputs.len() == 2 {
232                        inputs.swap(0, 1);
233                        Ok(inputs)
234                    } else {
235                        Err(ErrorCode::ExprError("unexpected arguments number".into()).into())
236                    }
237                })),
238                ("to_date", raw_call(ExprType::CharToDate)),
239                // string
240                ("substr", raw_call(ExprType::Substr)),
241                ("length", raw_call(ExprType::Length)),
242                ("upper", raw_call(ExprType::Upper)),
243                ("lower", raw_call(ExprType::Lower)),
244                ("trim", raw_call(ExprType::Trim)),
245                ("replace", raw_call(ExprType::Replace)),
246                ("overlay", raw_call(ExprType::Overlay)),
247                ("btrim", raw_call(ExprType::Trim)),
248                ("ltrim", raw_call(ExprType::Ltrim)),
249                ("rtrim", raw_call(ExprType::Rtrim)),
250                ("md5", raw_call(ExprType::Md5)),
251                ("to_char", raw_call(ExprType::ToChar)),
252                (
253                    "concat",
254                    rewrite(ExprType::ConcatWs, rewrite_concat_to_concat_ws),
255                ),
256                ("concat_ws", raw_call(ExprType::ConcatWs)),
257                ("format", raw_call(ExprType::Format)),
258                ("translate", raw_call(ExprType::Translate)),
259                ("split_part", raw_call(ExprType::SplitPart)),
260                ("char_length", raw_call(ExprType::CharLength)),
261                ("character_length", raw_call(ExprType::CharLength)),
262                ("repeat", raw_call(ExprType::Repeat)),
263                ("ascii", raw_call(ExprType::Ascii)),
264                ("octet_length", raw_call(ExprType::OctetLength)),
265                ("bit_length", raw_call(ExprType::BitLength)),
266                ("regexp_match", raw_call(ExprType::RegexpMatch)),
267                ("regexp_replace", raw_call(ExprType::RegexpReplace)),
268                ("regexp_count", raw_call(ExprType::RegexpCount)),
269                ("regexp_split_to_array", raw_call(ExprType::RegexpSplitToArray)),
270                ("chr", raw_call(ExprType::Chr)),
271                ("starts_with", raw_call(ExprType::StartsWith)),
272                ("initcap", raw_call(ExprType::Initcap)),
273                ("lpad", raw_call(ExprType::Lpad)),
274                ("rpad", raw_call(ExprType::Rpad)),
275                ("reverse", raw_call(ExprType::Reverse)),
276                ("strpos", raw_call(ExprType::Position)),
277                ("to_ascii", raw_call(ExprType::ToAscii)),
278                ("to_hex", raw_call(ExprType::ToHex)),
279                ("quote_ident", raw_call(ExprType::QuoteIdent)),
280                ("quote_literal", guard_by_len(1, raw(|_binder, mut inputs| {
281                    if inputs[0].return_type() != DataType::Varchar {
282                        // Support `quote_literal(any)` by converting it to `quote_literal(any::text)`
283                        // Ref. https://github.com/postgres/postgres/blob/REL_16_1/src/include/catalog/pg_proc.dat#L4641
284                        FunctionCall::cast_mut(&mut inputs[0], DataType::Varchar, CastContext::Explicit)?;
285                    }
286                    Ok(FunctionCall::new_unchecked(ExprType::QuoteLiteral, inputs, DataType::Varchar).into())
287                }))),
288                ("quote_nullable", guard_by_len(1, raw(|_binder, mut inputs| {
289                    if inputs[0].return_type() != DataType::Varchar {
290                        // Support `quote_nullable(any)` by converting it to `quote_nullable(any::text)`
291                        // Ref. https://github.com/postgres/postgres/blob/REL_16_1/src/include/catalog/pg_proc.dat#L4650
292                        FunctionCall::cast_mut(&mut inputs[0], DataType::Varchar, CastContext::Explicit)?;
293                    }
294                    Ok(FunctionCall::new_unchecked(ExprType::QuoteNullable, inputs, DataType::Varchar).into())
295                }))),
296                ("string_to_array", raw_call(ExprType::StringToArray)),
297                ("encode", raw_call(ExprType::Encode)),
298                ("decode", raw_call(ExprType::Decode)),
299                ("convert_from", raw_call(ExprType::ConvertFrom)),
300                ("convert_to", raw_call(ExprType::ConvertTo)),
301                ("sha1", raw_call(ExprType::Sha1)),
302                ("sha224", raw_call(ExprType::Sha224)),
303                ("sha256", raw_call(ExprType::Sha256)),
304                ("sha384", raw_call(ExprType::Sha384)),
305                ("sha512", raw_call(ExprType::Sha512)),
306                ("encrypt", raw_call(ExprType::Encrypt)),
307                ("decrypt", raw_call(ExprType::Decrypt)),
308                ("hmac", raw_call(ExprType::Hmac)),
309                ("secure_compare", raw_call(ExprType::SecureCompare)),
310                ("left", raw_call(ExprType::Left)),
311                ("right", raw_call(ExprType::Right)),
312                ("inet_aton", raw_call(ExprType::InetAton)),
313                ("inet_ntoa", raw_call(ExprType::InetNtoa)),
314                ("int8send", raw_call(ExprType::PgwireSend)),
315                ("int8recv", guard_by_len(1, raw(|_binder, mut inputs| {
316                    // Similar to `cast` from string, return type is set explicitly rather than inferred.
317                    let hint = if !inputs[0].is_untyped() && inputs[0].return_type() == DataType::Varchar {
318                        " Consider `decode` or cast."
319                    } else {
320                        ""
321                    };
322                    inputs[0].cast_implicit_mut(DataType::Bytea).map_err(|e| {
323                        ErrorCode::BindError(format!("{} in `recv`.{hint}", e.as_report()))
324                    })?;
325                    Ok(FunctionCall::new_unchecked(ExprType::PgwireRecv, inputs, DataType::Int64).into())
326                }))),
327                // array
328                ("array_cat", raw_call(ExprType::ArrayCat)),
329                ("array_append", raw_call(ExprType::ArrayAppend)),
330                ("array_join", raw_call(ExprType::ArrayToString)),
331                ("array_prepend", raw_call(ExprType::ArrayPrepend)),
332                ("array_to_string", raw_call(ExprType::ArrayToString)),
333                ("array_distinct", raw_call(ExprType::ArrayDistinct)),
334                ("array_min", raw_call(ExprType::ArrayMin)),
335                ("array_sort", raw_call(ExprType::ArraySort)),
336                ("array_length", raw_call(ExprType::ArrayLength)),
337                ("cardinality", raw_call(ExprType::Cardinality)),
338                ("array_remove", raw_call(ExprType::ArrayRemove)),
339                ("array_replace", raw_call(ExprType::ArrayReplace)),
340                ("array_max", raw_call(ExprType::ArrayMax)),
341                ("array_sum", raw_call(ExprType::ArraySum)),
342                ("array_position", raw_call(ExprType::ArrayPosition)),
343                ("array_positions", raw_call(ExprType::ArrayPositions)),
344                ("array_contains", raw_call(ExprType::ArrayContains)),
345                ("arraycontains", raw_call(ExprType::ArrayContains)),
346                ("array_contained", raw_call(ExprType::ArrayContained)),
347                ("arraycontained", raw_call(ExprType::ArrayContained)),
348                ("array_flatten", guard_by_len(1, raw(|_binder, inputs| {
349                    inputs[0].ensure_array_type().map_err(|_| ErrorCode::BindError("array_flatten expects `any[][]` input".into()))?;
350                    let return_type = inputs[0].return_type().into_list_element_type();
351                    if !return_type.is_array() {
352                        return Err(ErrorCode::BindError("array_flatten expects `any[][]` input".into()).into());
353
354                    }
355                    Ok(FunctionCall::new_unchecked(ExprType::ArrayFlatten, inputs, return_type).into())
356                }))),
357                ("trim_array", raw_call(ExprType::TrimArray)),
358                (
359                    "array_ndims",
360                    guard_by_len(1, raw(|_binder, inputs| {
361                        inputs[0].ensure_array_type()?;
362
363                        let n = inputs[0].return_type().array_ndims()
364                            .try_into().map_err(|_| ErrorCode::BindError("array_ndims integer overflow".into()))?;
365                        Ok(ExprImpl::literal_int(n))
366                    })),
367                ),
368                (
369                    "array_lower",
370                    guard_by_len(2, raw(|binder, inputs| {
371                        let (arg0, arg1) = inputs.into_iter().next_tuple().unwrap();
372                        // rewrite into `CASE WHEN 0 < arg1 AND arg1 <= array_ndims(arg0) THEN 1 END`
373                        let ndims_expr = binder.bind_builtin_scalar_function("array_ndims", vec![arg0], false)?;
374                        let arg1 = arg1.cast_implicit(DataType::Int32)?;
375
376                        FunctionCall::new(
377                            ExprType::Case,
378                            vec![
379                                FunctionCall::new(
380                                    ExprType::And,
381                                    vec![
382                                        FunctionCall::new(ExprType::LessThan, vec![ExprImpl::literal_int(0), arg1.clone()])?.into(),
383                                        FunctionCall::new(ExprType::LessThanOrEqual, vec![arg1, ndims_expr])?.into(),
384                                    ],
385                                )?.into(),
386                                ExprImpl::literal_int(1),
387                            ],
388                        ).map(Into::into)
389                    })),
390                ),
391                ("array_upper", raw_call(ExprType::ArrayLength)), // `lower == 1` implies `upper == length`
392                ("array_dims", raw_call(ExprType::ArrayDims)),
393                // int256
394                ("hex_to_int256", raw_call(ExprType::HexToInt256)),
395                // jsonb
396                ("jsonb_object_field", raw_call(ExprType::JsonbAccess)),
397                ("jsonb_array_element", raw_call(ExprType::JsonbAccess)),
398                ("jsonb_object_field_text", raw_call(ExprType::JsonbAccessStr)),
399                ("jsonb_array_element_text", raw_call(ExprType::JsonbAccessStr)),
400                ("jsonb_extract_path", raw_call(ExprType::JsonbExtractPath)),
401                ("jsonb_extract_path_text", raw_call(ExprType::JsonbExtractPathText)),
402                ("jsonb_typeof", raw_call(ExprType::JsonbTypeof)),
403                ("jsonb_array_length", raw_call(ExprType::JsonbArrayLength)),
404                ("jsonb_concat", raw_call(ExprType::JsonbConcat)),
405                ("jsonb_object", raw_call(ExprType::JsonbObject)),
406                ("jsonb_pretty", raw_call(ExprType::JsonbPretty)),
407                ("jsonb_contains", raw_call(ExprType::JsonbContains)),
408                ("jsonb_contained", raw_call(ExprType::JsonbContained)),
409                ("jsonb_exists", raw_call(ExprType::JsonbExists)),
410                ("jsonb_exists_any", raw_call(ExprType::JsonbExistsAny)),
411                ("jsonb_exists_all", raw_call(ExprType::JsonbExistsAll)),
412                ("jsonb_delete", raw_call(ExprType::Subtract)),
413                ("jsonb_delete_path", raw_call(ExprType::JsonbDeletePath)),
414                ("jsonb_strip_nulls", raw_call(ExprType::JsonbStripNulls)),
415                ("to_jsonb", raw_call(ExprType::ToJsonb)),
416                ("jsonb_build_array", raw_call(ExprType::JsonbBuildArray)),
417                ("jsonb_build_object", raw_call(ExprType::JsonbBuildObject)),
418                ("jsonb_populate_record", raw_call(ExprType::JsonbPopulateRecord)),
419                ("jsonb_path_match", raw_call(ExprType::JsonbPathMatch)),
420                ("jsonb_path_exists", raw_call(ExprType::JsonbPathExists)),
421                ("jsonb_path_query_array", raw_call(ExprType::JsonbPathQueryArray)),
422                ("jsonb_path_query_first", raw_call(ExprType::JsonbPathQueryFirst)),
423                ("jsonb_set", raw_call(ExprType::JsonbSet)),
424                ("jsonb_populate_map", raw_call(ExprType::JsonbPopulateMap)),
425                // map
426                ("map_from_entries", raw_call(ExprType::MapFromEntries)),
427                ("map_access", raw_call(ExprType::MapAccess)),
428                ("map_keys", raw_call(ExprType::MapKeys)),
429                ("map_values", raw_call(ExprType::MapValues)),
430                ("map_entries", raw_call(ExprType::MapEntries)),
431                ("map_from_key_values", raw_call(ExprType::MapFromKeyValues)),
432                ("map_cat", raw_call(ExprType::MapCat)),
433                ("map_contains", raw_call(ExprType::MapContains)),
434                ("map_delete", raw_call(ExprType::MapDelete)),
435                ("map_insert", raw_call(ExprType::MapInsert)),
436                ("map_length", raw_call(ExprType::MapLength)),
437                // Functions that return a constant value
438                ("pi", pi()),
439                // greatest and least
440                ("greatest", raw_call(ExprType::Greatest)),
441                ("least", raw_call(ExprType::Least)),
442                // System information operations.
443                (
444                    "pg_typeof",
445                    guard_by_len(1, raw(|_binder, inputs| {
446                        let input = &inputs[0];
447                        let v = match input.is_untyped() {
448                            true => "unknown".into(),
449                            false => input.return_type().to_string(),
450                        };
451                        Ok(ExprImpl::literal_varchar(v))
452                    })),
453                ),
454                ("current_catalog", current_database()),
455                ("current_database", current_database()),
456                ("current_schema", guard_by_len(0, raw(|binder, _inputs| {
457                    Ok(binder
458                        .first_valid_schema()
459                        .map(|schema| ExprImpl::literal_varchar(schema.name()))
460                        .unwrap_or_else(|_| ExprImpl::literal_null(DataType::Varchar)))
461                }))),
462                ("current_schemas", raw(|binder, mut inputs| {
463                    let no_match_err = ErrorCode::ExprError(
464                        "No function matches the given name and argument types. You might need to add explicit type casts.".into()
465                    );
466                    if inputs.len() != 1 {
467                        return Err(no_match_err.into());
468                    }
469                    let input = inputs
470                        .pop()
471                        .unwrap()
472                        .enforce_bool_clause("current_schemas")
473                        .map_err(|_| no_match_err)?;
474
475                    let ExprImpl::Literal(literal) = &input else {
476                        bail_not_implemented!("Only boolean literals are supported in `current_schemas`.");
477                    };
478
479                    let Some(bool) = literal.get_data().as_ref().map(|bool| bool.clone().into_bool()) else {
480                        return Ok(ExprImpl::literal_null(DataType::List(Box::new(DataType::Varchar))));
481                    };
482
483                    let paths = if bool {
484                        binder.search_path.path()
485                    } else {
486                        binder.search_path.real_path()
487                    };
488
489                    let mut schema_names = vec![];
490                    for path in paths {
491                        let mut schema_name = path;
492                        if schema_name == USER_NAME_WILD_CARD {
493                            schema_name = &binder.auth_context.user_name;
494                        }
495
496                        if binder
497                            .catalog
498                            .get_schema_by_name(&binder.db_name, schema_name)
499                            .is_ok()
500                        {
501                            schema_names.push(schema_name.as_str());
502                        }
503                    }
504
505                    Ok(ExprImpl::literal_list(
506                        ListValue::from_iter(schema_names),
507                        DataType::Varchar,
508                    ))
509                })),
510                ("session_user", session_user()),
511                ("current_role", current_user()),
512                ("current_user", current_user()),
513                ("user", current_user()),
514                ("pg_get_userbyid", raw_call(ExprType::PgGetUserbyid)),
515                ("pg_get_indexdef", raw_call(ExprType::PgGetIndexdef)),
516                ("pg_get_viewdef", raw_call(ExprType::PgGetViewdef)),
517                ("pg_index_column_has_property", raw_call(ExprType::PgIndexColumnHasProperty)),
518                ("pg_relation_size", raw(|_binder, mut inputs| {
519                    if inputs.is_empty() {
520                        return Err(ErrorCode::ExprError(
521                            "function pg_relation_size() does not exist".into(),
522                        )
523                            .into());
524                    }
525                    inputs[0].cast_to_regclass_mut()?;
526                    Ok(FunctionCall::new(ExprType::PgRelationSize, inputs)?.into())
527                })),
528                ("pg_get_serial_sequence", raw_literal(ExprImpl::literal_null(DataType::Varchar))),
529                ("pg_table_size", guard_by_len(1, raw(|_binder, mut inputs| {
530                    inputs[0].cast_to_regclass_mut()?;
531                    Ok(FunctionCall::new(ExprType::PgRelationSize, inputs)?.into())
532                }))),
533                ("pg_indexes_size", guard_by_len(1, raw(|_binder, mut inputs| {
534                    inputs[0].cast_to_regclass_mut()?;
535                    Ok(FunctionCall::new(ExprType::PgIndexesSize, inputs)?.into())
536                }))),
537                ("pg_get_expr", raw(|_binder, inputs| {
538                    if inputs.len() == 2 || inputs.len() == 3 {
539                        // TODO: implement pg_get_expr rather than just return empty as an workaround.
540                        Ok(ExprImpl::literal_varchar("".into()))
541                    } else {
542                        Err(ErrorCode::ExprError(
543                            "Too many/few arguments for pg_catalog.pg_get_expr()".into(),
544                        )
545                            .into())
546                    }
547                })),
548                ("pg_my_temp_schema", guard_by_len(0, raw(|_binder, _inputs| {
549                    // Returns the OID of the current session's temporary schema, or zero if it has none (because it has not created any temporary tables).
550                    Ok(ExprImpl::literal_int(
551                        // always return 0, as we haven't supported temporary tables nor temporary schema yet
552                        0,
553                    ))
554                }))),
555                ("current_setting", guard_by_len(1, raw(|binder, inputs| {
556                    let input = &inputs[0];
557                    let input = if let ExprImpl::Literal(literal) = input &&
558                        let Some(ScalarImpl::Utf8(input)) = literal.get_data()
559                    {
560                        input
561                    } else {
562                        return Err(ErrorCode::ExprError(
563                            "Only literal is supported in `setting_name`.".into(),
564                        )
565                            .into());
566                    };
567                    let session_config = binder.session_config.read();
568                    Ok(ExprImpl::literal_varchar(session_config.get(input.as_ref())?))
569                }))),
570                ("set_config", guard_by_len(3, raw(|binder, inputs| {
571                    let setting_name = if let ExprImpl::Literal(literal) = &inputs[0] && let Some(ScalarImpl::Utf8(input)) = literal.get_data() {
572                        input
573                    } else {
574                        return Err(ErrorCode::ExprError(
575                            "Only string literal is supported in `setting_name`.".into(),
576                        )
577                            .into());
578                    };
579
580                    let new_value = if let ExprImpl::Literal(literal) = &inputs[1] && let Some(ScalarImpl::Utf8(input)) = literal.get_data() {
581                        input
582                    } else {
583                        return Err(ErrorCode::ExprError(
584                            "Only string literal is supported in `setting_name`.".into(),
585                        )
586                            .into());
587                    };
588
589                    let is_local = if let ExprImpl::Literal(literal) = &inputs[2] && let Some(ScalarImpl::Bool(input)) = literal.get_data() {
590                        input
591                    } else {
592                        return Err(ErrorCode::ExprError(
593                            "Only bool literal is supported in `is_local`.".into(),
594                        )
595                            .into());
596                    };
597
598                    if *is_local {
599                        return Err(ErrorCode::ExprError(
600                            "`is_local = true` is not supported now.".into(),
601                        )
602                            .into());
603                    }
604
605                    let mut session_config = binder.session_config.write();
606
607                    // TODO: report session config changes if necessary.
608                    session_config.set(setting_name, new_value.to_string(), &mut ())?;
609
610                    Ok(ExprImpl::literal_varchar(new_value.to_string()))
611                }))),
612                ("format_type", raw_call(ExprType::FormatType)),
613                ("pg_table_is_visible", raw_call(ExprType::PgTableIsVisible)),
614                ("pg_type_is_visible", raw_literal(ExprImpl::literal_bool(true))),
615                ("pg_get_constraintdef", raw_literal(ExprImpl::literal_null(DataType::Varchar))),
616                ("pg_get_partkeydef", raw_literal(ExprImpl::literal_null(DataType::Varchar))),
617                ("pg_encoding_to_char", raw_literal(ExprImpl::literal_varchar("UTF8".into()))),
618                ("has_database_privilege", raw_literal(ExprImpl::literal_bool(true))),
619                ("has_table_privilege", raw(|binder, mut inputs| {
620                    if inputs.len() == 2 {
621                        inputs.insert(0, ExprImpl::literal_varchar(binder.auth_context.user_name.clone()));
622                    }
623                    if inputs.len() == 3 {
624                        if inputs[1].return_type() == DataType::Varchar {
625                            inputs[1].cast_to_regclass_mut()?;
626                        }
627                        Ok(FunctionCall::new(ExprType::HasTablePrivilege, inputs)?.into())
628                    } else {
629                        Err(ErrorCode::ExprError(
630                            "Too many/few arguments for pg_catalog.has_table_privilege()".into(),
631                        )
632                            .into())
633                    }
634                })),
635                ("has_any_column_privilege", raw(|binder, mut inputs| {
636                    if inputs.len() == 2 {
637                        inputs.insert(0, ExprImpl::literal_varchar(binder.auth_context.user_name.clone()));
638                    }
639                    if inputs.len() == 3 {
640                        if inputs[1].return_type() == DataType::Varchar {
641                            inputs[1].cast_to_regclass_mut()?;
642                        }
643                        Ok(FunctionCall::new(ExprType::HasAnyColumnPrivilege, inputs)?.into())
644                    } else {
645                        Err(ErrorCode::ExprError(
646                            "Too many/few arguments for pg_catalog.has_any_column_privilege()".into(),
647                        )
648                            .into())
649                    }
650                })),
651                ("has_schema_privilege", raw(|binder, mut inputs| {
652                    if inputs.len() == 2 {
653                        inputs.insert(0, ExprImpl::literal_varchar(binder.auth_context.user_name.clone()));
654                    }
655                    if inputs.len() == 3 {
656                        Ok(FunctionCall::new(ExprType::HasSchemaPrivilege, inputs)?.into())
657                    } else {
658                        Err(ErrorCode::ExprError(
659                            "Too many/few arguments for pg_catalog.has_schema_privilege()".into(),
660                        )
661                            .into())
662                    }
663                })),
664                ("has_function_privilege", raw(|binder, mut inputs| {
665                    if inputs.len() == 2 {
666                        inputs.insert(0, ExprImpl::literal_varchar(binder.auth_context.user_name.clone()));
667                    }
668                    if inputs.len() == 3 {
669                        Ok(FunctionCall::new(ExprType::HasFunctionPrivilege, inputs)?.into())
670                    } else {
671                        Err(ErrorCode::ExprError(
672                            "Too many/few arguments for pg_catalog.has_function_privilege()".into(),
673                        )
674                            .into())
675                    }
676                })),
677                ("pg_stat_get_numscans", raw_literal(ExprImpl::literal_bigint(0))),
678                ("pg_backend_pid", raw(|binder, _inputs| {
679                    // FIXME: the session id is not global unique in multi-frontend env.
680                    Ok(ExprImpl::literal_int(binder.session_id.0))
681                })),
682                ("pg_cancel_backend", guard_by_len(1, raw(|_binder, _inputs| {
683                    // TODO: implement real cancel rather than just return false as a workaround.
684                    Ok(ExprImpl::literal_bool(false))
685                }))),
686                ("pg_terminate_backend", guard_by_len(1, raw(|_binder, _inputs| {
687                    // TODO: implement real terminate rather than just return false as a
688                    // workaround.
689                    Ok(ExprImpl::literal_bool(false))
690                }))),
691                ("pg_tablespace_location", guard_by_len(1, raw_literal(ExprImpl::literal_null(DataType::Varchar)))),
692                ("pg_postmaster_start_time", guard_by_len(0, raw(|_binder, _inputs| {
693                    let server_start_time = risingwave_variables::get_server_start_time();
694                    let datum = server_start_time.map(Timestamptz::from).map(ScalarImpl::from);
695                    let literal = Literal::new(datum, DataType::Timestamptz);
696                    Ok(literal.into())
697                }))),
698                // TODO: really implement them.
699                // https://www.postgresql.org/docs/9.5/functions-info.html#FUNCTIONS-INFO-COMMENT-TABLE
700                // WARN: Hacked in [`Binder::bind_function`]!!!
701                ("col_description", raw_call(ExprType::ColDescription)),
702                ("obj_description", raw_literal(ExprImpl::literal_varchar("".to_owned()))),
703                ("shobj_description", raw_literal(ExprImpl::literal_varchar("".to_owned()))),
704                ("pg_is_in_recovery", raw_call(ExprType::PgIsInRecovery)),
705                ("rw_recovery_status", raw_call(ExprType::RwRecoveryStatus)),
706                ("rw_epoch_to_ts", raw_call(ExprType::RwEpochToTs)),
707                // internal
708                ("rw_vnode", raw_call(ExprType::VnodeUser)),
709                ("rw_license", raw_call(ExprType::License)),
710                ("rw_test_paid_tier", raw_call(ExprType::TestPaidTier)), // for testing purposes
711                // TODO: choose which pg version we should return.
712                ("version", raw_literal(ExprImpl::literal_varchar(current_cluster_version()))),
713                // non-deterministic
714                ("now", now()),
715                ("current_timestamp", now()),
716                ("proctime", proctime()),
717                ("pg_sleep", raw_call(ExprType::PgSleep)),
718                ("pg_sleep_for", raw_call(ExprType::PgSleepFor)),
719                // TODO: implement pg_sleep_until
720                // ("pg_sleep_until", raw_call(ExprType::PgSleepUntil)),
721
722                // cast functions
723                // only functions required by the existing PostgreSQL tool are implemented
724                ("date", guard_by_len(1, raw(|_binder, inputs| {
725                    inputs[0].clone().cast_explicit(DataType::Date).map_err(Into::into)
726                }))),
727            ]
728                .into_iter()
729                .collect()
730        });
731
732        static FUNCTIONS_BKTREE: LazyLock<BKTree<&str>> = LazyLock::new(|| {
733            let mut tree = BKTree::new(metrics::Levenshtein);
734
735            // TODO: Also hint other functions, e.g., Agg or UDF.
736            for k in HANDLES.keys() {
737                tree.add(*k);
738            }
739
740            tree
741        });
742
743        if variadic {
744            let func = match function_name {
745                "format" => ExprType::FormatVariadic,
746                "concat" => ExprType::ConcatVariadic,
747                "concat_ws" => ExprType::ConcatWsVariadic,
748                "jsonb_build_array" => ExprType::JsonbBuildArrayVariadic,
749                "jsonb_build_object" => ExprType::JsonbBuildObjectVariadic,
750                "jsonb_extract_path" => ExprType::JsonbExtractPathVariadic,
751                "jsonb_extract_path_text" => ExprType::JsonbExtractPathTextVariadic,
752                _ => {
753                    return Err(ErrorCode::BindError(format!(
754                        "VARIADIC argument is not allowed in function \"{}\"",
755                        function_name
756                    ))
757                    .into());
758                }
759            };
760            return Ok(FunctionCall::new(func, inputs)?.into());
761        }
762
763        // Note: for raw_call, we only check name here. The type check is done later.
764        match HANDLES.get(function_name) {
765            Some(handle) => handle(self, inputs),
766            None => {
767                let allowed_distance = if function_name.len() > 3 { 2 } else { 1 };
768
769                let candidates = FUNCTIONS_BKTREE
770                    .find(function_name, allowed_distance)
771                    .map(|(_idx, c)| c)
772                    .join(" or ");
773
774                Err(no_function!(
775                    candidates = (!candidates.is_empty()).then_some(candidates),
776                    "{}({})",
777                    function_name,
778                    inputs.iter().map(|e| e.return_type()).join(", ")
779                )
780                .into())
781            }
782        }
783    }
784
785    fn ensure_now_function_allowed(&self) -> Result<()> {
786        if self.is_for_stream()
787            && !matches!(
788                self.context.clause,
789                Some(Clause::Where)
790                    | Some(Clause::Having)
791                    | Some(Clause::JoinOn)
792                    | Some(Clause::From)
793            )
794        {
795            return Err(ErrorCode::InvalidInputSyntax(format!(
796                "For streaming queries, `NOW()` function is only allowed in `WHERE`, `HAVING`, `ON` and `FROM`. Found in clause: {:?}. \
797                Please please refer to https://www.risingwave.dev/docs/current/sql-pattern-temporal-filters/ for more information",
798                self.context.clause
799            ))
800                .into());
801        }
802        if matches!(self.context.clause, Some(Clause::GeneratedColumn)) {
803            return Err(ErrorCode::InvalidInputSyntax(
804                "Cannot use `NOW()` function in generated columns. Do you want `PROCTIME()`?"
805                    .to_owned(),
806            )
807            .into());
808        }
809        Ok(())
810    }
811
812    fn ensure_proctime_function_allowed(&self) -> Result<()> {
813        if !self.is_for_ddl() {
814            return Err(ErrorCode::InvalidInputSyntax(
815                "Function `PROCTIME()` is only allowed in CREATE TABLE/SOURCE. Is `NOW()` what you want?".to_owned(),
816            )
817                .into());
818        }
819        Ok(())
820    }
821}
822
823fn rewrite_concat_to_concat_ws(inputs: Vec<ExprImpl>) -> Result<Vec<ExprImpl>> {
824    if inputs.is_empty() {
825        Err(ErrorCode::BindError(
826            "Function `concat` takes at least 1 arguments (0 given)".to_owned(),
827        )
828        .into())
829    } else {
830        let inputs = std::iter::once(ExprImpl::literal_varchar("".to_owned()))
831            .chain(inputs)
832            .collect();
833        Ok(inputs)
834    }
835}
836
837/// Make sure inputs only have 2 value and rewrite the arguments.
838/// Nullif(expr1,expr2) -> Case(Equal(expr1 = expr2),null,expr1).
839fn rewrite_nullif_to_case_when(inputs: Vec<ExprImpl>) -> Result<Vec<ExprImpl>> {
840    if inputs.len() != 2 {
841        Err(ErrorCode::BindError("Function `nullif` must contain 2 arguments".to_owned()).into())
842    } else {
843        let inputs = vec![
844            FunctionCall::new(ExprType::Equal, inputs.clone())?.into(),
845            Literal::new(None, inputs[0].return_type()).into(),
846            inputs[0].clone(),
847        ];
848        Ok(inputs)
849    }
850}
851
852fn rewrite_two_bool_inputs(mut inputs: Vec<ExprImpl>) -> Result<Vec<ExprImpl>> {
853    if inputs.len() != 2 {
854        return Err(
855            ErrorCode::BindError("function must contain only 2 arguments".to_owned()).into(),
856        );
857    }
858    let left = inputs.pop().unwrap();
859    let right = inputs.pop().unwrap();
860    Ok(vec![
861        left.cast_implicit(DataType::Boolean)?,
862        right.cast_implicit(DataType::Boolean)?,
863    ])
864}