1use risingwave_pb::catalog::{PbSink, PbSinkType};
16use sea_orm::ActiveValue::Set;
17use sea_orm::entity::prelude::*;
18use serde::{Deserialize, Serialize};
19
20use crate::{
21 ColumnCatalogArray, ColumnOrderArray, ConnectionId, I32Array, Property, SecretRef,
22 SinkFormatDesc, SinkId, TableId,
23};
24
25#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
26#[sea_orm(rs_type = "String", db_type = "string(None)")]
27pub enum SinkType {
28 #[sea_orm(string_value = "APPEND_ONLY")]
29 AppendOnly,
30 #[sea_orm(string_value = "FORCE_APPEND_ONLY")]
31 ForceAppendOnly,
32 #[sea_orm(string_value = "UPSERT")]
33 Upsert,
34 #[sea_orm(string_value = "RETRACT")]
35 Retract,
36}
37
38impl From<SinkType> for PbSinkType {
39 fn from(sink_type: SinkType) -> Self {
40 match sink_type {
41 SinkType::AppendOnly => Self::AppendOnly,
42 SinkType::ForceAppendOnly => Self::ForceAppendOnly,
43 SinkType::Upsert => Self::Upsert,
44 SinkType::Retract => Self::Retract,
45 }
46 }
47}
48
49impl From<PbSinkType> for SinkType {
50 fn from(sink_type: PbSinkType) -> Self {
51 match sink_type {
52 PbSinkType::AppendOnly => Self::AppendOnly,
53 PbSinkType::ForceAppendOnly => Self::ForceAppendOnly,
54 PbSinkType::Upsert => Self::Upsert,
55 PbSinkType::Retract => Self::Retract,
56 PbSinkType::Unspecified => unreachable!("Unspecified sink type"),
57 }
58 }
59}
60
61#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
62#[sea_orm(table_name = "sink")]
63pub struct Model {
64 #[sea_orm(primary_key, auto_increment = false)]
65 pub sink_id: SinkId,
66 pub name: String,
67 pub columns: ColumnCatalogArray,
68 pub plan_pk: ColumnOrderArray,
69 pub distribution_key: I32Array,
70 pub downstream_pk: I32Array,
71 pub sink_type: SinkType,
72 pub properties: Property,
73 pub definition: String,
74 pub connection_id: Option<ConnectionId>,
75 pub db_name: String,
76 pub sink_from_name: String,
77 pub sink_format_desc: Option<SinkFormatDesc>,
78 pub target_table: Option<TableId>,
79 pub secret_ref: Option<SecretRef>,
81 pub original_target_columns: Option<ColumnCatalogArray>,
82 pub auto_refresh_schema_from_table: Option<TableId>,
83}
84
85#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
86pub enum Relation {
87 #[sea_orm(
88 belongs_to = "super::connection::Entity",
89 from = "Column::ConnectionId",
90 to = "super::connection::Column::ConnectionId",
91 on_update = "NoAction",
92 on_delete = "NoAction"
93 )]
94 Connection,
95 #[sea_orm(
96 belongs_to = "super::object::Entity",
97 from = "Column::SinkId",
98 to = "super::object::Column::Oid",
99 on_update = "NoAction",
100 on_delete = "Cascade"
101 )]
102 Object,
103}
104
105impl Related<super::connection::Entity> for Entity {
106 fn to() -> RelationDef {
107 Relation::Connection.def()
108 }
109}
110
111impl Related<super::object::Entity> for Entity {
112 fn to() -> RelationDef {
113 Relation::Object.def()
114 }
115}
116
117impl ActiveModelBehavior for ActiveModel {}
118
119impl From<PbSink> for ActiveModel {
120 fn from(pb_sink: PbSink) -> Self {
121 let sink_type = pb_sink.sink_type();
122
123 Self {
124 sink_id: Set(pb_sink.id as _),
125 name: Set(pb_sink.name),
126 columns: Set(pb_sink.columns.into()),
127 plan_pk: Set(pb_sink.plan_pk.into()),
128 distribution_key: Set(pb_sink.distribution_key.into()),
129 downstream_pk: Set(pb_sink.downstream_pk.into()),
130 sink_type: Set(sink_type.into()),
131 properties: Set(pb_sink.properties.into()),
132 definition: Set(pb_sink.definition),
133 connection_id: Set(pb_sink.connection_id.map(|x| x as _)),
134 db_name: Set(pb_sink.db_name),
135 sink_from_name: Set(pb_sink.sink_from_name),
136 sink_format_desc: Set(pb_sink.format_desc.as_ref().map(|x| x.into())),
137 target_table: Set(pb_sink.target_table),
138 secret_ref: Set(Some(SecretRef::from(pb_sink.secret_refs))),
139 original_target_columns: Set(Some(pb_sink.original_target_columns.into())),
140 auto_refresh_schema_from_table: Set(pb_sink.auto_refresh_schema_from_table),
141 }
142 }
143}