1use risingwave_common::catalog::OBJECT_ID_PLACEHOLDER;
16use risingwave_common::hash::VnodeCountCompat;
17use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, PbEngine, PbTableType};
18use risingwave_pb::catalog::{PbHandleConflictBehavior, PbTable};
19use sea_orm::ActiveValue::Set;
20use sea_orm::NotSet;
21use sea_orm::entity::prelude::*;
22use serde::{Deserialize, Serialize};
23
24use crate::{
25 Cardinality, ColumnCatalogArray, ColumnOrderArray, FragmentId, I32Array, ObjectId, SourceId,
26 TableId, TableVersion, WebhookSourceInfo,
27};
28
29#[derive(
30 Clone, Debug, PartialEq, Hash, Copy, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
31)]
32#[sea_orm(rs_type = "String", db_type = "string(None)")]
33pub enum TableType {
34 #[sea_orm(string_value = "TABLE")]
35 Table,
36 #[sea_orm(string_value = "MATERIALIZED_VIEW")]
37 MaterializedView,
38 #[sea_orm(string_value = "INDEX")]
39 Index,
40 #[sea_orm(string_value = "INTERNAL")]
41 Internal,
42}
43
44impl From<TableType> for PbTableType {
45 fn from(table_type: TableType) -> Self {
46 match table_type {
47 TableType::Table => Self::Table,
48 TableType::MaterializedView => Self::MaterializedView,
49 TableType::Index => Self::Index,
50 TableType::Internal => Self::Internal,
51 }
52 }
53}
54
55impl From<PbTableType> for TableType {
56 fn from(table_type: PbTableType) -> Self {
57 match table_type {
58 PbTableType::Table => Self::Table,
59 PbTableType::MaterializedView => Self::MaterializedView,
60 PbTableType::Index => Self::Index,
61 PbTableType::Internal => Self::Internal,
62 PbTableType::Unspecified => unreachable!("Unspecified table type"),
63 }
64 }
65}
66
67#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
68#[sea_orm(rs_type = "String", db_type = "string(None)")]
69pub enum HandleConflictBehavior {
70 #[sea_orm(string_value = "OVERWRITE")]
71 Overwrite,
72 #[sea_orm(string_value = "IGNORE")]
73 Ignore,
74 #[sea_orm(string_value = "NO_CHECK")]
75 NoCheck,
76 #[sea_orm(string_value = "DO_UPDATE_IF_NOT_NULL")]
77 DoUpdateIfNotNull,
78}
79
80impl From<HandleConflictBehavior> for PbHandleConflictBehavior {
81 fn from(handle_conflict_behavior: HandleConflictBehavior) -> Self {
82 match handle_conflict_behavior {
83 HandleConflictBehavior::Overwrite => Self::Overwrite,
84 HandleConflictBehavior::Ignore => Self::Ignore,
85 HandleConflictBehavior::NoCheck => Self::NoCheck,
86 HandleConflictBehavior::DoUpdateIfNotNull => Self::DoUpdateIfNotNull,
87 }
88 }
89}
90
91impl From<PbHandleConflictBehavior> for HandleConflictBehavior {
92 fn from(handle_conflict_behavior: PbHandleConflictBehavior) -> Self {
93 match handle_conflict_behavior {
94 PbHandleConflictBehavior::Overwrite => Self::Overwrite,
95 PbHandleConflictBehavior::Ignore => Self::Ignore,
96 PbHandleConflictBehavior::NoCheck => Self::NoCheck,
97 PbHandleConflictBehavior::DoUpdateIfNotNull => Self::DoUpdateIfNotNull,
98 PbHandleConflictBehavior::Unspecified => {
99 unreachable!("Unspecified handle conflict behavior")
100 }
101 }
102 }
103}
104
105#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
106#[sea_orm(rs_type = "String", db_type = "string(None)")]
107pub enum Engine {
108 #[sea_orm(string_value = "HUMMOCK")]
109 Hummock,
110 #[sea_orm(string_value = "ICEBERG")]
111 Iceberg,
112}
113
114impl From<Engine> for PbEngine {
115 fn from(engine: Engine) -> Self {
116 match engine {
117 Engine::Hummock => Self::Hummock,
118 Engine::Iceberg => Self::Iceberg,
119 }
120 }
121}
122
123impl From<PbEngine> for Engine {
124 fn from(engine: PbEngine) -> Self {
125 match engine {
126 PbEngine::Hummock => Self::Hummock,
127 PbEngine::Iceberg => Self::Iceberg,
128 PbEngine::Unspecified => Self::Hummock,
129 }
130 }
131}
132
133#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
134#[sea_orm(table_name = "table")]
135pub struct Model {
136 #[sea_orm(primary_key, auto_increment = false)]
137 pub table_id: TableId,
138 pub name: String,
139 pub optional_associated_source_id: Option<SourceId>,
140 pub table_type: TableType,
141 pub belongs_to_job_id: Option<ObjectId>,
142 pub columns: ColumnCatalogArray,
143 pub pk: ColumnOrderArray,
144 pub distribution_key: I32Array,
145 pub stream_key: I32Array,
146 pub append_only: bool,
147 pub fragment_id: Option<FragmentId>,
148 pub vnode_col_index: Option<i32>,
149 pub row_id_index: Option<i32>,
150 pub value_indices: I32Array,
151 pub definition: String,
152 pub handle_pk_conflict_behavior: HandleConflictBehavior,
153 pub version_column_index: Option<i32>,
154 pub read_prefix_len_hint: i32,
155 pub watermark_indices: I32Array,
156 pub dist_key_in_pk: I32Array,
157 pub dml_fragment_id: Option<FragmentId>,
158 pub cardinality: Option<Cardinality>,
159 pub cleaned_by_watermark: bool,
160 pub description: Option<String>,
161 pub version: Option<TableVersion>,
162 pub retention_seconds: Option<i32>,
163 pub incoming_sinks: I32Array,
164 pub cdc_table_id: Option<String>,
165 pub vnode_count: i32,
166 pub webhook_info: Option<WebhookSourceInfo>,
167 pub engine: Option<Engine>,
168 pub clean_watermark_index_in_pk: Option<i32>,
169}
170
171#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
172pub enum Relation {
173 #[sea_orm(
174 belongs_to = "super::fragment::Entity",
175 from = "Column::DmlFragmentId",
176 to = "super::fragment::Column::FragmentId",
177 on_update = "NoAction",
178 on_delete = "NoAction"
179 )]
180 Fragment2,
181 #[sea_orm(
182 belongs_to = "super::fragment::Entity",
183 from = "Column::FragmentId",
184 to = "super::fragment::Column::FragmentId",
185 on_update = "NoAction",
186 on_delete = "NoAction"
187 )]
188 Fragment1,
189 #[sea_orm(
190 belongs_to = "super::object::Entity",
191 from = "Column::BelongsToJobId",
192 to = "super::object::Column::Oid",
193 on_update = "NoAction",
194 on_delete = "Cascade"
195 )]
196 Object2,
197 #[sea_orm(
198 belongs_to = "super::object::Entity",
199 from = "Column::TableId",
200 to = "super::object::Column::Oid",
201 on_update = "NoAction",
202 on_delete = "Cascade"
203 )]
204 Object1,
205 #[sea_orm(
206 belongs_to = "super::source::Entity",
207 from = "Column::OptionalAssociatedSourceId",
208 to = "super::source::Column::SourceId",
209 on_update = "NoAction",
210 on_delete = "NoAction"
211 )]
212 Source,
213
214 #[sea_orm(
216 belongs_to = "super::object_dependency::Entity",
217 from = "Column::TableId",
218 to = "super::object_dependency::Column::UsedBy",
219 on_update = "NoAction",
220 on_delete = "Cascade"
221 )]
222 ObjectDependency,
223}
224
225impl Related<super::object::Entity> for Entity {
226 fn to() -> RelationDef {
227 Relation::Object1.def()
228 }
229}
230
231impl Related<super::source::Entity> for Entity {
232 fn to() -> RelationDef {
233 Relation::Source.def()
234 }
235}
236
237impl ActiveModelBehavior for ActiveModel {}
238
239impl From<PbTable> for ActiveModel {
240 fn from(pb_table: PbTable) -> Self {
241 let table_type = pb_table.table_type();
242 let handle_pk_conflict_behavior = pb_table.handle_pk_conflict_behavior();
243
244 let vnode_count = pb_table
248 .vnode_count_inner()
249 .value_opt()
250 .map(|v| v as _)
251 .map_or(NotSet, Set);
252 let fragment_id = if pb_table.fragment_id == OBJECT_ID_PLACEHOLDER {
253 NotSet
254 } else {
255 Set(Some(pb_table.fragment_id as FragmentId))
256 };
257 let dml_fragment_id = pb_table
258 .dml_fragment_id
259 .map(|x| Set(Some(x as FragmentId)))
260 .unwrap_or_default();
261 let optional_associated_source_id =
262 if let Some(OptionalAssociatedSourceId::AssociatedSourceId(src_id)) =
263 pb_table.optional_associated_source_id
264 {
265 Set(Some(src_id as SourceId))
266 } else {
267 NotSet
268 };
269
270 Self {
271 table_id: Set(pb_table.id as _),
272 name: Set(pb_table.name),
273 optional_associated_source_id,
274 table_type: Set(table_type.into()),
275 belongs_to_job_id: Set(pb_table.job_id.map(|x| x as _)),
276 columns: Set(pb_table.columns.into()),
277 pk: Set(pb_table.pk.into()),
278 distribution_key: Set(pb_table.distribution_key.into()),
279 stream_key: Set(pb_table.stream_key.into()),
280 append_only: Set(pb_table.append_only),
281 fragment_id,
282 vnode_col_index: Set(pb_table.vnode_col_index.map(|x| x as i32)),
283 row_id_index: Set(pb_table.row_id_index.map(|x| x as i32)),
284 value_indices: Set(pb_table.value_indices.into()),
285 definition: Set(pb_table.definition),
286 handle_pk_conflict_behavior: Set(handle_pk_conflict_behavior.into()),
287 version_column_index: Set(pb_table.version_column_index.map(|x| x as i32)),
288 read_prefix_len_hint: Set(pb_table.read_prefix_len_hint as _),
289 watermark_indices: Set(pb_table.watermark_indices.into()),
290 dist_key_in_pk: Set(pb_table.dist_key_in_pk.into()),
291 dml_fragment_id,
292 cardinality: Set(pb_table.cardinality.as_ref().map(|x| x.into())),
293 cleaned_by_watermark: Set(pb_table.cleaned_by_watermark),
294 description: Set(pb_table.description),
295 version: Set(pb_table.version.as_ref().map(|v| v.into())),
296 retention_seconds: Set(pb_table.retention_seconds.map(|i| i as _)),
297 incoming_sinks: Set(pb_table.incoming_sinks.into()),
298 cdc_table_id: Set(pb_table.cdc_table_id),
299 vnode_count,
300 webhook_info: Set(pb_table.webhook_info.as_ref().map(WebhookSourceInfo::from)),
301 engine: Set(pb_table
302 .engine
303 .map(|engine| Engine::from(PbEngine::try_from(engine).expect("Invalid engine")))),
304 clean_watermark_index_in_pk: Set(pb_table.clean_watermark_index_in_pk),
305 }
306 }
307}