risingwave_frontend/expr/
user_defined_function.rsuse std::sync::Arc;
use itertools::Itertools;
use risingwave_common::catalog::FunctionId;
use risingwave_common::types::DataType;
use super::{Expr, ExprImpl};
use crate::catalog::function_catalog::{FunctionCatalog, FunctionKind};
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct UserDefinedFunction {
pub args: Vec<ExprImpl>,
pub catalog: Arc<FunctionCatalog>,
}
impl UserDefinedFunction {
pub fn new(catalog: Arc<FunctionCatalog>, args: Vec<ExprImpl>) -> Self {
Self { args, catalog }
}
pub(super) fn from_expr_proto(
udf: &risingwave_pb::expr::UserDefinedFunction,
return_type: DataType,
) -> crate::error::Result<Self> {
let args: Vec<_> = udf
.get_children()
.iter()
.map(ExprImpl::from_expr_proto)
.try_collect()?;
let arg_types = udf.get_arg_types().iter().map_into().collect_vec();
let catalog = FunctionCatalog {
id: FunctionId::placeholder(),
name: udf.name.clone(),
owner: u32::MAX - 1,
kind: FunctionKind::Scalar,
arg_names: udf.arg_names.clone(),
arg_types,
return_type,
language: udf.language.clone(),
runtime: udf.runtime.clone(),
identifier: udf.identifier.clone(),
body: udf.body.clone(),
link: udf.link.clone(),
compressed_binary: udf.compressed_binary.clone(),
always_retry_on_network_error: udf.always_retry_on_network_error,
};
Ok(Self {
args,
catalog: Arc::new(catalog),
})
}
}
impl Expr for UserDefinedFunction {
fn return_type(&self) -> DataType {
self.catalog.return_type.clone()
}
fn to_expr_proto(&self) -> risingwave_pb::expr::ExprNode {
use risingwave_pb::expr::expr_node::*;
use risingwave_pb::expr::*;
ExprNode {
function_type: Type::Unspecified.into(),
return_type: Some(self.return_type().to_protobuf()),
rex_node: Some(RexNode::Udf(UserDefinedFunction {
children: self.args.iter().map(Expr::to_expr_proto).collect(),
name: self.catalog.name.clone(),
arg_names: self.catalog.arg_names.clone(),
arg_types: self
.catalog
.arg_types
.iter()
.map(|t| t.to_protobuf())
.collect(),
language: self.catalog.language.clone(),
runtime: self.catalog.runtime.clone(),
identifier: self.catalog.identifier.clone(),
link: self.catalog.link.clone(),
body: self.catalog.body.clone(),
compressed_binary: self.catalog.compressed_binary.clone(),
always_retry_on_network_error: self.catalog.always_retry_on_network_error,
})),
}
}
}