risingwave_meta_model/
function.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use risingwave_pb::catalog::PbFunction;
18use risingwave_pb::catalog::function::Kind;
19use sea_orm::ActiveValue::Set;
20use sea_orm::entity::prelude::*;
21use serde::{Deserialize, Serialize};
22
23use crate::{DataType, DataTypeArray, FunctionId, Property};
24
25#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
26#[sea_orm(rs_type = "String", db_type = "string(None)")]
27pub enum FunctionKind {
28    #[sea_orm(string_value = "Scalar")]
29    Scalar,
30    #[sea_orm(string_value = "Table")]
31    Table,
32    #[sea_orm(string_value = "Aggregate")]
33    Aggregate,
34}
35
36#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
37#[sea_orm(table_name = "function")]
38pub struct Model {
39    #[sea_orm(primary_key, auto_increment = false)]
40    pub function_id: FunctionId,
41    pub name: String,
42    // encode Vec<String> as comma separated string
43    pub arg_names: String,
44    pub arg_types: DataTypeArray,
45    pub return_type: DataType,
46    pub language: String,
47    pub runtime: Option<String>,
48    pub link: Option<String>,
49    pub name_in_runtime: Option<String>,
50    pub body: Option<String>,
51    pub compressed_binary: Option<Vec<u8>>,
52    pub kind: FunctionKind,
53    // To keep compatible with legacy code, this is not included in `options`.
54    pub always_retry_on_network_error: bool,
55    pub options: Option<Property>,
56}
57
58#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
59pub enum Relation {
60    #[sea_orm(
61        belongs_to = "super::object::Entity",
62        from = "Column::FunctionId",
63        to = "super::object::Column::Oid",
64        on_update = "NoAction",
65        on_delete = "Cascade"
66    )]
67    Object,
68}
69
70impl Related<super::object::Entity> for Entity {
71    fn to() -> RelationDef {
72        Relation::Object.def()
73    }
74}
75
76impl ActiveModelBehavior for ActiveModel {}
77
78impl From<Kind> for FunctionKind {
79    fn from(kind: Kind) -> Self {
80        match kind {
81            Kind::Scalar(_) => Self::Scalar,
82            Kind::Table(_) => Self::Table,
83            Kind::Aggregate(_) => Self::Aggregate,
84        }
85    }
86}
87
88impl From<FunctionKind> for Kind {
89    fn from(value: FunctionKind) -> Self {
90        match value {
91            FunctionKind::Scalar => Self::Scalar(Default::default()),
92            FunctionKind::Table => Self::Table(Default::default()),
93            FunctionKind::Aggregate => Self::Aggregate(Default::default()),
94        }
95    }
96}
97
98impl From<PbFunction> for ActiveModel {
99    fn from(function: PbFunction) -> Self {
100        let mut options = BTreeMap::new();
101        if let Some(b) = function.is_batched {
102            options.insert("batch".to_string(), b.to_string());
103        }
104        if let Some(b) = function.is_async {
105            options.insert("async".to_string(), b.to_string());
106        }
107        Self {
108            function_id: Set(function.id as _),
109            name: Set(function.name),
110            arg_names: Set(function.arg_names.join(",")),
111            arg_types: Set(DataTypeArray::from(function.arg_types)),
112            return_type: Set(DataType::from(&function.return_type.unwrap())),
113            language: Set(function.language),
114            runtime: Set(function.runtime),
115            link: Set(function.link),
116            name_in_runtime: Set(function.name_in_runtime),
117            body: Set(function.body),
118            compressed_binary: Set(function.compressed_binary),
119            kind: Set(function.kind.unwrap().into()),
120            always_retry_on_network_error: Set(function.always_retry_on_network_error),
121            options: Set(Some(options.into())),
122        }
123    }
124}