risingwave_meta_model/
sink.rs1use risingwave_pb::catalog::{PbSink, PbSinkType};
16use sea_orm::ActiveValue::Set;
17use sea_orm::entity::prelude::*;
18use serde::{Deserialize, Serialize};
19
20use crate::{
21 ColumnCatalogArray, ColumnOrderArray, ConnectionId, I32Array, Property, SecretRef,
22 SinkFormatDesc, SinkId, TableId,
23};
24
25#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
26#[sea_orm(rs_type = "String", db_type = "string(None)")]
27pub enum SinkType {
28 #[sea_orm(string_value = "APPEND_ONLY")]
29 AppendOnly,
30 #[sea_orm(string_value = "FORCE_APPEND_ONLY")]
31 ForceAppendOnly,
32 #[sea_orm(string_value = "UPSERT")]
33 Upsert,
34}
35
36impl From<SinkType> for PbSinkType {
37 fn from(sink_type: SinkType) -> Self {
38 match sink_type {
39 SinkType::AppendOnly => Self::AppendOnly,
40 SinkType::ForceAppendOnly => Self::ForceAppendOnly,
41 SinkType::Upsert => Self::Upsert,
42 }
43 }
44}
45
46impl From<PbSinkType> for SinkType {
47 fn from(sink_type: PbSinkType) -> Self {
48 match sink_type {
49 PbSinkType::AppendOnly => Self::AppendOnly,
50 PbSinkType::ForceAppendOnly => Self::ForceAppendOnly,
51 PbSinkType::Upsert => Self::Upsert,
52 PbSinkType::Unspecified => unreachable!("Unspecified sink type"),
53 }
54 }
55}
56
57#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
58#[sea_orm(table_name = "sink")]
59pub struct Model {
60 #[sea_orm(primary_key, auto_increment = false)]
61 pub sink_id: SinkId,
62 pub name: String,
63 pub columns: ColumnCatalogArray,
64 pub plan_pk: ColumnOrderArray,
65 pub distribution_key: I32Array,
66 pub downstream_pk: I32Array,
67 pub sink_type: SinkType,
68 pub properties: Property,
69 pub definition: String,
70 pub connection_id: Option<ConnectionId>,
71 pub db_name: String,
72 pub sink_from_name: String,
73 pub sink_format_desc: Option<SinkFormatDesc>,
74 pub target_table: Option<TableId>,
75 pub secret_ref: Option<SecretRef>,
77 pub original_target_columns: Option<ColumnCatalogArray>,
78}
79
80#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
81pub enum Relation {
82 #[sea_orm(
83 belongs_to = "super::connection::Entity",
84 from = "Column::ConnectionId",
85 to = "super::connection::Column::ConnectionId",
86 on_update = "NoAction",
87 on_delete = "NoAction"
88 )]
89 Connection,
90 #[sea_orm(
91 belongs_to = "super::object::Entity",
92 from = "Column::SinkId",
93 to = "super::object::Column::Oid",
94 on_update = "NoAction",
95 on_delete = "Cascade"
96 )]
97 Object,
98}
99
100impl Related<super::connection::Entity> for Entity {
101 fn to() -> RelationDef {
102 Relation::Connection.def()
103 }
104}
105
106impl Related<super::object::Entity> for Entity {
107 fn to() -> RelationDef {
108 Relation::Object.def()
109 }
110}
111
112impl ActiveModelBehavior for ActiveModel {}
113
114impl From<PbSink> for ActiveModel {
115 fn from(pb_sink: PbSink) -> Self {
116 let sink_type = pb_sink.sink_type();
117
118 Self {
119 sink_id: Set(pb_sink.id as _),
120 name: Set(pb_sink.name),
121 columns: Set(pb_sink.columns.into()),
122 plan_pk: Set(pb_sink.plan_pk.into()),
123 distribution_key: Set(pb_sink.distribution_key.into()),
124 downstream_pk: Set(pb_sink.downstream_pk.into()),
125 sink_type: Set(sink_type.into()),
126 properties: Set(pb_sink.properties.into()),
127 definition: Set(pb_sink.definition),
128 connection_id: Set(pb_sink.connection_id.map(|x| x as _)),
129 db_name: Set(pb_sink.db_name),
130 sink_from_name: Set(pb_sink.sink_from_name),
131 sink_format_desc: Set(pb_sink.format_desc.as_ref().map(|x| x.into())),
132 target_table: Set(pb_sink.target_table.map(|x| x as _)),
133 secret_ref: Set(Some(SecretRef::from(pb_sink.secret_refs))),
134 original_target_columns: Set(Some(pb_sink.original_target_columns.into())),
135 }
136 }
137}