1use risingwave_common::catalog::OBJECT_ID_PLACEHOLDER;
16use risingwave_common::hash::VnodeCountCompat;
17use risingwave_common::id::JobId;
18use risingwave_pb::catalog::table::{CdcTableType as PbCdcTableType, PbEngine, PbTableType};
19use risingwave_pb::catalog::{PbHandleConflictBehavior, PbTable, PbVectorIndexInfo};
20use sea_orm::ActiveValue::Set;
21use sea_orm::NotSet;
22use sea_orm::entity::prelude::*;
23use serde::{Deserialize, Serialize};
24
25use crate::{
26 Cardinality, ColumnCatalogArray, ColumnOrderArray, FragmentId, I32Array, SourceId, TableId,
27 TableVersion, WebhookSourceInfo,
28};
29
/// Kind of catalog table, persisted as a string column.
///
/// Each variant is stored using the `string_value` given on it (e.g. `"MATERIALIZED_VIEW"`)
/// and converts to and from the protobuf `PbTableType` variant of the same name.
#[derive(
    Clone, Debug, PartialEq, Hash, Copy, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum TableType {
    /// Stored as `"TABLE"`.
    #[sea_orm(string_value = "TABLE")]
    Table,
    /// Stored as `"MATERIALIZED_VIEW"`.
    #[sea_orm(string_value = "MATERIALIZED_VIEW")]
    MaterializedView,
    /// Stored as `"INDEX"`.
    #[sea_orm(string_value = "INDEX")]
    Index,
    /// Stored as `"INTERNAL"`.
    #[sea_orm(string_value = "INTERNAL")]
    Internal,
    /// Stored as `"VECTOR_INDEX"`.
    #[sea_orm(string_value = "VECTOR_INDEX")]
    VectorIndex,
}
46
47impl From<TableType> for PbTableType {
48 fn from(table_type: TableType) -> Self {
49 match table_type {
50 TableType::Table => Self::Table,
51 TableType::MaterializedView => Self::MaterializedView,
52 TableType::Index => Self::Index,
53 TableType::Internal => Self::Internal,
54 TableType::VectorIndex => Self::VectorIndex,
55 }
56 }
57}
58
59impl From<PbTableType> for TableType {
60 fn from(table_type: PbTableType) -> Self {
61 match table_type {
62 PbTableType::Table => Self::Table,
63 PbTableType::MaterializedView => Self::MaterializedView,
64 PbTableType::Index => Self::Index,
65 PbTableType::Internal => Self::Internal,
66 PbTableType::VectorIndex => Self::VectorIndex,
67 PbTableType::Unspecified => unreachable!("Unspecified table type"),
68 }
69 }
70}
71
/// Primary-key conflict handling mode of a table, persisted as a string column.
///
/// Each variant is stored using the `string_value` given on it and converts to and from the
/// protobuf `PbHandleConflictBehavior` variant of the same name.
#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum HandleConflictBehavior {
    /// Stored as `"OVERWRITE"`.
    #[sea_orm(string_value = "OVERWRITE")]
    Overwrite,
    /// Stored as `"IGNORE"`.
    #[sea_orm(string_value = "IGNORE")]
    Ignore,
    /// Stored as `"NO_CHECK"`.
    #[sea_orm(string_value = "NO_CHECK")]
    NoCheck,
    /// Stored as `"DO_UPDATE_IF_NOT_NULL"`.
    #[sea_orm(string_value = "DO_UPDATE_IF_NOT_NULL")]
    DoUpdateIfNotNull,
}
84
85impl From<HandleConflictBehavior> for PbHandleConflictBehavior {
86 fn from(handle_conflict_behavior: HandleConflictBehavior) -> Self {
87 match handle_conflict_behavior {
88 HandleConflictBehavior::Overwrite => Self::Overwrite,
89 HandleConflictBehavior::Ignore => Self::Ignore,
90 HandleConflictBehavior::NoCheck => Self::NoCheck,
91 HandleConflictBehavior::DoUpdateIfNotNull => Self::DoUpdateIfNotNull,
92 }
93 }
94}
95
96impl From<PbHandleConflictBehavior> for HandleConflictBehavior {
97 fn from(handle_conflict_behavior: PbHandleConflictBehavior) -> Self {
98 match handle_conflict_behavior {
99 PbHandleConflictBehavior::Overwrite => Self::Overwrite,
100 PbHandleConflictBehavior::Ignore => Self::Ignore,
101 PbHandleConflictBehavior::NoCheck => Self::NoCheck,
102 PbHandleConflictBehavior::DoUpdateIfNotNull => Self::DoUpdateIfNotNull,
103 PbHandleConflictBehavior::Unspecified => {
104 unreachable!("Unspecified handle conflict behavior")
105 }
106 }
107 }
108}
109
/// Refresh state recorded for a table, persisted as a string column.
///
/// Each variant is stored using the `string_value` given on it and converts to and from
/// `risingwave_pb::catalog::RefreshState`.
/// NOTE(review): the exact lifecycle meaning of each state is not visible in this file;
/// confirm against the refresh logic before relying on the variant names.
#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum RefreshState {
    /// Stored as `"IDLE"`.
    #[sea_orm(string_value = "IDLE")]
    Idle,
    /// Stored as `"REFRESHING"`.
    #[sea_orm(string_value = "REFRESHING")]
    Refreshing,
    /// Stored as `"FINISHING"`.
    #[sea_orm(string_value = "FINISHING")]
    Finishing,
}
123
124impl From<RefreshState> for risingwave_pb::catalog::RefreshState {
125 fn from(refresh_state: RefreshState) -> Self {
126 match refresh_state {
127 RefreshState::Idle => Self::Idle,
128 RefreshState::Refreshing => Self::Refreshing,
129 RefreshState::Finishing => Self::Finishing,
130 }
131 }
132}
133
134impl From<risingwave_pb::catalog::RefreshState> for RefreshState {
135 fn from(refresh_state: risingwave_pb::catalog::RefreshState) -> Self {
136 match refresh_state {
137 risingwave_pb::catalog::RefreshState::Idle => Self::Idle,
138 risingwave_pb::catalog::RefreshState::Refreshing => Self::Refreshing,
139 risingwave_pb::catalog::RefreshState::Finishing => Self::Finishing,
140 risingwave_pb::catalog::RefreshState::Unspecified => {
141 unreachable!("Unspecified refresh state")
142 }
143 }
144 }
145}
146
147impl std::fmt::Display for RefreshState {
148 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149 match self {
150 RefreshState::Idle => write!(f, "IDLE"),
151 RefreshState::Refreshing => write!(f, "REFRESHING"),
152 RefreshState::Finishing => write!(f, "FINISHING"),
153 }
154 }
155}
156
/// Engine recorded for a table, persisted as a string column.
///
/// Each variant is stored using the `string_value` given on it and converts to and from the
/// protobuf `PbEngine`.
#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum Engine {
    /// Stored as `"HUMMOCK"`.
    #[sea_orm(string_value = "HUMMOCK")]
    Hummock,
    /// Stored as `"ICEBERG"`.
    #[sea_orm(string_value = "ICEBERG")]
    Iceberg,
}
165
166impl From<Engine> for PbEngine {
167 fn from(engine: Engine) -> Self {
168 match engine {
169 Engine::Hummock => Self::Hummock,
170 Engine::Iceberg => Self::Iceberg,
171 }
172 }
173}
174
175impl From<PbEngine> for Engine {
176 fn from(engine: PbEngine) -> Self {
177 match engine {
178 PbEngine::Hummock => Self::Hummock,
179 PbEngine::Iceberg => Self::Iceberg,
180 PbEngine::Unspecified => Self::Hummock,
181 }
182 }
183}
184
/// CDC table type recorded for a table, persisted as a string column.
///
/// Each variant is stored using the `string_value` given on it and converts to and from the
/// protobuf `PbCdcTableType` variant of the same name. Unlike the other enums in this file,
/// `Unspecified` is a storable value here.
#[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum CdcTableType {
    /// Stored as `"UNSPECIFIED"`.
    #[sea_orm(string_value = "UNSPECIFIED")]
    Unspecified,
    /// Stored as `"POSTGRES"`.
    #[sea_orm(string_value = "POSTGRES")]
    Postgres,
    /// Stored as `"MYSQL"`.
    #[sea_orm(string_value = "MYSQL")]
    Mysql,
    /// Stored as `"SQLSERVER"`.
    #[sea_orm(string_value = "SQLSERVER")]
    Sqlserver,
    /// Stored as `"MONGO"`.
    #[sea_orm(string_value = "MONGO")]
    Mongo,
    /// Stored as `"CITUS"`.
    #[sea_orm(string_value = "CITUS")]
    Citus,
}
201
202impl From<CdcTableType> for PbCdcTableType {
203 fn from(cdc_table_type: CdcTableType) -> Self {
204 match cdc_table_type {
205 CdcTableType::Postgres => Self::Postgres,
206 CdcTableType::Mysql => Self::Mysql,
207 CdcTableType::Sqlserver => Self::Sqlserver,
208 CdcTableType::Mongo => Self::Mongo,
209 CdcTableType::Citus => Self::Citus,
210 CdcTableType::Unspecified => Self::Unspecified,
211 }
212 }
213}
214
215impl From<PbCdcTableType> for CdcTableType {
216 fn from(cdc_table_type: PbCdcTableType) -> Self {
217 match cdc_table_type {
218 PbCdcTableType::Postgres => Self::Postgres,
219 PbCdcTableType::Mysql => Self::Mysql,
220 PbCdcTableType::Sqlserver => Self::Sqlserver,
221 PbCdcTableType::Mongo => Self::Mongo,
222 PbCdcTableType::Citus => Self::Citus,
223 PbCdcTableType::Unspecified => Self::Unspecified,
224 }
225 }
226}
227
// Declares `VectorIndexInfo` (used by `Model::vector_index_info`) through the crate's
// `derive_from_blob!` macro, pairing it with the protobuf `PbVectorIndexInfo`.
// NOTE(review): presumably this stores the protobuf message as a blob column — confirm
// against the macro definition.
crate::derive_from_blob!(VectorIndexInfo, PbVectorIndexInfo);
229
// SeaORM entity model for the `table` table: one row per catalog table.
//
// Rows are built from a protobuf `PbTable` through `From<PbTable> for ActiveModel` in this
// file. Plain comments are used here so that no doc attributes are fed to the entity derive.
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "table")]
pub struct Model {
    // Primary key, not auto-incremented; linked to `object.oid` by `Relation::Object1`.
    #[sea_orm(primary_key, auto_increment = false)]
    pub table_id: TableId,
    pub name: String,
    // Linked to `source.source_id` by `Relation::Source`.
    pub optional_associated_source_id: Option<SourceId>,
    pub table_type: TableType,
    // Linked to `object.oid` by `Relation::Object2`.
    pub belongs_to_job_id: Option<JobId>,
    pub columns: ColumnCatalogArray,
    pub pk: ColumnOrderArray,
    pub distribution_key: I32Array,
    pub stream_key: I32Array,
    pub append_only: bool,
    // Linked to `fragment.fragment_id` by `Relation::Fragment1`.
    pub fragment_id: Option<FragmentId>,
    pub vnode_col_index: Option<i32>,
    pub row_id_index: Option<i32>,
    pub value_indices: I32Array,
    pub definition: String,
    pub handle_pk_conflict_behavior: HandleConflictBehavior,
    pub version_column_indices: Option<I32Array>,
    pub read_prefix_len_hint: i32,
    pub watermark_indices: I32Array,
    pub dist_key_in_pk: I32Array,
    // Linked to `fragment.fragment_id` by `Relation::Fragment2`.
    pub dml_fragment_id: Option<FragmentId>,
    pub cardinality: Option<Cardinality>,
    pub cleaned_by_watermark: bool,
    pub description: Option<String>,
    pub version: Option<TableVersion>,
    pub retention_seconds: Option<i32>,
    pub cdc_table_id: Option<String>,
    pub vnode_count: i32,
    pub webhook_info: Option<WebhookSourceInfo>,
    pub engine: Option<Engine>,
    pub clean_watermark_index_in_pk: Option<i32>,
    pub refreshable: bool,
    pub vector_index_info: Option<VectorIndexInfo>,
    pub cdc_table_type: Option<CdcTableType>,
    pub refresh_state: Option<RefreshState>,
}
270
/// Relations from the `table` entity to the `fragment`, `object`, `source` and
/// `object_dependency` entities, as declared by the `sea_orm` attributes on each variant.
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
    // `dml_fragment_id` -> `fragment.fragment_id`.
    #[sea_orm(
        belongs_to = "super::fragment::Entity",
        from = "Column::DmlFragmentId",
        to = "super::fragment::Column::FragmentId",
        on_update = "NoAction",
        on_delete = "NoAction"
    )]
    Fragment2,
    // `fragment_id` -> `fragment.fragment_id`.
    #[sea_orm(
        belongs_to = "super::fragment::Entity",
        from = "Column::FragmentId",
        to = "super::fragment::Column::FragmentId",
        on_update = "NoAction",
        on_delete = "NoAction"
    )]
    Fragment1,
    // `belongs_to_job_id` -> `object.oid`, declared with cascading delete.
    #[sea_orm(
        belongs_to = "super::object::Entity",
        from = "Column::BelongsToJobId",
        to = "super::object::Column::Oid",
        on_update = "NoAction",
        on_delete = "Cascade"
    )]
    Object2,
    // `table_id` -> `object.oid`, declared with cascading delete.
    #[sea_orm(
        belongs_to = "super::object::Entity",
        from = "Column::TableId",
        to = "super::object::Column::Oid",
        on_update = "NoAction",
        on_delete = "Cascade"
    )]
    Object1,
    // `optional_associated_source_id` -> `source.source_id`.
    #[sea_orm(
        belongs_to = "super::source::Entity",
        from = "Column::OptionalAssociatedSourceId",
        to = "super::source::Column::SourceId",
        on_update = "NoAction",
        on_delete = "NoAction"
    )]
    Source,

    // Joins `table_id` against `object_dependency.used_by`.
    #[sea_orm(
        belongs_to = "super::object_dependency::Entity",
        from = "Column::TableId",
        to = "super::object_dependency::Column::UsedBy",
        on_update = "NoAction",
        on_delete = "Cascade"
    )]
    ObjectDependency,
}
324
impl Related<super::object::Entity> for Entity {
    /// Relates `table` to `object` through `Relation::Object1` (`table_id` -> `object.oid`).
    fn to() -> RelationDef {
        Relation::Object1.def()
    }
}
330
impl Related<super::source::Entity> for Entity {
    /// Relates `table` to `source` through `Relation::Source`
    /// (`optional_associated_source_id` -> `source.source_id`).
    fn to() -> RelationDef {
        Relation::Source.def()
    }
}
336
// No hooks are overridden: the trait's default behavior is used for `ActiveModel`.
impl ActiveModelBehavior for ActiveModel {}
338
339impl From<PbTable> for ActiveModel {
340 fn from(pb_table: PbTable) -> Self {
341 let table_type = pb_table.table_type();
342 let handle_pk_conflict_behavior = pb_table.handle_pk_conflict_behavior();
343 let refresh_state = pb_table.refresh_state();
344
345 let vnode_count = pb_table
349 .vnode_count_inner()
350 .value_opt()
351 .map(|v| v as _)
352 .map_or(NotSet, Set);
353 let fragment_id = if pb_table.fragment_id == OBJECT_ID_PLACEHOLDER {
354 NotSet
355 } else {
356 Set(Some(pb_table.fragment_id))
357 };
358 let dml_fragment_id = pb_table
359 .dml_fragment_id
360 .map(|x| Set(Some(x)))
361 .unwrap_or_default();
362 let optional_associated_source_id =
363 if let Some(src_id) = pb_table.optional_associated_source_id {
364 Set(Some(src_id.into()))
365 } else {
366 NotSet
367 };
368
369 Self {
370 table_id: Set(pb_table.id),
371 name: Set(pb_table.name),
372 optional_associated_source_id,
373 table_type: Set(table_type.into()),
374 belongs_to_job_id: Set(pb_table.job_id),
375 columns: Set(pb_table.columns.into()),
376 pk: Set(pb_table.pk.into()),
377 distribution_key: Set(pb_table.distribution_key.into()),
378 stream_key: Set(pb_table.stream_key.into()),
379 append_only: Set(pb_table.append_only),
380 fragment_id,
381 vnode_col_index: Set(pb_table.vnode_col_index.map(|x| x as i32)),
382 row_id_index: Set(pb_table.row_id_index.map(|x| x as i32)),
383 value_indices: Set(pb_table.value_indices.into()),
384 definition: Set(pb_table.definition),
385 handle_pk_conflict_behavior: Set(handle_pk_conflict_behavior.into()),
386 version_column_indices: Set(Some(
387 pb_table
388 .version_column_indices
389 .iter()
390 .map(|x| *x as i32)
391 .collect::<Vec<_>>()
392 .into(),
393 )),
394 read_prefix_len_hint: Set(pb_table.read_prefix_len_hint as _),
395 watermark_indices: Set(pb_table.watermark_indices.into()),
396 dist_key_in_pk: Set(pb_table.dist_key_in_pk.into()),
397 dml_fragment_id,
398 cardinality: Set(pb_table.cardinality.as_ref().map(|x| x.into())),
399 cleaned_by_watermark: Set(pb_table.cleaned_by_watermark),
400 description: Set(pb_table.description),
401 version: Set(pb_table.version.as_ref().map(|v| v.into())),
402 retention_seconds: Set(pb_table.retention_seconds.map(|i| i as _)),
403 cdc_table_id: Set(pb_table.cdc_table_id),
404 vnode_count,
405 webhook_info: Set(pb_table.webhook_info.as_ref().map(WebhookSourceInfo::from)),
406 engine: Set(pb_table
407 .engine
408 .map(|engine| Engine::from(PbEngine::try_from(engine).expect("Invalid engine")))),
409 clean_watermark_index_in_pk: Set(pb_table.clean_watermark_index_in_pk),
410 refreshable: Set(pb_table.refreshable),
411 vector_index_info: Set(pb_table
412 .vector_index_info
413 .as_ref()
414 .map(VectorIndexInfo::from)),
415 cdc_table_type: Set(pb_table.cdc_table_type.map(|cdc_table_type| {
416 match cdc_table_type {
417 0 => CdcTableType::Postgres, 1 => CdcTableType::Postgres,
419 2 => CdcTableType::Mysql,
420 3 => CdcTableType::Sqlserver,
421 4 => CdcTableType::Mongo,
422 _ => panic!("Invalid CDC table type: {cdc_table_type}"),
423 }
424 })),
425 refresh_state: Set(Some(refresh_state.into())),
426 }
427 }
428}