risingwave_meta_model/
sink.rs1use risingwave_pb::catalog::{PbSink, PbSinkType};
16use sea_orm::ActiveValue::Set;
17use sea_orm::entity::prelude::*;
18use serde::{Deserialize, Serialize};
19
20use crate::{
21    ColumnCatalogArray, ColumnOrderArray, ConnectionId, I32Array, Property, SecretRef,
22    SinkFormatDesc, SinkId, TableId,
23};
24
25#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
26#[sea_orm(rs_type = "String", db_type = "string(None)")]
27pub enum SinkType {
28    #[sea_orm(string_value = "APPEND_ONLY")]
29    AppendOnly,
30    #[sea_orm(string_value = "FORCE_APPEND_ONLY")]
31    ForceAppendOnly,
32    #[sea_orm(string_value = "UPSERT")]
33    Upsert,
34}
35
36impl From<SinkType> for PbSinkType {
37    fn from(sink_type: SinkType) -> Self {
38        match sink_type {
39            SinkType::AppendOnly => Self::AppendOnly,
40            SinkType::ForceAppendOnly => Self::ForceAppendOnly,
41            SinkType::Upsert => Self::Upsert,
42        }
43    }
44}
45
46impl From<PbSinkType> for SinkType {
47    fn from(sink_type: PbSinkType) -> Self {
48        match sink_type {
49            PbSinkType::AppendOnly => Self::AppendOnly,
50            PbSinkType::ForceAppendOnly => Self::ForceAppendOnly,
51            PbSinkType::Upsert => Self::Upsert,
52            PbSinkType::Unspecified => unreachable!("Unspecified sink type"),
53        }
54    }
55}
56
57#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
58#[sea_orm(table_name = "sink")]
59pub struct Model {
60    #[sea_orm(primary_key, auto_increment = false)]
61    pub sink_id: SinkId,
62    pub name: String,
63    pub columns: ColumnCatalogArray,
64    pub plan_pk: ColumnOrderArray,
65    pub distribution_key: I32Array,
66    pub downstream_pk: I32Array,
67    pub sink_type: SinkType,
68    pub properties: Property,
69    pub definition: String,
70    pub connection_id: Option<ConnectionId>,
71    pub db_name: String,
72    pub sink_from_name: String,
73    pub sink_format_desc: Option<SinkFormatDesc>,
74    pub target_table: Option<TableId>,
75    pub secret_ref: Option<SecretRef>,
77    pub original_target_columns: Option<ColumnCatalogArray>,
78    pub auto_refresh_schema_from_table: Option<TableId>,
79}
80
81#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
82pub enum Relation {
83    #[sea_orm(
84        belongs_to = "super::connection::Entity",
85        from = "Column::ConnectionId",
86        to = "super::connection::Column::ConnectionId",
87        on_update = "NoAction",
88        on_delete = "NoAction"
89    )]
90    Connection,
91    #[sea_orm(
92        belongs_to = "super::object::Entity",
93        from = "Column::SinkId",
94        to = "super::object::Column::Oid",
95        on_update = "NoAction",
96        on_delete = "Cascade"
97    )]
98    Object,
99}
100
101impl Related<super::connection::Entity> for Entity {
102    fn to() -> RelationDef {
103        Relation::Connection.def()
104    }
105}
106
107impl Related<super::object::Entity> for Entity {
108    fn to() -> RelationDef {
109        Relation::Object.def()
110    }
111}
112
113impl ActiveModelBehavior for ActiveModel {}
114
115impl From<PbSink> for ActiveModel {
116    fn from(pb_sink: PbSink) -> Self {
117        let sink_type = pb_sink.sink_type();
118
119        Self {
120            sink_id: Set(pb_sink.id as _),
121            name: Set(pb_sink.name),
122            columns: Set(pb_sink.columns.into()),
123            plan_pk: Set(pb_sink.plan_pk.into()),
124            distribution_key: Set(pb_sink.distribution_key.into()),
125            downstream_pk: Set(pb_sink.downstream_pk.into()),
126            sink_type: Set(sink_type.into()),
127            properties: Set(pb_sink.properties.into()),
128            definition: Set(pb_sink.definition),
129            connection_id: Set(pb_sink.connection_id.map(|x| x as _)),
130            db_name: Set(pb_sink.db_name),
131            sink_from_name: Set(pb_sink.sink_from_name),
132            sink_format_desc: Set(pb_sink.format_desc.as_ref().map(|x| x.into())),
133            target_table: Set(pb_sink.target_table.map(|x| x as _)),
134            secret_ref: Set(Some(SecretRef::from(pb_sink.secret_refs))),
135            original_target_columns: Set(Some(pb_sink.original_target_columns.into())),
136            auto_refresh_schema_from_table: Set(pb_sink
137                .auto_refresh_schema_from_table
138                .map(|id| id as _)),
139        }
140    }
141}