1use risingwave_common::catalog::OBJECT_ID_PLACEHOLDER;
16use risingwave_common::hash::VnodeCountCompat;
17use risingwave_pb::catalog::table::{
18 CdcTableType as PbCdcTableType, OptionalAssociatedSourceId, PbEngine, PbTableType,
19};
20use risingwave_pb::catalog::{PbHandleConflictBehavior, PbTable, PbVectorIndexInfo};
21use sea_orm::ActiveValue::Set;
22use sea_orm::NotSet;
23use sea_orm::entity::prelude::*;
24use serde::{Deserialize, Serialize};
25
26use crate::{
27 Cardinality, ColumnCatalogArray, ColumnOrderArray, FragmentId, I32Array, ObjectId, SourceId,
28 TableId, TableVersion, WebhookSourceInfo,
29};
30
31#[derive(
32 Clone, Debug, PartialEq, Hash, Copy, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
33)]
34#[sea_orm(rs_type = "String", db_type = "string(None)")]
35pub enum TableType {
36 #[sea_orm(string_value = "TABLE")]
37 Table,
38 #[sea_orm(string_value = "MATERIALIZED_VIEW")]
39 MaterializedView,
40 #[sea_orm(string_value = "INDEX")]
41 Index,
42 #[sea_orm(string_value = "INTERNAL")]
43 Internal,
44 #[sea_orm(string_value = "VECTOR_INDEX")]
45 VectorIndex,
46}
47
48impl From<TableType> for PbTableType {
49 fn from(table_type: TableType) -> Self {
50 match table_type {
51 TableType::Table => Self::Table,
52 TableType::MaterializedView => Self::MaterializedView,
53 TableType::Index => Self::Index,
54 TableType::Internal => Self::Internal,
55 TableType::VectorIndex => Self::VectorIndex,
56 }
57 }
58}
59
60impl From<PbTableType> for TableType {
61 fn from(table_type: PbTableType) -> Self {
62 match table_type {
63 PbTableType::Table => Self::Table,
64 PbTableType::MaterializedView => Self::MaterializedView,
65 PbTableType::Index => Self::Index,
66 PbTableType::Internal => Self::Internal,
67 PbTableType::VectorIndex => Self::VectorIndex,
68 PbTableType::Unspecified => unreachable!("Unspecified table type"),
69 }
70 }
71}
72
73#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
74#[sea_orm(rs_type = "String", db_type = "string(None)")]
75pub enum HandleConflictBehavior {
76 #[sea_orm(string_value = "OVERWRITE")]
77 Overwrite,
78 #[sea_orm(string_value = "IGNORE")]
79 Ignore,
80 #[sea_orm(string_value = "NO_CHECK")]
81 NoCheck,
82 #[sea_orm(string_value = "DO_UPDATE_IF_NOT_NULL")]
83 DoUpdateIfNotNull,
84}
85
86impl From<HandleConflictBehavior> for PbHandleConflictBehavior {
87 fn from(handle_conflict_behavior: HandleConflictBehavior) -> Self {
88 match handle_conflict_behavior {
89 HandleConflictBehavior::Overwrite => Self::Overwrite,
90 HandleConflictBehavior::Ignore => Self::Ignore,
91 HandleConflictBehavior::NoCheck => Self::NoCheck,
92 HandleConflictBehavior::DoUpdateIfNotNull => Self::DoUpdateIfNotNull,
93 }
94 }
95}
96
97impl From<PbHandleConflictBehavior> for HandleConflictBehavior {
98 fn from(handle_conflict_behavior: PbHandleConflictBehavior) -> Self {
99 match handle_conflict_behavior {
100 PbHandleConflictBehavior::Overwrite => Self::Overwrite,
101 PbHandleConflictBehavior::Ignore => Self::Ignore,
102 PbHandleConflictBehavior::NoCheck => Self::NoCheck,
103 PbHandleConflictBehavior::DoUpdateIfNotNull => Self::DoUpdateIfNotNull,
104 PbHandleConflictBehavior::Unspecified => {
105 unreachable!("Unspecified handle conflict behavior")
106 }
107 }
108 }
109}
110
111#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
112#[sea_orm(rs_type = "String", db_type = "string(None)")]
113pub enum Engine {
114 #[sea_orm(string_value = "HUMMOCK")]
115 Hummock,
116 #[sea_orm(string_value = "ICEBERG")]
117 Iceberg,
118}
119
120impl From<Engine> for PbEngine {
121 fn from(engine: Engine) -> Self {
122 match engine {
123 Engine::Hummock => Self::Hummock,
124 Engine::Iceberg => Self::Iceberg,
125 }
126 }
127}
128
129impl From<PbEngine> for Engine {
130 fn from(engine: PbEngine) -> Self {
131 match engine {
132 PbEngine::Hummock => Self::Hummock,
133 PbEngine::Iceberg => Self::Iceberg,
134 PbEngine::Unspecified => Self::Hummock,
135 }
136 }
137}
138
139#[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
140#[sea_orm(rs_type = "String", db_type = "string(None)")]
141pub enum CdcTableType {
142 #[sea_orm(string_value = "UNSPECIFIED")]
143 Unspecified,
144 #[sea_orm(string_value = "POSTGRES")]
145 Postgres,
146 #[sea_orm(string_value = "MYSQL")]
147 Mysql,
148 #[sea_orm(string_value = "SQLSERVER")]
149 Sqlserver,
150 #[sea_orm(string_value = "MONGO")]
151 Mongo,
152 #[sea_orm(string_value = "CITUS")]
153 Citus,
154}
155
156impl From<CdcTableType> for PbCdcTableType {
157 fn from(cdc_table_type: CdcTableType) -> Self {
158 match cdc_table_type {
159 CdcTableType::Postgres => Self::Postgres,
160 CdcTableType::Mysql => Self::Mysql,
161 CdcTableType::Sqlserver => Self::Sqlserver,
162 CdcTableType::Mongo => Self::Mongo,
163 CdcTableType::Citus => Self::Citus,
164 CdcTableType::Unspecified => Self::Unspecified,
165 }
166 }
167}
168
169impl From<PbCdcTableType> for CdcTableType {
170 fn from(cdc_table_type: PbCdcTableType) -> Self {
171 match cdc_table_type {
172 PbCdcTableType::Postgres => Self::Postgres,
173 PbCdcTableType::Mysql => Self::Mysql,
174 PbCdcTableType::Sqlserver => Self::Sqlserver,
175 PbCdcTableType::Mongo => Self::Mongo,
176 PbCdcTableType::Citus => Self::Citus,
177 PbCdcTableType::Unspecified => Self::Unspecified,
178 }
179 }
180}
181
182crate::derive_from_blob!(VectorIndexInfo, PbVectorIndexInfo);
183
184#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
185#[sea_orm(table_name = "table")]
186pub struct Model {
187 #[sea_orm(primary_key, auto_increment = false)]
188 pub table_id: TableId,
189 pub name: String,
190 pub optional_associated_source_id: Option<SourceId>,
191 pub table_type: TableType,
192 pub belongs_to_job_id: Option<ObjectId>,
193 pub columns: ColumnCatalogArray,
194 pub pk: ColumnOrderArray,
195 pub distribution_key: I32Array,
196 pub stream_key: I32Array,
197 pub append_only: bool,
198 pub fragment_id: Option<FragmentId>,
199 pub vnode_col_index: Option<i32>,
200 pub row_id_index: Option<i32>,
201 pub value_indices: I32Array,
202 pub definition: String,
203 pub handle_pk_conflict_behavior: HandleConflictBehavior,
204 pub version_column_indices: Option<I32Array>,
205 pub read_prefix_len_hint: i32,
206 pub watermark_indices: I32Array,
207 pub dist_key_in_pk: I32Array,
208 pub dml_fragment_id: Option<FragmentId>,
209 pub cardinality: Option<Cardinality>,
210 pub cleaned_by_watermark: bool,
211 pub description: Option<String>,
212 pub version: Option<TableVersion>,
213 pub retention_seconds: Option<i32>,
214 pub cdc_table_id: Option<String>,
215 pub vnode_count: i32,
216 pub webhook_info: Option<WebhookSourceInfo>,
217 pub engine: Option<Engine>,
218 pub clean_watermark_index_in_pk: Option<i32>,
219 pub refreshable: bool,
220 pub vector_index_info: Option<VectorIndexInfo>,
221 pub cdc_table_type: Option<CdcTableType>,
222}
223
224#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
225pub enum Relation {
226 #[sea_orm(
227 belongs_to = "super::fragment::Entity",
228 from = "Column::DmlFragmentId",
229 to = "super::fragment::Column::FragmentId",
230 on_update = "NoAction",
231 on_delete = "NoAction"
232 )]
233 Fragment2,
234 #[sea_orm(
235 belongs_to = "super::fragment::Entity",
236 from = "Column::FragmentId",
237 to = "super::fragment::Column::FragmentId",
238 on_update = "NoAction",
239 on_delete = "NoAction"
240 )]
241 Fragment1,
242 #[sea_orm(
243 belongs_to = "super::object::Entity",
244 from = "Column::BelongsToJobId",
245 to = "super::object::Column::Oid",
246 on_update = "NoAction",
247 on_delete = "Cascade"
248 )]
249 Object2,
250 #[sea_orm(
251 belongs_to = "super::object::Entity",
252 from = "Column::TableId",
253 to = "super::object::Column::Oid",
254 on_update = "NoAction",
255 on_delete = "Cascade"
256 )]
257 Object1,
258 #[sea_orm(
259 belongs_to = "super::source::Entity",
260 from = "Column::OptionalAssociatedSourceId",
261 to = "super::source::Column::SourceId",
262 on_update = "NoAction",
263 on_delete = "NoAction"
264 )]
265 Source,
266
267 #[sea_orm(
269 belongs_to = "super::object_dependency::Entity",
270 from = "Column::TableId",
271 to = "super::object_dependency::Column::UsedBy",
272 on_update = "NoAction",
273 on_delete = "Cascade"
274 )]
275 ObjectDependency,
276}
277
278impl Related<super::object::Entity> for Entity {
279 fn to() -> RelationDef {
280 Relation::Object1.def()
281 }
282}
283
284impl Related<super::source::Entity> for Entity {
285 fn to() -> RelationDef {
286 Relation::Source.def()
287 }
288}
289
290impl ActiveModelBehavior for ActiveModel {}
291
292impl From<PbTable> for ActiveModel {
293 fn from(pb_table: PbTable) -> Self {
294 let table_type = pb_table.table_type();
295 let handle_pk_conflict_behavior = pb_table.handle_pk_conflict_behavior();
296
297 let vnode_count = pb_table
301 .vnode_count_inner()
302 .value_opt()
303 .map(|v| v as _)
304 .map_or(NotSet, Set);
305 let fragment_id = if pb_table.fragment_id == OBJECT_ID_PLACEHOLDER {
306 NotSet
307 } else {
308 Set(Some(pb_table.fragment_id as FragmentId))
309 };
310 let dml_fragment_id = pb_table
311 .dml_fragment_id
312 .map(|x| Set(Some(x as FragmentId)))
313 .unwrap_or_default();
314 let optional_associated_source_id =
315 if let Some(OptionalAssociatedSourceId::AssociatedSourceId(src_id)) =
316 pb_table.optional_associated_source_id
317 {
318 Set(Some(src_id as SourceId))
319 } else {
320 NotSet
321 };
322
323 Self {
324 table_id: Set(pb_table.id as _),
325 name: Set(pb_table.name),
326 optional_associated_source_id,
327 table_type: Set(table_type.into()),
328 belongs_to_job_id: Set(pb_table.job_id.map(|x| x as _)),
329 columns: Set(pb_table.columns.into()),
330 pk: Set(pb_table.pk.into()),
331 distribution_key: Set(pb_table.distribution_key.into()),
332 stream_key: Set(pb_table.stream_key.into()),
333 append_only: Set(pb_table.append_only),
334 fragment_id,
335 vnode_col_index: Set(pb_table.vnode_col_index.map(|x| x as i32)),
336 row_id_index: Set(pb_table.row_id_index.map(|x| x as i32)),
337 value_indices: Set(pb_table.value_indices.into()),
338 definition: Set(pb_table.definition),
339 handle_pk_conflict_behavior: Set(handle_pk_conflict_behavior.into()),
340 version_column_indices: Set(Some(
341 pb_table
342 .version_column_indices
343 .iter()
344 .map(|x| *x as i32)
345 .collect::<Vec<_>>()
346 .into(),
347 )),
348 read_prefix_len_hint: Set(pb_table.read_prefix_len_hint as _),
349 watermark_indices: Set(pb_table.watermark_indices.into()),
350 dist_key_in_pk: Set(pb_table.dist_key_in_pk.into()),
351 dml_fragment_id,
352 cardinality: Set(pb_table.cardinality.as_ref().map(|x| x.into())),
353 cleaned_by_watermark: Set(pb_table.cleaned_by_watermark),
354 description: Set(pb_table.description),
355 version: Set(pb_table.version.as_ref().map(|v| v.into())),
356 retention_seconds: Set(pb_table.retention_seconds.map(|i| i as _)),
357 cdc_table_id: Set(pb_table.cdc_table_id),
358 vnode_count,
359 webhook_info: Set(pb_table.webhook_info.as_ref().map(WebhookSourceInfo::from)),
360 engine: Set(pb_table
361 .engine
362 .map(|engine| Engine::from(PbEngine::try_from(engine).expect("Invalid engine")))),
363 clean_watermark_index_in_pk: Set(pb_table.clean_watermark_index_in_pk),
364 refreshable: Set(pb_table.refreshable),
365 vector_index_info: Set(pb_table
366 .vector_index_info
367 .as_ref()
368 .map(VectorIndexInfo::from)),
369 cdc_table_type: Set(pb_table.cdc_table_type.map(|cdc_table_type| {
370 match cdc_table_type {
371 0 => CdcTableType::Postgres, 1 => CdcTableType::Postgres,
373 2 => CdcTableType::Mysql,
374 3 => CdcTableType::Sqlserver,
375 4 => CdcTableType::Mongo,
376 _ => panic!("Invalid CDC table type: {cdc_table_type}"),
377 }
378 })),
379 }
380 }
381}