risingwave_meta_model/
sink.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_pb::catalog::{PbSink, PbSinkType};
16use sea_orm::ActiveValue::Set;
17use sea_orm::entity::prelude::*;
18use serde::{Deserialize, Serialize};
19
20use crate::{
21    ColumnCatalogArray, ColumnOrderArray, ConnectionId, I32Array, Property, SecretRef,
22    SinkFormatDesc, SinkId, TableId,
23};
24
/// Sink type as persisted in the `sink_type` column of [`Model`].
///
/// The value is stored in the database as a string; each variant's `string_value`
/// attribute gives the exact text. Conversions to and from `PbSinkType` are provided
/// by the `From` impls in this file.
#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum SinkType {
    /// Persisted as `"APPEND_ONLY"`.
    ///
    /// Also used for the deprecated `PbSinkType::ForceAppendOnly`, which the
    /// `From<PbSinkType>` impl in this file folds into this variant.
    #[sea_orm(string_value = "APPEND_ONLY")]
    AppendOnly,
    /// Persisted as `"UPSERT"`.
    #[sea_orm(string_value = "UPSERT")]
    Upsert,
    /// Persisted as `"RETRACT"`.
    #[sea_orm(string_value = "RETRACT")]
    Retract,
}
35
36impl From<SinkType> for PbSinkType {
37    fn from(sink_type: SinkType) -> Self {
38        match sink_type {
39            SinkType::AppendOnly => Self::AppendOnly,
40            SinkType::Upsert => Self::Upsert,
41            SinkType::Retract => Self::Retract,
42        }
43    }
44}
45
46impl From<PbSinkType> for SinkType {
47    fn from(sink_type: PbSinkType) -> Self {
48        match sink_type {
49            // `ForceAppendOnly` is now denoted by `AppendOnly` + `ignore_delete`.
50            #[expect(deprecated)]
51            PbSinkType::AppendOnly | PbSinkType::ForceAppendOnly => Self::AppendOnly,
52            PbSinkType::Upsert => Self::Upsert,
53            PbSinkType::Retract => Self::Retract,
54            PbSinkType::Unspecified => unreachable!("Unspecified sink type"),
55        }
56    }
57}
58
/// A row of the `sink` table.
///
/// `From<PbSink> for ActiveModel` in this file shows how each column is populated
/// from the protobuf sink catalog.
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "sink")]
pub struct Model {
    // Primary key, not auto-incremented. `Relation::Object` ties it to `object.oid`.
    #[sea_orm(primary_key, auto_increment = false)]
    pub sink_id: SinkId,
    pub name: String,
    pub columns: ColumnCatalogArray,
    pub plan_pk: ColumnOrderArray,
    pub distribution_key: I32Array,
    pub downstream_pk: I32Array,
    pub sink_type: SinkType,
    // Together with `SinkType::AppendOnly`, this flag replaces the deprecated
    // `PbSinkType::ForceAppendOnly`.
    pub ignore_delete: bool,
    pub properties: Property,
    pub definition: String,
    // Optional reference to the `connection` table (see `Relation::Connection`).
    pub connection_id: Option<ConnectionId>,
    pub db_name: String,
    pub sink_from_name: String,
    pub sink_format_desc: Option<SinkFormatDesc>,
    pub target_table: Option<TableId>,
    // `secret_ref` maps each property name to the id and type of the secret it refers to.
    // `From<PbSink>` always writes `Some(..)` here.
    pub secret_ref: Option<SecretRef>,
    // `From<PbSink>` always writes `Some(..)` here.
    pub original_target_columns: Option<ColumnCatalogArray>,
    pub auto_refresh_schema_from_table: Option<TableId>,
}
83
/// Foreign-key relations declared for the `sink` entity.
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
    /// This entity's `Column::ConnectionId` references `connection::Column::ConnectionId`.
    /// Declared with no action on either update or delete.
    #[sea_orm(
        belongs_to = "super::connection::Entity",
        from = "Column::ConnectionId",
        to = "super::connection::Column::ConnectionId",
        on_update = "NoAction",
        on_delete = "NoAction"
    )]
    Connection,
    /// This entity's `Column::SinkId` references `object::Column::Oid`.
    /// Declared with no action on update and cascade on delete.
    #[sea_orm(
        belongs_to = "super::object::Entity",
        from = "Column::SinkId",
        to = "super::object::Column::Oid",
        on_update = "NoAction",
        on_delete = "Cascade"
    )]
    Object,
}
103
104impl Related<super::connection::Entity> for Entity {
105    fn to() -> RelationDef {
106        Relation::Connection.def()
107    }
108}
109
110impl Related<super::object::Entity> for Entity {
111    fn to() -> RelationDef {
112        Relation::Object.def()
113    }
114}
115
// No hooks are overridden: the trait's default `ActiveModelBehavior` methods are used as-is.
impl ActiveModelBehavior for ActiveModel {}
117
118impl From<PbSink> for ActiveModel {
119    fn from(pb_sink: PbSink) -> Self {
120        let sink_type = pb_sink.sink_type();
121        let ignore_delete = pb_sink.ignore_delete();
122
123        Self {
124            sink_id: Set(pb_sink.id),
125            name: Set(pb_sink.name),
126            columns: Set(pb_sink.columns.into()),
127            plan_pk: Set(pb_sink.plan_pk.into()),
128            distribution_key: Set(pb_sink.distribution_key.into()),
129            downstream_pk: Set(pb_sink.downstream_pk.into()),
130            sink_type: Set(sink_type.into()),
131            ignore_delete: Set(ignore_delete),
132            properties: Set(pb_sink.properties.into()),
133            definition: Set(pb_sink.definition),
134            connection_id: Set(pb_sink.connection_id),
135            db_name: Set(pb_sink.db_name),
136            sink_from_name: Set(pb_sink.sink_from_name),
137            sink_format_desc: Set(pb_sink.format_desc.as_ref().map(|x| x.into())),
138            target_table: Set(pb_sink.target_table),
139            secret_ref: Set(Some(SecretRef::from(pb_sink.secret_refs))),
140            original_target_columns: Set(Some(pb_sink.original_target_columns.into())),
141            auto_refresh_schema_from_table: Set(pb_sink.auto_refresh_schema_from_table),
142        }
143    }
144}