risingwave_meta_model/
table.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::OBJECT_ID_PLACEHOLDER;
16use risingwave_common::hash::VnodeCountCompat;
17use risingwave_common::id::JobId;
18use risingwave_pb::catalog::table::{
19    CdcTableType as PbCdcTableType, OptionalAssociatedSourceId, PbEngine, PbTableType,
20};
21use risingwave_pb::catalog::{PbHandleConflictBehavior, PbTable, PbVectorIndexInfo};
22use sea_orm::ActiveValue::Set;
23use sea_orm::NotSet;
24use sea_orm::entity::prelude::*;
25use serde::{Deserialize, Serialize};
26
27use crate::{
28    Cardinality, ColumnCatalogArray, ColumnOrderArray, FragmentId, I32Array, SourceId, TableId,
29    TableVersion, WebhookSourceInfo,
30};
31
/// Kind of catalog entry stored in the `table` table (see `Model::table_type`).
///
/// Persisted as a string column: each variant is stored as the literal given in its
/// `string_value` attribute. Mirrors [`PbTableType`], except that the protobuf `Unspecified`
/// value has no counterpart here.
#[derive(
    Clone, Debug, PartialEq, Hash, Copy, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum TableType {
    /// Counterpart of [`PbTableType::Table`].
    #[sea_orm(string_value = "TABLE")]
    Table,
    /// Counterpart of [`PbTableType::MaterializedView`].
    #[sea_orm(string_value = "MATERIALIZED_VIEW")]
    MaterializedView,
    /// Counterpart of [`PbTableType::Index`].
    #[sea_orm(string_value = "INDEX")]
    Index,
    /// Counterpart of [`PbTableType::Internal`].
    #[sea_orm(string_value = "INTERNAL")]
    Internal,
    /// Counterpart of [`PbTableType::VectorIndex`].
    #[sea_orm(string_value = "VECTOR_INDEX")]
    VectorIndex,
}
48
49impl From<TableType> for PbTableType {
50    fn from(table_type: TableType) -> Self {
51        match table_type {
52            TableType::Table => Self::Table,
53            TableType::MaterializedView => Self::MaterializedView,
54            TableType::Index => Self::Index,
55            TableType::Internal => Self::Internal,
56            TableType::VectorIndex => Self::VectorIndex,
57        }
58    }
59}
60
61impl From<PbTableType> for TableType {
62    fn from(table_type: PbTableType) -> Self {
63        match table_type {
64            PbTableType::Table => Self::Table,
65            PbTableType::MaterializedView => Self::MaterializedView,
66            PbTableType::Index => Self::Index,
67            PbTableType::Internal => Self::Internal,
68            PbTableType::VectorIndex => Self::VectorIndex,
69            PbTableType::Unspecified => unreachable!("Unspecified table type"),
70        }
71    }
72}
73
/// Persisted form of [`PbHandleConflictBehavior`], stored in
/// `Model::handle_pk_conflict_behavior`.
///
/// Stored as a string column using the `string_value` literal of each variant. The protobuf
/// `Unspecified` value has no counterpart here.
#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum HandleConflictBehavior {
    /// Counterpart of [`PbHandleConflictBehavior::Overwrite`].
    #[sea_orm(string_value = "OVERWRITE")]
    Overwrite,
    /// Counterpart of [`PbHandleConflictBehavior::Ignore`].
    #[sea_orm(string_value = "IGNORE")]
    Ignore,
    /// Counterpart of [`PbHandleConflictBehavior::NoCheck`].
    #[sea_orm(string_value = "NO_CHECK")]
    NoCheck,
    /// Counterpart of [`PbHandleConflictBehavior::DoUpdateIfNotNull`].
    #[sea_orm(string_value = "DO_UPDATE_IF_NOT_NULL")]
    DoUpdateIfNotNull,
}
86
87impl From<HandleConflictBehavior> for PbHandleConflictBehavior {
88    fn from(handle_conflict_behavior: HandleConflictBehavior) -> Self {
89        match handle_conflict_behavior {
90            HandleConflictBehavior::Overwrite => Self::Overwrite,
91            HandleConflictBehavior::Ignore => Self::Ignore,
92            HandleConflictBehavior::NoCheck => Self::NoCheck,
93            HandleConflictBehavior::DoUpdateIfNotNull => Self::DoUpdateIfNotNull,
94        }
95    }
96}
97
98impl From<PbHandleConflictBehavior> for HandleConflictBehavior {
99    fn from(handle_conflict_behavior: PbHandleConflictBehavior) -> Self {
100        match handle_conflict_behavior {
101            PbHandleConflictBehavior::Overwrite => Self::Overwrite,
102            PbHandleConflictBehavior::Ignore => Self::Ignore,
103            PbHandleConflictBehavior::NoCheck => Self::NoCheck,
104            PbHandleConflictBehavior::DoUpdateIfNotNull => Self::DoUpdateIfNotNull,
105            PbHandleConflictBehavior::Unspecified => {
106                unreachable!("Unspecified handle conflict behavior")
107            }
108        }
109    }
110}
111
/// Refresh state of a refreshable table, stored in `Model::refresh_state`.
///
/// Persisted as a string column using the `string_value` literal of each variant; the
/// `Display` impl prints the same literals. The protobuf `Unspecified` value has no
/// counterpart here.
#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum RefreshState {
    /// The table is refreshable and the current state is `Idle`.
    #[sea_orm(string_value = "IDLE")]
    Idle,
    /// The table is refreshable and the current state is `Refreshing` (`RefreshStart` barrier passed).
    #[sea_orm(string_value = "REFRESHING")]
    Refreshing,
    /// The table is refreshable and the current state is `Finishing`. (`LoadFinish` barrier passed).
    #[sea_orm(string_value = "FINISHING")]
    Finishing,
}
125
126impl From<RefreshState> for risingwave_pb::catalog::RefreshState {
127    fn from(refresh_state: RefreshState) -> Self {
128        match refresh_state {
129            RefreshState::Idle => Self::Idle,
130            RefreshState::Refreshing => Self::Refreshing,
131            RefreshState::Finishing => Self::Finishing,
132        }
133    }
134}
135
136impl From<risingwave_pb::catalog::RefreshState> for RefreshState {
137    fn from(refresh_state: risingwave_pb::catalog::RefreshState) -> Self {
138        match refresh_state {
139            risingwave_pb::catalog::RefreshState::Idle => Self::Idle,
140            risingwave_pb::catalog::RefreshState::Refreshing => Self::Refreshing,
141            risingwave_pb::catalog::RefreshState::Finishing => Self::Finishing,
142            risingwave_pb::catalog::RefreshState::Unspecified => {
143                unreachable!("Unspecified refresh state")
144            }
145        }
146    }
147}
148
149impl std::fmt::Display for RefreshState {
150    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
151        match self {
152            RefreshState::Idle => write!(f, "IDLE"),
153            RefreshState::Refreshing => write!(f, "REFRESHING"),
154            RefreshState::Finishing => write!(f, "FINISHING"),
155        }
156    }
157}
158
/// Persisted form of [`PbEngine`], stored in `Model::engine`.
///
/// Stored as a string column using the `string_value` literal of each variant. There is no
/// `Unspecified` variant; the `From<PbEngine>` conversion maps the protobuf `Unspecified`
/// value to [`Engine::Hummock`].
#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum Engine {
    /// Counterpart of [`PbEngine::Hummock`].
    #[sea_orm(string_value = "HUMMOCK")]
    Hummock,
    /// Counterpart of [`PbEngine::Iceberg`].
    #[sea_orm(string_value = "ICEBERG")]
    Iceberg,
}
167
168impl From<Engine> for PbEngine {
169    fn from(engine: Engine) -> Self {
170        match engine {
171            Engine::Hummock => Self::Hummock,
172            Engine::Iceberg => Self::Iceberg,
173        }
174    }
175}
176
177impl From<PbEngine> for Engine {
178    fn from(engine: PbEngine) -> Self {
179        match engine {
180            PbEngine::Hummock => Self::Hummock,
181            PbEngine::Iceberg => Self::Iceberg,
182            PbEngine::Unspecified => Self::Hummock,
183        }
184    }
185}
186
/// Persisted form of [`PbCdcTableType`], stored in `Model::cdc_table_type`.
///
/// Stored as a string column using the `string_value` literal of each variant. Unlike the
/// other enums in this file, `Unspecified` has its own persisted value here.
#[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum CdcTableType {
    /// Counterpart of [`PbCdcTableType::Unspecified`].
    #[sea_orm(string_value = "UNSPECIFIED")]
    Unspecified,
    /// Counterpart of [`PbCdcTableType::Postgres`].
    #[sea_orm(string_value = "POSTGRES")]
    Postgres,
    /// Counterpart of [`PbCdcTableType::Mysql`].
    #[sea_orm(string_value = "MYSQL")]
    Mysql,
    /// Counterpart of [`PbCdcTableType::Sqlserver`].
    #[sea_orm(string_value = "SQLSERVER")]
    Sqlserver,
    /// Counterpart of [`PbCdcTableType::Mongo`].
    #[sea_orm(string_value = "MONGO")]
    Mongo,
    /// Counterpart of [`PbCdcTableType::Citus`].
    #[sea_orm(string_value = "CITUS")]
    Citus,
}
203
204impl From<CdcTableType> for PbCdcTableType {
205    fn from(cdc_table_type: CdcTableType) -> Self {
206        match cdc_table_type {
207            CdcTableType::Postgres => Self::Postgres,
208            CdcTableType::Mysql => Self::Mysql,
209            CdcTableType::Sqlserver => Self::Sqlserver,
210            CdcTableType::Mongo => Self::Mongo,
211            CdcTableType::Citus => Self::Citus,
212            CdcTableType::Unspecified => Self::Unspecified,
213        }
214    }
215}
216
217impl From<PbCdcTableType> for CdcTableType {
218    fn from(cdc_table_type: PbCdcTableType) -> Self {
219        match cdc_table_type {
220            PbCdcTableType::Postgres => Self::Postgres,
221            PbCdcTableType::Mysql => Self::Mysql,
222            PbCdcTableType::Sqlserver => Self::Sqlserver,
223            PbCdcTableType::Mongo => Self::Mongo,
224            PbCdcTableType::Citus => Self::Citus,
225            PbCdcTableType::Unspecified => Self::Unspecified,
226        }
227    }
228}
229
// Generates `VectorIndexInfo`, the column type used by `Model::vector_index_info`, from the
// protobuf message `PbVectorIndexInfo`.
// NOTE(review): the macro name suggests the message is stored as a serialized blob — confirm
// against the `derive_from_blob!` definition.
crate::derive_from_blob!(VectorIndexInfo, PbVectorIndexInfo);
231
/// A row of the `table` catalog table: the persisted form of a [`PbTable`].
///
/// The `From<PbTable> for ActiveModel` impl in this file builds the values written to these
/// columns.
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "table")]
pub struct Model {
    /// Primary key, supplied by the caller rather than auto-incremented. References
    /// `object.oid` (see `Relation::Object1`).
    #[sea_orm(primary_key, auto_increment = false)]
    pub table_id: TableId,
    pub name: String,
    /// References `source.source_id` when present (see `Relation::Source`).
    pub optional_associated_source_id: Option<SourceId>,
    pub table_type: TableType,
    /// References `object.oid` when present (see `Relation::Object2`).
    pub belongs_to_job_id: Option<JobId>,
    pub columns: ColumnCatalogArray,
    pub pk: ColumnOrderArray,
    pub distribution_key: I32Array,
    pub stream_key: I32Array,
    pub append_only: bool,
    /// References `fragment.fragment_id` when present (see `Relation::Fragment1`).
    pub fragment_id: Option<FragmentId>,
    pub vnode_col_index: Option<i32>,
    pub row_id_index: Option<i32>,
    pub value_indices: I32Array,
    pub definition: String,
    pub handle_pk_conflict_behavior: HandleConflictBehavior,
    pub version_column_indices: Option<I32Array>,
    pub read_prefix_len_hint: i32,
    pub watermark_indices: I32Array,
    pub dist_key_in_pk: I32Array,
    /// References `fragment.fragment_id` when present (see `Relation::Fragment2`).
    pub dml_fragment_id: Option<FragmentId>,
    pub cardinality: Option<Cardinality>,
    pub cleaned_by_watermark: bool,
    pub description: Option<String>,
    pub version: Option<TableVersion>,
    pub retention_seconds: Option<i32>,
    pub cdc_table_id: Option<String>,
    pub vnode_count: i32,
    pub webhook_info: Option<WebhookSourceInfo>,
    pub engine: Option<Engine>,
    pub clean_watermark_index_in_pk: Option<i32>,
    pub refreshable: bool,
    pub vector_index_info: Option<VectorIndexInfo>,
    pub cdc_table_type: Option<CdcTableType>,
    pub refresh_state: Option<RefreshState>,
}
272
/// Relations from the `table` entity to other catalog entities.
///
/// Every variant is a `belongs_to` relation: the `from` column lives on this table and the
/// `to` column on the related entity.
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
    /// `dml_fragment_id` → `fragment.fragment_id`.
    #[sea_orm(
        belongs_to = "super::fragment::Entity",
        from = "Column::DmlFragmentId",
        to = "super::fragment::Column::FragmentId",
        on_update = "NoAction",
        on_delete = "NoAction"
    )]
    Fragment2,
    /// `fragment_id` → `fragment.fragment_id`.
    #[sea_orm(
        belongs_to = "super::fragment::Entity",
        from = "Column::FragmentId",
        to = "super::fragment::Column::FragmentId",
        on_update = "NoAction",
        on_delete = "NoAction"
    )]
    Fragment1,
    /// `belongs_to_job_id` → `object.oid`, declared with `on_delete = "Cascade"`.
    #[sea_orm(
        belongs_to = "super::object::Entity",
        from = "Column::BelongsToJobId",
        to = "super::object::Column::Oid",
        on_update = "NoAction",
        on_delete = "Cascade"
    )]
    Object2,
    /// `table_id` → `object.oid`, declared with `on_delete = "Cascade"`. This is the relation
    /// returned by `Related<super::object::Entity>`.
    #[sea_orm(
        belongs_to = "super::object::Entity",
        from = "Column::TableId",
        to = "super::object::Column::Oid",
        on_update = "NoAction",
        on_delete = "Cascade"
    )]
    Object1,
    /// `optional_associated_source_id` → `source.source_id`. This is the relation returned by
    /// `Related<super::source::Entity>`.
    #[sea_orm(
        belongs_to = "super::source::Entity",
        from = "Column::OptionalAssociatedSourceId",
        to = "super::source::Column::SourceId",
        on_update = "NoAction",
        on_delete = "NoAction"
    )]
    Source,

    // To join object_dependency on the used_by column
    /// `table_id` → `object_dependency.used_by`.
    #[sea_orm(
        belongs_to = "super::object_dependency::Entity",
        from = "Column::TableId",
        to = "super::object_dependency::Column::UsedBy",
        on_update = "NoAction",
        on_delete = "Cascade"
    )]
    ObjectDependency,
}
326
/// Joins `table` to `object` through `Relation::Object1` (`table_id` → `oid`).
///
/// `Relation::Object2` (`belongs_to_job_id` → `oid`) also targets `object`, but it is not the
/// relation returned here.
impl Related<super::object::Entity> for Entity {
    fn to() -> RelationDef {
        Relation::Object1.def()
    }
}
332
/// Joins `table` to `source` through `Relation::Source`
/// (`optional_associated_source_id` → `source_id`).
impl Related<super::source::Entity> for Entity {
    fn to() -> RelationDef {
        Relation::Source.def()
    }
}
338
// No hooks are overridden: the trait's default method implementations are used as-is.
impl ActiveModelBehavior for ActiveModel {}
340
341impl From<PbTable> for ActiveModel {
342    fn from(pb_table: PbTable) -> Self {
343        let table_type = pb_table.table_type();
344        let handle_pk_conflict_behavior = pb_table.handle_pk_conflict_behavior();
345        let refresh_state = pb_table.refresh_state();
346
347        // `PbTable` here should be sourced from the wire, not from persistence.
348        // A placeholder `maybe_vnode_count` field should be treated as `NotSet`, instead of calling
349        // the compatibility code.
350        let vnode_count = pb_table
351            .vnode_count_inner()
352            .value_opt()
353            .map(|v| v as _)
354            .map_or(NotSet, Set);
355        let fragment_id = if pb_table.fragment_id == OBJECT_ID_PLACEHOLDER {
356            NotSet
357        } else {
358            Set(Some(pb_table.fragment_id as FragmentId))
359        };
360        let dml_fragment_id = pb_table
361            .dml_fragment_id
362            .map(|x| Set(Some(x as FragmentId)))
363            .unwrap_or_default();
364        let optional_associated_source_id =
365            if let Some(OptionalAssociatedSourceId::AssociatedSourceId(src_id)) =
366                pb_table.optional_associated_source_id
367            {
368                Set(Some(src_id as SourceId))
369            } else {
370                NotSet
371            };
372
373        Self {
374            table_id: Set(pb_table.id.into()),
375            name: Set(pb_table.name),
376            optional_associated_source_id,
377            table_type: Set(table_type.into()),
378            belongs_to_job_id: Set(pb_table.job_id.map(|x| x.into())),
379            columns: Set(pb_table.columns.into()),
380            pk: Set(pb_table.pk.into()),
381            distribution_key: Set(pb_table.distribution_key.into()),
382            stream_key: Set(pb_table.stream_key.into()),
383            append_only: Set(pb_table.append_only),
384            fragment_id,
385            vnode_col_index: Set(pb_table.vnode_col_index.map(|x| x as i32)),
386            row_id_index: Set(pb_table.row_id_index.map(|x| x as i32)),
387            value_indices: Set(pb_table.value_indices.into()),
388            definition: Set(pb_table.definition),
389            handle_pk_conflict_behavior: Set(handle_pk_conflict_behavior.into()),
390            version_column_indices: Set(Some(
391                pb_table
392                    .version_column_indices
393                    .iter()
394                    .map(|x| *x as i32)
395                    .collect::<Vec<_>>()
396                    .into(),
397            )),
398            read_prefix_len_hint: Set(pb_table.read_prefix_len_hint as _),
399            watermark_indices: Set(pb_table.watermark_indices.into()),
400            dist_key_in_pk: Set(pb_table.dist_key_in_pk.into()),
401            dml_fragment_id,
402            cardinality: Set(pb_table.cardinality.as_ref().map(|x| x.into())),
403            cleaned_by_watermark: Set(pb_table.cleaned_by_watermark),
404            description: Set(pb_table.description),
405            version: Set(pb_table.version.as_ref().map(|v| v.into())),
406            retention_seconds: Set(pb_table.retention_seconds.map(|i| i as _)),
407            cdc_table_id: Set(pb_table.cdc_table_id),
408            vnode_count,
409            webhook_info: Set(pb_table.webhook_info.as_ref().map(WebhookSourceInfo::from)),
410            engine: Set(pb_table
411                .engine
412                .map(|engine| Engine::from(PbEngine::try_from(engine).expect("Invalid engine")))),
413            clean_watermark_index_in_pk: Set(pb_table.clean_watermark_index_in_pk),
414            refreshable: Set(pb_table.refreshable),
415            vector_index_info: Set(pb_table
416                .vector_index_info
417                .as_ref()
418                .map(VectorIndexInfo::from)),
419            cdc_table_type: Set(pb_table.cdc_table_type.map(|cdc_table_type| {
420                match cdc_table_type {
421                    0 => CdcTableType::Postgres, // Map Unspecified to Postgres as default
422                    1 => CdcTableType::Postgres,
423                    2 => CdcTableType::Mysql,
424                    3 => CdcTableType::Sqlserver,
425                    4 => CdcTableType::Mongo,
426                    _ => panic!("Invalid CDC table type: {cdc_table_type}"),
427                }
428            })),
429            refresh_state: Set(Some(refresh_state.into())),
430        }
431    }
432}