1use risingwave_common::catalog::OBJECT_ID_PLACEHOLDER;
16use risingwave_common::hash::VnodeCountCompat;
17use risingwave_pb::catalog::table::{
18 CdcTableType as PbCdcTableType, OptionalAssociatedSourceId, PbEngine, PbTableType,
19};
20use risingwave_pb::catalog::{PbHandleConflictBehavior, PbTable, PbVectorIndexInfo};
21use sea_orm::ActiveValue::Set;
22use sea_orm::NotSet;
23use sea_orm::entity::prelude::*;
24use serde::{Deserialize, Serialize};
25
26use crate::{
27 Cardinality, ColumnCatalogArray, ColumnOrderArray, FragmentId, I32Array, ObjectId, SourceId,
28 TableId, TableVersion, WebhookSourceInfo,
29};
30
31#[derive(
32 Clone, Debug, PartialEq, Hash, Copy, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
33)]
34#[sea_orm(rs_type = "String", db_type = "string(None)")]
35pub enum TableType {
36 #[sea_orm(string_value = "TABLE")]
37 Table,
38 #[sea_orm(string_value = "MATERIALIZED_VIEW")]
39 MaterializedView,
40 #[sea_orm(string_value = "INDEX")]
41 Index,
42 #[sea_orm(string_value = "INTERNAL")]
43 Internal,
44 #[sea_orm(string_value = "VECTOR_INDEX")]
45 VectorIndex,
46}
47
48impl From<TableType> for PbTableType {
49 fn from(table_type: TableType) -> Self {
50 match table_type {
51 TableType::Table => Self::Table,
52 TableType::MaterializedView => Self::MaterializedView,
53 TableType::Index => Self::Index,
54 TableType::Internal => Self::Internal,
55 TableType::VectorIndex => Self::VectorIndex,
56 }
57 }
58}
59
60impl From<PbTableType> for TableType {
61 fn from(table_type: PbTableType) -> Self {
62 match table_type {
63 PbTableType::Table => Self::Table,
64 PbTableType::MaterializedView => Self::MaterializedView,
65 PbTableType::Index => Self::Index,
66 PbTableType::Internal => Self::Internal,
67 PbTableType::VectorIndex => Self::VectorIndex,
68 PbTableType::Unspecified => unreachable!("Unspecified table type"),
69 }
70 }
71}
72
73#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
74#[sea_orm(rs_type = "String", db_type = "string(None)")]
75pub enum HandleConflictBehavior {
76 #[sea_orm(string_value = "OVERWRITE")]
77 Overwrite,
78 #[sea_orm(string_value = "IGNORE")]
79 Ignore,
80 #[sea_orm(string_value = "NO_CHECK")]
81 NoCheck,
82 #[sea_orm(string_value = "DO_UPDATE_IF_NOT_NULL")]
83 DoUpdateIfNotNull,
84}
85
86impl From<HandleConflictBehavior> for PbHandleConflictBehavior {
87 fn from(handle_conflict_behavior: HandleConflictBehavior) -> Self {
88 match handle_conflict_behavior {
89 HandleConflictBehavior::Overwrite => Self::Overwrite,
90 HandleConflictBehavior::Ignore => Self::Ignore,
91 HandleConflictBehavior::NoCheck => Self::NoCheck,
92 HandleConflictBehavior::DoUpdateIfNotNull => Self::DoUpdateIfNotNull,
93 }
94 }
95}
96
97impl From<PbHandleConflictBehavior> for HandleConflictBehavior {
98 fn from(handle_conflict_behavior: PbHandleConflictBehavior) -> Self {
99 match handle_conflict_behavior {
100 PbHandleConflictBehavior::Overwrite => Self::Overwrite,
101 PbHandleConflictBehavior::Ignore => Self::Ignore,
102 PbHandleConflictBehavior::NoCheck => Self::NoCheck,
103 PbHandleConflictBehavior::DoUpdateIfNotNull => Self::DoUpdateIfNotNull,
104 PbHandleConflictBehavior::Unspecified => {
105 unreachable!("Unspecified handle conflict behavior")
106 }
107 }
108 }
109}
110
111#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
112#[sea_orm(rs_type = "String", db_type = "string(None)")]
113pub enum RefreshState {
114 #[sea_orm(string_value = "IDLE")]
116 Idle,
117 #[sea_orm(string_value = "REFRESHING")]
119 Refreshing,
120 #[sea_orm(string_value = "FINISHING")]
122 Finishing,
123}
124
125impl From<RefreshState> for risingwave_pb::catalog::RefreshState {
126 fn from(refresh_state: RefreshState) -> Self {
127 match refresh_state {
128 RefreshState::Idle => Self::Idle,
129 RefreshState::Refreshing => Self::Refreshing,
130 RefreshState::Finishing => Self::Finishing,
131 }
132 }
133}
134
135impl From<risingwave_pb::catalog::RefreshState> for RefreshState {
136 fn from(refresh_state: risingwave_pb::catalog::RefreshState) -> Self {
137 match refresh_state {
138 risingwave_pb::catalog::RefreshState::Idle => Self::Idle,
139 risingwave_pb::catalog::RefreshState::Refreshing => Self::Refreshing,
140 risingwave_pb::catalog::RefreshState::Finishing => Self::Finishing,
141 risingwave_pb::catalog::RefreshState::Unspecified => {
142 unreachable!("Unspecified refresh state")
143 }
144 }
145 }
146}
147
148impl std::fmt::Display for RefreshState {
149 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150 match self {
151 RefreshState::Idle => write!(f, "IDLE"),
152 RefreshState::Refreshing => write!(f, "REFRESHING"),
153 RefreshState::Finishing => write!(f, "FINISHING"),
154 }
155 }
156}
157
158#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
159#[sea_orm(rs_type = "String", db_type = "string(None)")]
160pub enum Engine {
161 #[sea_orm(string_value = "HUMMOCK")]
162 Hummock,
163 #[sea_orm(string_value = "ICEBERG")]
164 Iceberg,
165}
166
167impl From<Engine> for PbEngine {
168 fn from(engine: Engine) -> Self {
169 match engine {
170 Engine::Hummock => Self::Hummock,
171 Engine::Iceberg => Self::Iceberg,
172 }
173 }
174}
175
176impl From<PbEngine> for Engine {
177 fn from(engine: PbEngine) -> Self {
178 match engine {
179 PbEngine::Hummock => Self::Hummock,
180 PbEngine::Iceberg => Self::Iceberg,
181 PbEngine::Unspecified => Self::Hummock,
182 }
183 }
184}
185
186#[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
187#[sea_orm(rs_type = "String", db_type = "string(None)")]
188pub enum CdcTableType {
189 #[sea_orm(string_value = "UNSPECIFIED")]
190 Unspecified,
191 #[sea_orm(string_value = "POSTGRES")]
192 Postgres,
193 #[sea_orm(string_value = "MYSQL")]
194 Mysql,
195 #[sea_orm(string_value = "SQLSERVER")]
196 Sqlserver,
197 #[sea_orm(string_value = "MONGO")]
198 Mongo,
199 #[sea_orm(string_value = "CITUS")]
200 Citus,
201}
202
203impl From<CdcTableType> for PbCdcTableType {
204 fn from(cdc_table_type: CdcTableType) -> Self {
205 match cdc_table_type {
206 CdcTableType::Postgres => Self::Postgres,
207 CdcTableType::Mysql => Self::Mysql,
208 CdcTableType::Sqlserver => Self::Sqlserver,
209 CdcTableType::Mongo => Self::Mongo,
210 CdcTableType::Citus => Self::Citus,
211 CdcTableType::Unspecified => Self::Unspecified,
212 }
213 }
214}
215
216impl From<PbCdcTableType> for CdcTableType {
217 fn from(cdc_table_type: PbCdcTableType) -> Self {
218 match cdc_table_type {
219 PbCdcTableType::Postgres => Self::Postgres,
220 PbCdcTableType::Mysql => Self::Mysql,
221 PbCdcTableType::Sqlserver => Self::Sqlserver,
222 PbCdcTableType::Mongo => Self::Mongo,
223 PbCdcTableType::Citus => Self::Citus,
224 PbCdcTableType::Unspecified => Self::Unspecified,
225 }
226 }
227}
228
229crate::derive_from_blob!(VectorIndexInfo, PbVectorIndexInfo);
230
231#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
232#[sea_orm(table_name = "table")]
233pub struct Model {
234 #[sea_orm(primary_key, auto_increment = false)]
235 pub table_id: TableId,
236 pub name: String,
237 pub optional_associated_source_id: Option<SourceId>,
238 pub table_type: TableType,
239 pub belongs_to_job_id: Option<ObjectId>,
240 pub columns: ColumnCatalogArray,
241 pub pk: ColumnOrderArray,
242 pub distribution_key: I32Array,
243 pub stream_key: I32Array,
244 pub append_only: bool,
245 pub fragment_id: Option<FragmentId>,
246 pub vnode_col_index: Option<i32>,
247 pub row_id_index: Option<i32>,
248 pub value_indices: I32Array,
249 pub definition: String,
250 pub handle_pk_conflict_behavior: HandleConflictBehavior,
251 pub version_column_indices: Option<I32Array>,
252 pub read_prefix_len_hint: i32,
253 pub watermark_indices: I32Array,
254 pub dist_key_in_pk: I32Array,
255 pub dml_fragment_id: Option<FragmentId>,
256 pub cardinality: Option<Cardinality>,
257 pub cleaned_by_watermark: bool,
258 pub description: Option<String>,
259 pub version: Option<TableVersion>,
260 pub retention_seconds: Option<i32>,
261 pub cdc_table_id: Option<String>,
262 pub vnode_count: i32,
263 pub webhook_info: Option<WebhookSourceInfo>,
264 pub engine: Option<Engine>,
265 pub clean_watermark_index_in_pk: Option<i32>,
266 pub refreshable: bool,
267 pub vector_index_info: Option<VectorIndexInfo>,
268 pub cdc_table_type: Option<CdcTableType>,
269 pub refresh_state: Option<RefreshState>,
270}
271
272#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
273pub enum Relation {
274 #[sea_orm(
275 belongs_to = "super::fragment::Entity",
276 from = "Column::DmlFragmentId",
277 to = "super::fragment::Column::FragmentId",
278 on_update = "NoAction",
279 on_delete = "NoAction"
280 )]
281 Fragment2,
282 #[sea_orm(
283 belongs_to = "super::fragment::Entity",
284 from = "Column::FragmentId",
285 to = "super::fragment::Column::FragmentId",
286 on_update = "NoAction",
287 on_delete = "NoAction"
288 )]
289 Fragment1,
290 #[sea_orm(
291 belongs_to = "super::object::Entity",
292 from = "Column::BelongsToJobId",
293 to = "super::object::Column::Oid",
294 on_update = "NoAction",
295 on_delete = "Cascade"
296 )]
297 Object2,
298 #[sea_orm(
299 belongs_to = "super::object::Entity",
300 from = "Column::TableId",
301 to = "super::object::Column::Oid",
302 on_update = "NoAction",
303 on_delete = "Cascade"
304 )]
305 Object1,
306 #[sea_orm(
307 belongs_to = "super::source::Entity",
308 from = "Column::OptionalAssociatedSourceId",
309 to = "super::source::Column::SourceId",
310 on_update = "NoAction",
311 on_delete = "NoAction"
312 )]
313 Source,
314
315 #[sea_orm(
317 belongs_to = "super::object_dependency::Entity",
318 from = "Column::TableId",
319 to = "super::object_dependency::Column::UsedBy",
320 on_update = "NoAction",
321 on_delete = "Cascade"
322 )]
323 ObjectDependency,
324}
325
326impl Related<super::object::Entity> for Entity {
327 fn to() -> RelationDef {
328 Relation::Object1.def()
329 }
330}
331
332impl Related<super::source::Entity> for Entity {
333 fn to() -> RelationDef {
334 Relation::Source.def()
335 }
336}
337
338impl ActiveModelBehavior for ActiveModel {}
339
340impl From<PbTable> for ActiveModel {
341 fn from(pb_table: PbTable) -> Self {
342 let table_type = pb_table.table_type();
343 let handle_pk_conflict_behavior = pb_table.handle_pk_conflict_behavior();
344 let refresh_state = pb_table.refresh_state();
345
346 let vnode_count = pb_table
350 .vnode_count_inner()
351 .value_opt()
352 .map(|v| v as _)
353 .map_or(NotSet, Set);
354 let fragment_id = if pb_table.fragment_id == OBJECT_ID_PLACEHOLDER {
355 NotSet
356 } else {
357 Set(Some(pb_table.fragment_id as FragmentId))
358 };
359 let dml_fragment_id = pb_table
360 .dml_fragment_id
361 .map(|x| Set(Some(x as FragmentId)))
362 .unwrap_or_default();
363 let optional_associated_source_id =
364 if let Some(OptionalAssociatedSourceId::AssociatedSourceId(src_id)) =
365 pb_table.optional_associated_source_id
366 {
367 Set(Some(src_id as SourceId))
368 } else {
369 NotSet
370 };
371
372 Self {
373 table_id: Set(pb_table.id as _),
374 name: Set(pb_table.name),
375 optional_associated_source_id,
376 table_type: Set(table_type.into()),
377 belongs_to_job_id: Set(pb_table.job_id.map(|x| x as _)),
378 columns: Set(pb_table.columns.into()),
379 pk: Set(pb_table.pk.into()),
380 distribution_key: Set(pb_table.distribution_key.into()),
381 stream_key: Set(pb_table.stream_key.into()),
382 append_only: Set(pb_table.append_only),
383 fragment_id,
384 vnode_col_index: Set(pb_table.vnode_col_index.map(|x| x as i32)),
385 row_id_index: Set(pb_table.row_id_index.map(|x| x as i32)),
386 value_indices: Set(pb_table.value_indices.into()),
387 definition: Set(pb_table.definition),
388 handle_pk_conflict_behavior: Set(handle_pk_conflict_behavior.into()),
389 version_column_indices: Set(Some(
390 pb_table
391 .version_column_indices
392 .iter()
393 .map(|x| *x as i32)
394 .collect::<Vec<_>>()
395 .into(),
396 )),
397 read_prefix_len_hint: Set(pb_table.read_prefix_len_hint as _),
398 watermark_indices: Set(pb_table.watermark_indices.into()),
399 dist_key_in_pk: Set(pb_table.dist_key_in_pk.into()),
400 dml_fragment_id,
401 cardinality: Set(pb_table.cardinality.as_ref().map(|x| x.into())),
402 cleaned_by_watermark: Set(pb_table.cleaned_by_watermark),
403 description: Set(pb_table.description),
404 version: Set(pb_table.version.as_ref().map(|v| v.into())),
405 retention_seconds: Set(pb_table.retention_seconds.map(|i| i as _)),
406 cdc_table_id: Set(pb_table.cdc_table_id),
407 vnode_count,
408 webhook_info: Set(pb_table.webhook_info.as_ref().map(WebhookSourceInfo::from)),
409 engine: Set(pb_table
410 .engine
411 .map(|engine| Engine::from(PbEngine::try_from(engine).expect("Invalid engine")))),
412 clean_watermark_index_in_pk: Set(pb_table.clean_watermark_index_in_pk),
413 refreshable: Set(pb_table.refreshable),
414 vector_index_info: Set(pb_table
415 .vector_index_info
416 .as_ref()
417 .map(VectorIndexInfo::from)),
418 cdc_table_type: Set(pb_table.cdc_table_type.map(|cdc_table_type| {
419 match cdc_table_type {
420 0 => CdcTableType::Postgres, 1 => CdcTableType::Postgres,
422 2 => CdcTableType::Mysql,
423 3 => CdcTableType::Sqlserver,
424 4 => CdcTableType::Mongo,
425 _ => panic!("Invalid CDC table type: {cdc_table_type}"),
426 }
427 })),
428 refresh_state: Set(Some(refresh_state.into())),
429 }
430 }
431}