1use risingwave_common::catalog::OBJECT_ID_PLACEHOLDER;
16use risingwave_common::hash::VnodeCountCompat;
17use risingwave_common::id::JobId;
18use risingwave_pb::catalog::table::{CdcTableType as PbCdcTableType, PbEngine, PbTableType};
19use risingwave_pb::catalog::{PbHandleConflictBehavior, PbTable, PbVectorIndexInfo};
20use sea_orm::ActiveValue::Set;
21use sea_orm::NotSet;
22use sea_orm::entity::prelude::*;
23use serde::{Deserialize, Serialize};
24
25use crate::{
26 Cardinality, ColumnCatalogArray, ColumnOrderArray, FragmentId, I32Array, SourceId, TableId,
27 TableVersion, WebhookSourceInfo,
28};
29
30#[derive(
31 Clone, Debug, PartialEq, Hash, Copy, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
32)]
33#[sea_orm(rs_type = "String", db_type = "string(None)")]
34pub enum TableType {
35 #[sea_orm(string_value = "TABLE")]
36 Table,
37 #[sea_orm(string_value = "MATERIALIZED_VIEW")]
38 MaterializedView,
39 #[sea_orm(string_value = "INDEX")]
40 Index,
41 #[sea_orm(string_value = "INTERNAL")]
42 Internal,
43 #[sea_orm(string_value = "VECTOR_INDEX")]
44 VectorIndex,
45}
46
47impl From<TableType> for PbTableType {
48 fn from(table_type: TableType) -> Self {
49 match table_type {
50 TableType::Table => Self::Table,
51 TableType::MaterializedView => Self::MaterializedView,
52 TableType::Index => Self::Index,
53 TableType::Internal => Self::Internal,
54 TableType::VectorIndex => Self::VectorIndex,
55 }
56 }
57}
58
59impl From<PbTableType> for TableType {
60 fn from(table_type: PbTableType) -> Self {
61 match table_type {
62 PbTableType::Table => Self::Table,
63 PbTableType::MaterializedView => Self::MaterializedView,
64 PbTableType::Index => Self::Index,
65 PbTableType::Internal => Self::Internal,
66 PbTableType::VectorIndex => Self::VectorIndex,
67 PbTableType::Unspecified => unreachable!("Unspecified table type"),
68 }
69 }
70}
71
72#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
73#[sea_orm(rs_type = "String", db_type = "string(None)")]
74pub enum HandleConflictBehavior {
75 #[sea_orm(string_value = "OVERWRITE")]
76 Overwrite,
77 #[sea_orm(string_value = "IGNORE")]
78 Ignore,
79 #[sea_orm(string_value = "NO_CHECK")]
80 NoCheck,
81 #[sea_orm(string_value = "DO_UPDATE_IF_NOT_NULL")]
82 DoUpdateIfNotNull,
83}
84
85impl From<HandleConflictBehavior> for PbHandleConflictBehavior {
86 fn from(handle_conflict_behavior: HandleConflictBehavior) -> Self {
87 match handle_conflict_behavior {
88 HandleConflictBehavior::Overwrite => Self::Overwrite,
89 HandleConflictBehavior::Ignore => Self::Ignore,
90 HandleConflictBehavior::NoCheck => Self::NoCheck,
91 HandleConflictBehavior::DoUpdateIfNotNull => Self::DoUpdateIfNotNull,
92 }
93 }
94}
95
96impl From<PbHandleConflictBehavior> for HandleConflictBehavior {
97 fn from(handle_conflict_behavior: PbHandleConflictBehavior) -> Self {
98 match handle_conflict_behavior {
99 PbHandleConflictBehavior::Overwrite => Self::Overwrite,
100 PbHandleConflictBehavior::Ignore => Self::Ignore,
101 PbHandleConflictBehavior::NoCheck => Self::NoCheck,
102 PbHandleConflictBehavior::DoUpdateIfNotNull => Self::DoUpdateIfNotNull,
103 PbHandleConflictBehavior::Unspecified => {
104 unreachable!("Unspecified handle conflict behavior")
105 }
106 }
107 }
108}
109
110#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
111#[sea_orm(rs_type = "String", db_type = "string(None)")]
112pub enum Engine {
113 #[sea_orm(string_value = "HUMMOCK")]
114 Hummock,
115 #[sea_orm(string_value = "ICEBERG")]
116 Iceberg,
117}
118
119impl From<Engine> for PbEngine {
120 fn from(engine: Engine) -> Self {
121 match engine {
122 Engine::Hummock => Self::Hummock,
123 Engine::Iceberg => Self::Iceberg,
124 }
125 }
126}
127
128impl From<PbEngine> for Engine {
129 fn from(engine: PbEngine) -> Self {
130 match engine {
131 PbEngine::Hummock => Self::Hummock,
132 PbEngine::Iceberg => Self::Iceberg,
133 PbEngine::Unspecified => Self::Hummock,
134 }
135 }
136}
137
138#[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
139#[sea_orm(rs_type = "String", db_type = "string(None)")]
140pub enum CdcTableType {
141 #[sea_orm(string_value = "UNSPECIFIED")]
142 Unspecified,
143 #[sea_orm(string_value = "POSTGRES")]
144 Postgres,
145 #[sea_orm(string_value = "MYSQL")]
146 Mysql,
147 #[sea_orm(string_value = "SQLSERVER")]
148 Sqlserver,
149 #[sea_orm(string_value = "MONGO")]
150 Mongo,
151 #[sea_orm(string_value = "CITUS")]
152 Citus,
153}
154
155impl From<CdcTableType> for PbCdcTableType {
156 fn from(cdc_table_type: CdcTableType) -> Self {
157 match cdc_table_type {
158 CdcTableType::Postgres => Self::Postgres,
159 CdcTableType::Mysql => Self::Mysql,
160 CdcTableType::Sqlserver => Self::Sqlserver,
161 CdcTableType::Mongo => Self::Mongo,
162 CdcTableType::Citus => Self::Citus,
163 CdcTableType::Unspecified => Self::Unspecified,
164 }
165 }
166}
167
168impl From<PbCdcTableType> for CdcTableType {
169 fn from(cdc_table_type: PbCdcTableType) -> Self {
170 match cdc_table_type {
171 PbCdcTableType::Postgres => Self::Postgres,
172 PbCdcTableType::Mysql => Self::Mysql,
173 PbCdcTableType::Sqlserver => Self::Sqlserver,
174 PbCdcTableType::Mongo => Self::Mongo,
175 PbCdcTableType::Citus => Self::Citus,
176 PbCdcTableType::Unspecified => Self::Unspecified,
177 }
178 }
179}
180
181crate::derive_from_blob!(VectorIndexInfo, PbVectorIndexInfo);
182
183#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
184#[sea_orm(table_name = "table")]
185pub struct Model {
186 #[sea_orm(primary_key, auto_increment = false)]
187 pub table_id: TableId,
188 pub name: String,
189 pub optional_associated_source_id: Option<SourceId>,
190 pub table_type: TableType,
191 pub belongs_to_job_id: Option<JobId>,
192 pub columns: ColumnCatalogArray,
193 pub pk: ColumnOrderArray,
194 pub distribution_key: I32Array,
195 pub stream_key: I32Array,
196 pub append_only: bool,
197 pub fragment_id: Option<FragmentId>,
198 pub vnode_col_index: Option<i32>,
199 pub row_id_index: Option<i32>,
200 pub value_indices: I32Array,
201 pub definition: String,
202 pub handle_pk_conflict_behavior: HandleConflictBehavior,
203 pub version_column_indices: Option<I32Array>,
204 pub read_prefix_len_hint: i32,
205 pub watermark_indices: I32Array,
206 pub dist_key_in_pk: I32Array,
207 pub dml_fragment_id: Option<FragmentId>,
208 pub cardinality: Option<Cardinality>,
209 pub cleaned_by_watermark: bool,
210 pub description: Option<String>,
211 pub version: Option<TableVersion>,
212 pub retention_seconds: Option<i32>,
213 pub cdc_table_id: Option<String>,
214 pub vnode_count: i32,
215 pub webhook_info: Option<WebhookSourceInfo>,
216 pub engine: Option<Engine>,
217 pub clean_watermark_index_in_pk: Option<i32>,
218 pub refreshable: bool,
219 pub vector_index_info: Option<VectorIndexInfo>,
220 pub cdc_table_type: Option<CdcTableType>,
221}
222
223#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
224pub enum Relation {
225 #[sea_orm(
226 belongs_to = "super::fragment::Entity",
227 from = "Column::DmlFragmentId",
228 to = "super::fragment::Column::FragmentId",
229 on_update = "NoAction",
230 on_delete = "NoAction"
231 )]
232 Fragment2,
233 #[sea_orm(
234 belongs_to = "super::fragment::Entity",
235 from = "Column::FragmentId",
236 to = "super::fragment::Column::FragmentId",
237 on_update = "NoAction",
238 on_delete = "NoAction"
239 )]
240 Fragment1,
241 #[sea_orm(
242 belongs_to = "super::object::Entity",
243 from = "Column::BelongsToJobId",
244 to = "super::object::Column::Oid",
245 on_update = "NoAction",
246 on_delete = "Cascade"
247 )]
248 Object2,
249 #[sea_orm(
250 belongs_to = "super::object::Entity",
251 from = "Column::TableId",
252 to = "super::object::Column::Oid",
253 on_update = "NoAction",
254 on_delete = "Cascade"
255 )]
256 Object1,
257 #[sea_orm(
258 belongs_to = "super::source::Entity",
259 from = "Column::OptionalAssociatedSourceId",
260 to = "super::source::Column::SourceId",
261 on_update = "NoAction",
262 on_delete = "NoAction"
263 )]
264 Source,
265
266 #[sea_orm(
268 belongs_to = "super::object_dependency::Entity",
269 from = "Column::TableId",
270 to = "super::object_dependency::Column::UsedBy",
271 on_update = "NoAction",
272 on_delete = "Cascade"
273 )]
274 ObjectDependency,
275}
276
277impl Related<super::object::Entity> for Entity {
278 fn to() -> RelationDef {
279 Relation::Object1.def()
280 }
281}
282
283impl Related<super::source::Entity> for Entity {
284 fn to() -> RelationDef {
285 Relation::Source.def()
286 }
287}
288
289impl ActiveModelBehavior for ActiveModel {}
290
291impl From<PbTable> for ActiveModel {
292 fn from(pb_table: PbTable) -> Self {
293 let table_type = pb_table.table_type();
294 let handle_pk_conflict_behavior = pb_table.handle_pk_conflict_behavior();
295
296 let vnode_count = pb_table
300 .vnode_count_inner()
301 .value_opt()
302 .map(|v| v as _)
303 .map_or(NotSet, Set);
304 let fragment_id = if pb_table.fragment_id == OBJECT_ID_PLACEHOLDER {
305 NotSet
306 } else {
307 Set(Some(pb_table.fragment_id))
308 };
309 let dml_fragment_id = pb_table
310 .dml_fragment_id
311 .map(|x| Set(Some(x)))
312 .unwrap_or_default();
313 let optional_associated_source_id =
314 if let Some(src_id) = pb_table.optional_associated_source_id {
315 Set(Some(src_id.into()))
316 } else {
317 NotSet
318 };
319
320 Self {
321 table_id: Set(pb_table.id),
322 name: Set(pb_table.name),
323 optional_associated_source_id,
324 table_type: Set(table_type.into()),
325 belongs_to_job_id: Set(pb_table.job_id),
326 columns: Set(pb_table.columns.into()),
327 pk: Set(pb_table.pk.into()),
328 distribution_key: Set(pb_table.distribution_key.into()),
329 stream_key: Set(pb_table.stream_key.into()),
330 append_only: Set(pb_table.append_only),
331 fragment_id,
332 vnode_col_index: Set(pb_table.vnode_col_index.map(|x| x as i32)),
333 row_id_index: Set(pb_table.row_id_index.map(|x| x as i32)),
334 value_indices: Set(pb_table.value_indices.into()),
335 definition: Set(pb_table.definition),
336 handle_pk_conflict_behavior: Set(handle_pk_conflict_behavior.into()),
337 version_column_indices: Set(Some(
338 pb_table
339 .version_column_indices
340 .iter()
341 .map(|x| *x as i32)
342 .collect::<Vec<_>>()
343 .into(),
344 )),
345 read_prefix_len_hint: Set(pb_table.read_prefix_len_hint as _),
346 watermark_indices: Set(pb_table.watermark_indices.into()),
347 dist_key_in_pk: Set(pb_table.dist_key_in_pk.into()),
348 dml_fragment_id,
349 cardinality: Set(pb_table.cardinality.as_ref().map(|x| x.into())),
350 cleaned_by_watermark: Set(pb_table.cleaned_by_watermark),
351 description: Set(pb_table.description),
352 version: Set(pb_table.version.as_ref().map(|v| v.into())),
353 retention_seconds: Set(pb_table.retention_seconds.map(|i| i as _)),
354 cdc_table_id: Set(pb_table.cdc_table_id),
355 vnode_count,
356 webhook_info: Set(pb_table.webhook_info.as_ref().map(WebhookSourceInfo::from)),
357 engine: Set(pb_table
358 .engine
359 .map(|engine| Engine::from(PbEngine::try_from(engine).expect("Invalid engine")))),
360 clean_watermark_index_in_pk: Set(pb_table.clean_watermark_index_in_pk),
361 refreshable: Set(pb_table.refreshable),
362 vector_index_info: Set(pb_table
363 .vector_index_info
364 .as_ref()
365 .map(VectorIndexInfo::from)),
366 cdc_table_type: Set(pb_table.cdc_table_type.map(|cdc_table_type| {
367 match cdc_table_type {
368 0 => CdcTableType::Postgres, 1 => CdcTableType::Postgres,
370 2 => CdcTableType::Mysql,
371 3 => CdcTableType::Sqlserver,
372 4 => CdcTableType::Mongo,
373 _ => panic!("Invalid CDC table type: {cdc_table_type}"),
374 }
375 })),
376 }
377 }
378}