risingwave_frontend/expr/
table_function.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use anyhow::Context;
18use itertools::Itertools;
19use mysql_async::consts::ColumnType as MySqlColumnType;
20use mysql_async::prelude::*;
21use risingwave_common::array::arrow::IcebergArrowConvert;
22use risingwave_common::secret::LocalSecretManager;
23use risingwave_common::types::{DataType, ScalarImpl, StructType};
24use risingwave_connector::source::iceberg::{
25    FileScanBackend, extract_bucket_and_file_name, get_parquet_fields, list_data_directory,
26    new_azblob_operator, new_gcs_operator, new_s3_operator,
27};
28use risingwave_pb::expr::PbTableFunction;
29pub use risingwave_pb::expr::table_function::PbType as TableFunctionType;
30use thiserror_ext::AsReport;
31use tokio_postgres::types::Type as TokioPgType;
32
33use super::{Expr, ExprImpl, ExprRewriter, Literal, RwResult, infer_type};
34use crate::catalog::catalog_service::CatalogReadGuard;
35use crate::catalog::function_catalog::{FunctionCatalog, FunctionKind};
36use crate::catalog::root_catalog::SchemaPath;
37use crate::error::ErrorCode::BindError;
38use crate::utils::FRONTEND_RUNTIME;
39
/// Argument count of the inline form of `postgres_query` / `mysql_query`:
/// `(hostname, port, username, password, database_name, query)`.
const INLINE_ARG_LEN: usize = 6;
/// Argument count of the CDC-source form of `postgres_query` / `mysql_query`:
/// `(cdc_source_name, query)`.
const CDC_SOURCE_ARG_LEN: usize = 2;
42
/// A table function takes a row as input and returns a table. It is also known as Set-Returning
/// Function.
///
/// See also [`TableFunction`](risingwave_expr::table_function::TableFunction) trait in expr crate
/// and [`ProjectSetSelectItem`](risingwave_pb::expr::ProjectSetSelectItem).
#[derive(Clone, Eq, PartialEq, Hash)]
pub struct TableFunction {
    /// Argument expressions passed to the function.
    pub args: Vec<ExprImpl>,
    /// Type of the value produced by the function.
    pub return_type: DataType,
    /// Which table function this is; `UserDefined` for user defined table functions.
    pub function_type: TableFunctionType,
    /// Catalog of user defined table function. The constructors in this file set it to `Some`
    /// only in `new_user_defined` and to `None` everywhere else.
    pub user_defined: Option<Arc<FunctionCatalog>>,
}
56
57impl TableFunction {
58    /// Create a `TableFunction` expr with the return type inferred from `func_type` and types of
59    /// `inputs`.
60    pub fn new(func_type: TableFunctionType, mut args: Vec<ExprImpl>) -> RwResult<Self> {
61        let return_type = infer_type(func_type.into(), &mut args)?;
62        Ok(TableFunction {
63            args,
64            return_type,
65            function_type: func_type,
66            user_defined: None,
67        })
68    }
69
70    /// Create a user-defined `TableFunction`.
71    pub fn new_user_defined(catalog: Arc<FunctionCatalog>, args: Vec<ExprImpl>) -> Self {
72        let FunctionKind::Table = &catalog.kind else {
73            panic!("not a table function");
74        };
75        TableFunction {
76            args,
77            return_type: catalog.return_type.clone(),
78            function_type: TableFunctionType::UserDefined,
79            user_defined: Some(catalog),
80        }
81    }
82
83    /// A special table function which would be transformed into `LogicalFileScan` by `TableFunctionToFileScanRule` in the optimizer.
84    /// select * from `file_scan`('parquet', 's3', region, ak, sk, location)
85    pub fn new_file_scan(mut args: Vec<ExprImpl>) -> RwResult<Self> {
86        let return_type = {
87            // arguments:
88            // file format e.g. parquet
89            // storage type e.g. s3, gcs, azblob
90            // For s3: file_scan('parquet', 's3', s3_region, s3_access_key, s3_secret_key, file_location_or_directory)
91            // For gcs: file_scan('parquet', 'gcs', credential, file_location_or_directory)
92            // For azblob: file_scan('parquet', 'azblob', endpoint, account_name, account_key, file_location)
93            let mut eval_args: Vec<String> = vec![];
94            for arg in &args {
95                if arg.return_type() != DataType::Varchar {
96                    return Err(BindError(
97                        "file_scan function only accepts string arguments".to_owned(),
98                    )
99                    .into());
100                }
101                match arg.try_fold_const() {
102                    Some(Ok(value)) => {
103                        if value.is_none() {
104                            return Err(BindError(
105                                "file_scan function does not accept null arguments".to_owned(),
106                            )
107                            .into());
108                        }
109                        match value {
110                            Some(ScalarImpl::Utf8(s)) => {
111                                eval_args.push(s.to_string());
112                            }
113                            _ => {
114                                return Err(BindError(
115                                    "file_scan function only accepts string arguments".to_owned(),
116                                )
117                                .into());
118                            }
119                        }
120                    }
121                    Some(Err(err)) => {
122                        return Err(err);
123                    }
124                    None => {
125                        return Err(BindError(
126                            "file_scan function only accepts constant arguments".to_owned(),
127                        )
128                        .into());
129                    }
130                }
131            }
132
133            if (eval_args.len() != 4 && eval_args.len() != 6)
134                || (eval_args.len() == 4 && !"gcs".eq_ignore_ascii_case(&eval_args[1]))
135                || (eval_args.len() == 6
136                    && !"s3".eq_ignore_ascii_case(&eval_args[1])
137                    && !"azblob".eq_ignore_ascii_case(&eval_args[1]))
138            {
139                return Err(BindError(
140                "file_scan function supports three backends: s3, gcs, and azblob. Their formats are as follows: \n
141                    file_scan('parquet', 's3', s3_region, s3_access_key, s3_secret_key, file_location) \n
142                    file_scan('parquet', 'gcs', credential, service_account, file_location) \n
143                    file_scan('parquet', 'azblob', endpoint, account_name, account_key, file_location)"
144                        .to_owned(),
145                )
146                .into());
147            }
148            if !"parquet".eq_ignore_ascii_case(&eval_args[0]) {
149                return Err(BindError(
150                    "file_scan function only accepts 'parquet' as file format".to_owned(),
151                )
152                .into());
153            }
154
155            if !"s3".eq_ignore_ascii_case(&eval_args[1])
156                && !"gcs".eq_ignore_ascii_case(&eval_args[1])
157                && !"azblob".eq_ignore_ascii_case(&eval_args[1])
158            {
159                return Err(BindError(
160                    "file_scan function only accepts 's3', 'gcs' or 'azblob' as storage type"
161                        .to_owned(),
162                )
163                .into());
164            }
165
166            #[cfg(madsim)]
167            return Err(crate::error::ErrorCode::BindError(
168                "file_scan can't be used in the madsim mode".to_string(),
169            )
170            .into());
171
172            #[cfg(not(madsim))]
173            {
174                let (file_scan_backend, input_file_location) =
175                    if "s3".eq_ignore_ascii_case(&eval_args[1]) {
176                        (FileScanBackend::S3, eval_args[5].clone())
177                    } else if "gcs".eq_ignore_ascii_case(&eval_args[1]) {
178                        (FileScanBackend::Gcs, eval_args[3].clone())
179                    } else if "azblob".eq_ignore_ascii_case(&eval_args[1]) {
180                        (FileScanBackend::Azblob, eval_args[5].clone())
181                    } else {
182                        unreachable!();
183                    };
184                let op = match file_scan_backend {
185                    FileScanBackend::S3 => {
186                        let (bucket, _) = extract_bucket_and_file_name(
187                            &eval_args[5].clone(),
188                            &file_scan_backend,
189                        )?;
190
191                        let (s3_region, s3_endpoint) = match eval_args[2].starts_with("http") {
192                            true => ("us-east-1".to_owned(), eval_args[2].clone()), /* for minio, hard code region as not used but needed. */
193                            false => (
194                                eval_args[2].clone(),
195                                format!("https://{}.s3.{}.amazonaws.com", bucket, eval_args[2],),
196                            ),
197                        };
198                        new_s3_operator(
199                            s3_region.clone(),
200                            eval_args[3].clone(),
201                            eval_args[4].clone(),
202                            bucket.clone(),
203                            s3_endpoint.clone(),
204                        )?
205                    }
206                    FileScanBackend::Gcs => {
207                        let (bucket, _) =
208                            extract_bucket_and_file_name(&input_file_location, &file_scan_backend)?;
209
210                        new_gcs_operator(eval_args[2].clone(), bucket.clone())?
211                    }
212                    FileScanBackend::Azblob => {
213                        let (bucket, _) =
214                            extract_bucket_and_file_name(&input_file_location, &file_scan_backend)?;
215
216                        new_azblob_operator(
217                            eval_args[2].clone(),
218                            eval_args[3].clone(),
219                            eval_args[4].clone(),
220                            bucket.clone(),
221                        )?
222                    }
223                };
224                let files = if input_file_location.ends_with('/') {
225                    let files = tokio::task::block_in_place(|| {
226                        FRONTEND_RUNTIME.block_on(async {
227                            let files = list_data_directory(
228                                op.clone(),
229                                input_file_location.clone(),
230                                &file_scan_backend,
231                            )
232                            .await?;
233
234                            Ok::<Vec<String>, anyhow::Error>(files)
235                        })
236                    })?;
237                    if files.is_empty() {
238                        return Err(BindError(
239                            "file_scan function only accepts non-empty directory".to_owned(),
240                        )
241                        .into());
242                    }
243
244                    Some(files)
245                } else {
246                    None
247                };
248                let schema = tokio::task::block_in_place(|| {
249                    FRONTEND_RUNTIME.block_on(async {
250                        let location = match files.as_ref() {
251                            Some(files) => files[0].clone(),
252                            None => input_file_location.clone(),
253                        };
254                        let (_, file_name) =
255                            extract_bucket_and_file_name(&location, &file_scan_backend)?;
256
257                        let fields = get_parquet_fields(op, file_name).await?;
258
259                        let mut rw_types = vec![];
260                        for field in &fields {
261                            rw_types.push((
262                                field.name().to_string(),
263                                IcebergArrowConvert.type_from_field(field)?,
264                            ));
265                        }
266
267                        Ok::<risingwave_common::types::DataType, anyhow::Error>(DataType::Struct(
268                            StructType::new(rw_types),
269                        ))
270                    })
271                })?;
272
273                if let Some(files) = files {
274                    // if the file location is a directory, we need to remove the last argument and add all files in the directory as arguments
275                    match file_scan_backend {
276                        FileScanBackend::S3 => args.remove(5),
277                        FileScanBackend::Gcs => args.remove(3),
278                        FileScanBackend::Azblob => args.remove(5),
279                    };
280                    for file in files {
281                        args.push(ExprImpl::Literal(Box::new(Literal::new(
282                            Some(ScalarImpl::Utf8(file.into())),
283                            DataType::Varchar,
284                        ))));
285                    }
286                }
287
288                schema
289            }
290        };
291
292        Ok(TableFunction {
293            args,
294            return_type,
295            function_type: TableFunctionType::FileScan,
296            user_defined: None,
297        })
298    }
299
300    fn handle_postgres_or_mysql_query_args(
301        catalog_reader: &CatalogReadGuard,
302        db_name: &str,
303        schema_path: SchemaPath<'_>,
304        args: Vec<ExprImpl>,
305        expect_connector_name: &str,
306    ) -> RwResult<Vec<ExprImpl>> {
307        let cast_args = match args.len() {
308            INLINE_ARG_LEN => {
309                let mut cast_args = Vec::with_capacity(INLINE_ARG_LEN);
310                for arg in args {
311                    let arg = arg.cast_implicit(DataType::Varchar)?;
312                    cast_args.push(arg);
313                }
314                cast_args
315            }
316            CDC_SOURCE_ARG_LEN => {
317                let source_name = expr_impl_to_string_fn(&args[0])?;
318                let source_catalog = catalog_reader
319                    .get_source_by_name(db_name, schema_path, &source_name)?
320                    .0;
321                if !source_catalog
322                    .connector_name()
323                    .eq_ignore_ascii_case(expect_connector_name)
324                {
325                    return Err(BindError(format!("TVF function only accepts `mysql-cdc` and `postgres-cdc` source. Expected: {}, but got: {}", expect_connector_name, source_catalog.connector_name())).into());
326                }
327
328                let (props, secret_refs) = source_catalog.with_properties.clone().into_parts();
329                let secret_resolved =
330                    LocalSecretManager::global().fill_secrets(props, secret_refs)?;
331
332                vec![
333                    ExprImpl::literal_varchar(secret_resolved["hostname"].clone()),
334                    ExprImpl::literal_varchar(secret_resolved["port"].clone()),
335                    ExprImpl::literal_varchar(secret_resolved["username"].clone()),
336                    ExprImpl::literal_varchar(secret_resolved["password"].clone()),
337                    ExprImpl::literal_varchar(secret_resolved["database.name"].clone()),
338                    args.get(1)
339                        .unwrap()
340                        .clone()
341                        .cast_implicit(DataType::Varchar)?,
342                ]
343            }
344            _ => {
345                return Err(BindError("postgres_query function and mysql_query function accept either 2 arguments: (cdc_source_name varchar, query varchar) or 6 arguments: (hostname varchar, port varchar, username varchar, password varchar, database_name varchar, query varchar)".to_owned()).into());
346            }
347        };
348
349        Ok(cast_args)
350    }
351
352    pub fn new_postgres_query(
353        catalog_reader: &CatalogReadGuard,
354        db_name: &str,
355        schema_path: SchemaPath<'_>,
356        args: Vec<ExprImpl>,
357    ) -> RwResult<Self> {
358        let args = Self::handle_postgres_or_mysql_query_args(
359            catalog_reader,
360            db_name,
361            schema_path,
362            args,
363            "postgres-cdc",
364        )?;
365        let evaled_args = args
366            .iter()
367            .map(expr_impl_to_string_fn)
368            .collect::<RwResult<Vec<_>>>()?;
369
370        #[cfg(madsim)]
371        {
372            return Err(crate::error::ErrorCode::BindError(
373                "postgres_query can't be used in the madsim mode".to_string(),
374            )
375            .into());
376        }
377
378        #[cfg(not(madsim))]
379        {
380            let schema = tokio::task::block_in_place(|| {
381                FRONTEND_RUNTIME.block_on(async {
382                    let (client, connection) = tokio_postgres::connect(
383                        format!(
384                            "host={} port={} user={} password={} dbname={}",
385                            evaled_args[0],
386                            evaled_args[1],
387                            evaled_args[2],
388                            evaled_args[3],
389                            evaled_args[4]
390                        )
391                        .as_str(),
392                        tokio_postgres::NoTls,
393                    )
394                    .await?;
395
396                    tokio::spawn(async move {
397                        if let Err(e) = connection.await {
398                            tracing::error!(
399                                "mysql_query_executor: connection error: {:?}",
400                                e.as_report()
401                            );
402                        }
403                    });
404
405                    let statement = client.prepare(evaled_args[5].as_str()).await?;
406
407                    let mut rw_types = vec![];
408                    for column in statement.columns() {
409                        let name = column.name().to_owned();
410                        let data_type = match *column.type_() {
411                            TokioPgType::BOOL => DataType::Boolean,
412                            TokioPgType::INT2 => DataType::Int16,
413                            TokioPgType::INT4 => DataType::Int32,
414                            TokioPgType::INT8 => DataType::Int64,
415                            TokioPgType::FLOAT4 => DataType::Float32,
416                            TokioPgType::FLOAT8 => DataType::Float64,
417                            TokioPgType::NUMERIC => DataType::Decimal,
418                            TokioPgType::DATE => DataType::Date,
419                            TokioPgType::TIME => DataType::Time,
420                            TokioPgType::TIMESTAMP => DataType::Timestamp,
421                            TokioPgType::TIMESTAMPTZ => DataType::Timestamptz,
422                            TokioPgType::TEXT | TokioPgType::VARCHAR => DataType::Varchar,
423                            TokioPgType::INTERVAL => DataType::Interval,
424                            TokioPgType::JSONB => DataType::Jsonb,
425                            TokioPgType::BYTEA => DataType::Bytea,
426                            _ => {
427                                return Err(crate::error::ErrorCode::BindError(format!(
428                                    "unsupported column type: {}",
429                                    column.type_()
430                                ))
431                                .into());
432                            }
433                        };
434                        rw_types.push((name, data_type));
435                    }
436                    Ok::<risingwave_common::types::DataType, anyhow::Error>(DataType::Struct(
437                        StructType::new(rw_types),
438                    ))
439                })
440            })?;
441
442            Ok(TableFunction {
443                args,
444                return_type: schema,
445                function_type: TableFunctionType::PostgresQuery,
446                user_defined: None,
447            })
448        }
449    }
450
451    pub fn new_mysql_query(
452        catalog_reader: &CatalogReadGuard,
453        db_name: &str,
454        schema_path: SchemaPath<'_>,
455        args: Vec<ExprImpl>,
456    ) -> RwResult<Self> {
457        let args = Self::handle_postgres_or_mysql_query_args(
458            catalog_reader,
459            db_name,
460            schema_path,
461            args,
462            "mysql-cdc",
463        )?;
464        let evaled_args = args
465            .iter()
466            .map(expr_impl_to_string_fn)
467            .collect::<RwResult<Vec<_>>>()?;
468
469        #[cfg(madsim)]
470        {
471            return Err(crate::error::ErrorCode::BindError(
472                "postgres_query can't be used in the madsim mode".to_string(),
473            )
474            .into());
475        }
476
477        #[cfg(not(madsim))]
478        {
479            let schema = tokio::task::block_in_place(|| {
480                FRONTEND_RUNTIME.block_on(async {
481                    let database_opts: mysql_async::Opts = {
482                        let port = evaled_args[1]
483                            .parse::<u16>()
484                            .context("failed to parse port")?;
485                        mysql_async::OptsBuilder::default()
486                            .ip_or_hostname(evaled_args[0].clone())
487                            .tcp_port(port)
488                            .user(Some(evaled_args[2].clone()))
489                            .pass(Some(evaled_args[3].clone()))
490                            .db_name(Some(evaled_args[4].clone()))
491                            .into()
492                    };
493
494                    let pool = mysql_async::Pool::new(database_opts);
495                    let mut conn = pool
496                        .get_conn()
497                        .await
498                        .context("failed to connect to mysql in binder")?;
499
500                    let query = evaled_args[5].clone();
501                    let statement = conn
502                        .prep(query)
503                        .await
504                        .context("failed to prepare mysql_query in binder")?;
505
506                    let mut rw_types = vec![];
507                    #[allow(clippy::never_loop)]
508                    for column in statement.columns() {
509                        let name = column.name_str().to_string();
510                        let data_type = match column.column_type() {
511                            // Boolean types
512                            MySqlColumnType::MYSQL_TYPE_BIT if column.column_length() == 1 => {
513                                DataType::Boolean
514                            }
515
516                            // Numeric types
517                            // NOTE(kwannoel): Although `bool/boolean` is a synonym of TINY(1) in MySQL,
518                            // we treat it as Int16 here. It is better to be straightforward in our conversion.
519                            MySqlColumnType::MYSQL_TYPE_TINY => DataType::Int16,
520                            MySqlColumnType::MYSQL_TYPE_SHORT => DataType::Int16,
521                            MySqlColumnType::MYSQL_TYPE_INT24 => DataType::Int32,
522                            MySqlColumnType::MYSQL_TYPE_LONG => DataType::Int32,
523                            MySqlColumnType::MYSQL_TYPE_LONGLONG => DataType::Int64,
524                            MySqlColumnType::MYSQL_TYPE_FLOAT => DataType::Float32,
525                            MySqlColumnType::MYSQL_TYPE_DOUBLE => DataType::Float64,
526                            MySqlColumnType::MYSQL_TYPE_NEWDECIMAL => DataType::Decimal,
527                            MySqlColumnType::MYSQL_TYPE_DECIMAL => DataType::Decimal,
528
529                            // Date time types
530                            MySqlColumnType::MYSQL_TYPE_YEAR => DataType::Int32,
531                            MySqlColumnType::MYSQL_TYPE_DATE => DataType::Date,
532                            MySqlColumnType::MYSQL_TYPE_NEWDATE => DataType::Date,
533                            MySqlColumnType::MYSQL_TYPE_TIME => DataType::Time,
534                            MySqlColumnType::MYSQL_TYPE_TIME2 => DataType::Time,
535                            MySqlColumnType::MYSQL_TYPE_DATETIME => DataType::Timestamp,
536                            MySqlColumnType::MYSQL_TYPE_DATETIME2 => DataType::Timestamp,
537                            MySqlColumnType::MYSQL_TYPE_TIMESTAMP => DataType::Timestamptz,
538                            MySqlColumnType::MYSQL_TYPE_TIMESTAMP2 => DataType::Timestamptz,
539
540                            // String types
541                            MySqlColumnType::MYSQL_TYPE_VARCHAR
542                            | MySqlColumnType::MYSQL_TYPE_STRING
543                            | MySqlColumnType::MYSQL_TYPE_VAR_STRING => DataType::Varchar,
544
545                            // JSON types
546                            MySqlColumnType::MYSQL_TYPE_JSON => DataType::Jsonb,
547
548                            // Binary types
549                            MySqlColumnType::MYSQL_TYPE_BIT
550                            | MySqlColumnType::MYSQL_TYPE_BLOB
551                            | MySqlColumnType::MYSQL_TYPE_TINY_BLOB
552                            | MySqlColumnType::MYSQL_TYPE_MEDIUM_BLOB
553                            | MySqlColumnType::MYSQL_TYPE_LONG_BLOB => DataType::Bytea,
554
555                            MySqlColumnType::MYSQL_TYPE_UNKNOWN
556                            | MySqlColumnType::MYSQL_TYPE_TYPED_ARRAY
557                            | MySqlColumnType::MYSQL_TYPE_ENUM
558                            | MySqlColumnType::MYSQL_TYPE_SET
559                            | MySqlColumnType::MYSQL_TYPE_GEOMETRY
560                            | MySqlColumnType::MYSQL_TYPE_NULL => {
561                                return Err(crate::error::ErrorCode::BindError(format!(
562                                    "unsupported column type: {:?}",
563                                    column.column_type()
564                                ))
565                                .into());
566                            }
567                        };
568                        rw_types.push((name, data_type));
569                    }
570                    Ok::<risingwave_common::types::DataType, anyhow::Error>(DataType::Struct(
571                        StructType::new(rw_types),
572                    ))
573                })
574            })?;
575
576            Ok(TableFunction {
577                args,
578                return_type: schema,
579                function_type: TableFunctionType::MysqlQuery,
580                user_defined: None,
581            })
582        }
583    }
584
585    pub fn to_protobuf(&self) -> PbTableFunction {
586        PbTableFunction {
587            function_type: self.function_type as i32,
588            args: self.args.iter().map(|c| c.to_expr_proto()).collect_vec(),
589            return_type: Some(self.return_type.to_protobuf()),
590            udf: self.user_defined.as_ref().map(|c| c.as_ref().into()),
591        }
592    }
593
594    /// Get the name of the table function.
595    pub fn name(&self) -> String {
596        match self.function_type {
597            TableFunctionType::UserDefined => self.user_defined.as_ref().unwrap().name.clone(),
598            t => t.as_str_name().to_lowercase(),
599        }
600    }
601
602    pub fn rewrite(self, rewriter: &mut impl ExprRewriter) -> Self {
603        Self {
604            args: self
605                .args
606                .into_iter()
607                .map(|e| rewriter.rewrite_expr(e))
608                .collect(),
609            ..self
610        }
611    }
612}
613
614impl std::fmt::Debug for TableFunction {
615    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
616        if f.alternate() {
617            f.debug_struct("FunctionCall")
618                .field("function_type", &self.function_type)
619                .field("return_type", &self.return_type)
620                .field("args", &self.args)
621                .finish()
622        } else {
623            let func_name = format!("{:?}", self.function_type);
624            let mut builder = f.debug_tuple(&func_name);
625            self.args.iter().for_each(|child| {
626                builder.field(child);
627            });
628            builder.finish()
629        }
630    }
631}
632
633impl Expr for TableFunction {
634    fn return_type(&self) -> DataType {
635        self.return_type.clone()
636    }
637
638    fn to_expr_proto(&self) -> risingwave_pb::expr::ExprNode {
639        unreachable!("Table function should not be converted to ExprNode")
640    }
641}
642
643fn expr_impl_to_string_fn(arg: &ExprImpl) -> RwResult<String> {
644    match arg.try_fold_const() {
645        Some(Ok(value)) => {
646            let Some(scalar) = value else {
647                return Err(BindError(
648                    "postgres_query function and mysql_query function do not accept null arguments"
649                        .to_owned(),
650                )
651                .into());
652            };
653            Ok(scalar.into_utf8().to_string())
654        }
655        Some(Err(err)) => Err(err),
656        None => Err(BindError(
657            "postgres_query function and mysql_query function only accept constant arguments"
658                .to_owned(),
659        )
660        .into()),
661    }
662}