risingwave_meta_model/
table.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::OBJECT_ID_PLACEHOLDER;
16use risingwave_common::hash::VnodeCountCompat;
17use risingwave_common::id::JobId;
18use risingwave_pb::catalog::table::{CdcTableType as PbCdcTableType, PbEngine, PbTableType};
19use risingwave_pb::catalog::{PbHandleConflictBehavior, PbTable, PbVectorIndexInfo};
20use sea_orm::ActiveValue::Set;
21use sea_orm::NotSet;
22use sea_orm::entity::prelude::*;
23use serde::{Deserialize, Serialize};
24
25use crate::{
26    Cardinality, ColumnCatalogArray, ColumnOrderArray, FragmentId, I32Array, SourceId, TableId,
27    TableVersion, WebhookSourceInfo,
28};
29
/// Kind of a catalog table, as stored in [`Model::table_type`].
///
/// Persisted as a string column; each variant is written as the literal given by its
/// `string_value` attribute. The variants mirror [`PbTableType`] except for its
/// `Unspecified` variant, which has no counterpart here.
#[derive(
    Clone, Debug, PartialEq, Hash, Copy, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum TableType {
    /// Persisted as `"TABLE"`.
    #[sea_orm(string_value = "TABLE")]
    Table,
    /// Persisted as `"MATERIALIZED_VIEW"`.
    #[sea_orm(string_value = "MATERIALIZED_VIEW")]
    MaterializedView,
    /// Persisted as `"INDEX"`.
    #[sea_orm(string_value = "INDEX")]
    Index,
    /// Persisted as `"INTERNAL"`.
    #[sea_orm(string_value = "INTERNAL")]
    Internal,
    /// Persisted as `"VECTOR_INDEX"`.
    #[sea_orm(string_value = "VECTOR_INDEX")]
    VectorIndex,
}
46
47impl From<TableType> for PbTableType {
48    fn from(table_type: TableType) -> Self {
49        match table_type {
50            TableType::Table => Self::Table,
51            TableType::MaterializedView => Self::MaterializedView,
52            TableType::Index => Self::Index,
53            TableType::Internal => Self::Internal,
54            TableType::VectorIndex => Self::VectorIndex,
55        }
56    }
57}
58
59impl From<PbTableType> for TableType {
60    fn from(table_type: PbTableType) -> Self {
61        match table_type {
62            PbTableType::Table => Self::Table,
63            PbTableType::MaterializedView => Self::MaterializedView,
64            PbTableType::Index => Self::Index,
65            PbTableType::Internal => Self::Internal,
66            PbTableType::VectorIndex => Self::VectorIndex,
67            PbTableType::Unspecified => unreachable!("Unspecified table type"),
68        }
69    }
70}
71
/// Primary-key conflict handling mode, as stored in
/// [`Model::handle_pk_conflict_behavior`].
///
/// Persisted as a string column; each variant is written as the literal given by its
/// `string_value` attribute. The variants mirror [`PbHandleConflictBehavior`] except for
/// its `Unspecified` variant, which has no counterpart here.
#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum HandleConflictBehavior {
    /// Persisted as `"OVERWRITE"`.
    #[sea_orm(string_value = "OVERWRITE")]
    Overwrite,
    /// Persisted as `"IGNORE"`.
    #[sea_orm(string_value = "IGNORE")]
    Ignore,
    /// Persisted as `"NO_CHECK"`.
    #[sea_orm(string_value = "NO_CHECK")]
    NoCheck,
    /// Persisted as `"DO_UPDATE_IF_NOT_NULL"`.
    #[sea_orm(string_value = "DO_UPDATE_IF_NOT_NULL")]
    DoUpdateIfNotNull,
}
84
85impl From<HandleConflictBehavior> for PbHandleConflictBehavior {
86    fn from(handle_conflict_behavior: HandleConflictBehavior) -> Self {
87        match handle_conflict_behavior {
88            HandleConflictBehavior::Overwrite => Self::Overwrite,
89            HandleConflictBehavior::Ignore => Self::Ignore,
90            HandleConflictBehavior::NoCheck => Self::NoCheck,
91            HandleConflictBehavior::DoUpdateIfNotNull => Self::DoUpdateIfNotNull,
92        }
93    }
94}
95
96impl From<PbHandleConflictBehavior> for HandleConflictBehavior {
97    fn from(handle_conflict_behavior: PbHandleConflictBehavior) -> Self {
98        match handle_conflict_behavior {
99            PbHandleConflictBehavior::Overwrite => Self::Overwrite,
100            PbHandleConflictBehavior::Ignore => Self::Ignore,
101            PbHandleConflictBehavior::NoCheck => Self::NoCheck,
102            PbHandleConflictBehavior::DoUpdateIfNotNull => Self::DoUpdateIfNotNull,
103            PbHandleConflictBehavior::Unspecified => {
104                unreachable!("Unspecified handle conflict behavior")
105            }
106        }
107    }
108}
109
/// Refresh lifecycle state of a refreshable table, as stored in
/// [`Model::refresh_state`].
///
/// Persisted as a string column; each variant is written as the literal given by its
/// `string_value` attribute.
#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum RefreshState {
    /// The table is refreshable and the current state is `Idle`.
    #[sea_orm(string_value = "IDLE")]
    Idle,
    /// The table is refreshable and the current state is `Refreshing` (`RefreshStart` barrier passed).
    #[sea_orm(string_value = "REFRESHING")]
    Refreshing,
    /// The table is refreshable and the current state is `Finishing` (`LoadFinish` barrier passed).
    #[sea_orm(string_value = "FINISHING")]
    Finishing,
}
123
124impl From<RefreshState> for risingwave_pb::catalog::RefreshState {
125    fn from(refresh_state: RefreshState) -> Self {
126        match refresh_state {
127            RefreshState::Idle => Self::Idle,
128            RefreshState::Refreshing => Self::Refreshing,
129            RefreshState::Finishing => Self::Finishing,
130        }
131    }
132}
133
134impl From<risingwave_pb::catalog::RefreshState> for RefreshState {
135    fn from(refresh_state: risingwave_pb::catalog::RefreshState) -> Self {
136        match refresh_state {
137            risingwave_pb::catalog::RefreshState::Idle => Self::Idle,
138            risingwave_pb::catalog::RefreshState::Refreshing => Self::Refreshing,
139            risingwave_pb::catalog::RefreshState::Finishing => Self::Finishing,
140            risingwave_pb::catalog::RefreshState::Unspecified => {
141                unreachable!("Unspecified refresh state")
142            }
143        }
144    }
145}
146
147impl std::fmt::Display for RefreshState {
148    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
149        match self {
150            RefreshState::Idle => write!(f, "IDLE"),
151            RefreshState::Refreshing => write!(f, "REFRESHING"),
152            RefreshState::Finishing => write!(f, "FINISHING"),
153        }
154    }
155}
156
/// Table engine, as stored in [`Model::engine`].
///
/// Persisted as a string column; each variant is written as the literal given by its
/// `string_value` attribute.
#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum Engine {
    /// Persisted as `"HUMMOCK"`.
    #[sea_orm(string_value = "HUMMOCK")]
    Hummock,
    /// Persisted as `"ICEBERG"`.
    #[sea_orm(string_value = "ICEBERG")]
    Iceberg,
}
165
166impl From<Engine> for PbEngine {
167    fn from(engine: Engine) -> Self {
168        match engine {
169            Engine::Hummock => Self::Hummock,
170            Engine::Iceberg => Self::Iceberg,
171        }
172    }
173}
174
175impl From<PbEngine> for Engine {
176    fn from(engine: PbEngine) -> Self {
177        match engine {
178            PbEngine::Hummock => Self::Hummock,
179            PbEngine::Iceberg => Self::Iceberg,
180            PbEngine::Unspecified => Self::Hummock,
181        }
182    }
183}
184
/// Upstream database kind of a CDC table, as stored in [`Model::cdc_table_type`].
///
/// Persisted as a string column; each variant is written as the literal given by its
/// `string_value` attribute. The variants mirror [`PbCdcTableType`] one-to-one,
/// including `Unspecified`.
#[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum CdcTableType {
    /// Persisted as `"UNSPECIFIED"`.
    #[sea_orm(string_value = "UNSPECIFIED")]
    Unspecified,
    /// Persisted as `"POSTGRES"`.
    #[sea_orm(string_value = "POSTGRES")]
    Postgres,
    /// Persisted as `"MYSQL"`.
    #[sea_orm(string_value = "MYSQL")]
    Mysql,
    /// Persisted as `"SQLSERVER"`.
    #[sea_orm(string_value = "SQLSERVER")]
    Sqlserver,
    /// Persisted as `"MONGO"`.
    #[sea_orm(string_value = "MONGO")]
    Mongo,
    /// Persisted as `"CITUS"`.
    #[sea_orm(string_value = "CITUS")]
    Citus,
}
201
202impl From<CdcTableType> for PbCdcTableType {
203    fn from(cdc_table_type: CdcTableType) -> Self {
204        match cdc_table_type {
205            CdcTableType::Postgres => Self::Postgres,
206            CdcTableType::Mysql => Self::Mysql,
207            CdcTableType::Sqlserver => Self::Sqlserver,
208            CdcTableType::Mongo => Self::Mongo,
209            CdcTableType::Citus => Self::Citus,
210            CdcTableType::Unspecified => Self::Unspecified,
211        }
212    }
213}
214
215impl From<PbCdcTableType> for CdcTableType {
216    fn from(cdc_table_type: PbCdcTableType) -> Self {
217        match cdc_table_type {
218            PbCdcTableType::Postgres => Self::Postgres,
219            PbCdcTableType::Mysql => Self::Mysql,
220            PbCdcTableType::Sqlserver => Self::Sqlserver,
221            PbCdcTableType::Mongo => Self::Mongo,
222            PbCdcTableType::Citus => Self::Citus,
223            PbCdcTableType::Unspecified => Self::Unspecified,
224        }
225    }
226}
227
// Declares the `VectorIndexInfo` column type used by `Model::vector_index_info`, built
// from `PbVectorIndexInfo`.
// NOTE(review): presumably persists the protobuf message as a binary blob column —
// confirm against the `derive_from_blob!` macro definition.
crate::derive_from_blob!(VectorIndexInfo, PbVectorIndexInfo);
229
/// sea-orm entity model for one row of the `table` table.
///
/// The fields largely correspond to those of [`PbTable`]; see the
/// `From<PbTable> for ActiveModel` conversion for how each column is populated.
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "table")]
pub struct Model {
    /// Primary key (not auto-incremented); references `object.oid` via `Relation::Object1`.
    #[sea_orm(primary_key, auto_increment = false)]
    pub table_id: TableId,
    pub name: String,
    /// References `source.source_id` via `Relation::Source`, when present.
    pub optional_associated_source_id: Option<SourceId>,
    pub table_type: TableType,
    /// References `object.oid` via `Relation::Object2`, when present.
    pub belongs_to_job_id: Option<JobId>,
    pub columns: ColumnCatalogArray,
    pub pk: ColumnOrderArray,
    pub distribution_key: I32Array,
    pub stream_key: I32Array,
    pub append_only: bool,
    /// References `fragment.fragment_id` via `Relation::Fragment1`, when present.
    pub fragment_id: Option<FragmentId>,
    pub vnode_col_index: Option<i32>,
    pub row_id_index: Option<i32>,
    pub value_indices: I32Array,
    pub definition: String,
    pub handle_pk_conflict_behavior: HandleConflictBehavior,
    pub version_column_indices: Option<I32Array>,
    pub read_prefix_len_hint: i32,
    pub watermark_indices: I32Array,
    pub dist_key_in_pk: I32Array,
    /// References `fragment.fragment_id` via `Relation::Fragment2`, when present.
    pub dml_fragment_id: Option<FragmentId>,
    pub cardinality: Option<Cardinality>,
    pub cleaned_by_watermark: bool,
    pub description: Option<String>,
    pub version: Option<TableVersion>,
    pub retention_seconds: Option<i32>,
    pub cdc_table_id: Option<String>,
    pub vnode_count: i32,
    pub webhook_info: Option<WebhookSourceInfo>,
    pub engine: Option<Engine>,
    pub clean_watermark_index_in_pk: Option<i32>,
    pub refreshable: bool,
    pub vector_index_info: Option<VectorIndexInfo>,
    pub cdc_table_type: Option<CdcTableType>,
    pub refresh_state: Option<RefreshState>,
}
270
/// Foreign-key style relations from the `table` entity to other entities.
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
    // `dml_fragment_id` -> `fragment.fragment_id`.
    #[sea_orm(
        belongs_to = "super::fragment::Entity",
        from = "Column::DmlFragmentId",
        to = "super::fragment::Column::FragmentId",
        on_update = "NoAction",
        on_delete = "NoAction"
    )]
    Fragment2,
    // `fragment_id` -> `fragment.fragment_id`.
    #[sea_orm(
        belongs_to = "super::fragment::Entity",
        from = "Column::FragmentId",
        to = "super::fragment::Column::FragmentId",
        on_update = "NoAction",
        on_delete = "NoAction"
    )]
    Fragment1,
    // `belongs_to_job_id` -> `object.oid`; deleting the object cascades.
    #[sea_orm(
        belongs_to = "super::object::Entity",
        from = "Column::BelongsToJobId",
        to = "super::object::Column::Oid",
        on_update = "NoAction",
        on_delete = "Cascade"
    )]
    Object2,
    // `table_id` -> `object.oid`; deleting the object cascades.
    #[sea_orm(
        belongs_to = "super::object::Entity",
        from = "Column::TableId",
        to = "super::object::Column::Oid",
        on_update = "NoAction",
        on_delete = "Cascade"
    )]
    Object1,
    // `optional_associated_source_id` -> `source.source_id`.
    #[sea_orm(
        belongs_to = "super::source::Entity",
        from = "Column::OptionalAssociatedSourceId",
        to = "super::source::Column::SourceId",
        on_update = "NoAction",
        on_delete = "NoAction"
    )]
    Source,

    // To join object_dependency on the used_by column
    // (`table_id` -> `object_dependency.used_by`).
    #[sea_orm(
        belongs_to = "super::object_dependency::Entity",
        from = "Column::TableId",
        to = "super::object_dependency::Column::UsedBy",
        on_update = "NoAction",
        on_delete = "Cascade"
    )]
    ObjectDependency,
}
324
/// Default relation from `table` to `object`: uses `Relation::Object1`
/// (`table_id` -> `object.oid`), not the `belongs_to_job_id` relation `Object2`.
impl Related<super::object::Entity> for Entity {
    fn to() -> RelationDef {
        Relation::Object1.def()
    }
}
330
/// Default relation from `table` to `source`: uses `Relation::Source`
/// (`optional_associated_source_id` -> `source.source_id`).
impl Related<super::source::Entity> for Entity {
    fn to() -> RelationDef {
        Relation::Source.def()
    }
}
336
// No hooks are overridden: the trait's default `ActiveModelBehavior` methods apply.
impl ActiveModelBehavior for ActiveModel {}
338
339impl From<PbTable> for ActiveModel {
340    fn from(pb_table: PbTable) -> Self {
341        let table_type = pb_table.table_type();
342        let handle_pk_conflict_behavior = pb_table.handle_pk_conflict_behavior();
343        let refresh_state = pb_table.refresh_state();
344
345        // `PbTable` here should be sourced from the wire, not from persistence.
346        // A placeholder `maybe_vnode_count` field should be treated as `NotSet`, instead of calling
347        // the compatibility code.
348        let vnode_count = pb_table
349            .vnode_count_inner()
350            .value_opt()
351            .map(|v| v as _)
352            .map_or(NotSet, Set);
353        let fragment_id = if pb_table.fragment_id == OBJECT_ID_PLACEHOLDER {
354            NotSet
355        } else {
356            Set(Some(pb_table.fragment_id))
357        };
358        let dml_fragment_id = pb_table
359            .dml_fragment_id
360            .map(|x| Set(Some(x)))
361            .unwrap_or_default();
362        let optional_associated_source_id =
363            if let Some(src_id) = pb_table.optional_associated_source_id {
364                Set(Some(src_id.into()))
365            } else {
366                NotSet
367            };
368
369        Self {
370            table_id: Set(pb_table.id),
371            name: Set(pb_table.name),
372            optional_associated_source_id,
373            table_type: Set(table_type.into()),
374            belongs_to_job_id: Set(pb_table.job_id),
375            columns: Set(pb_table.columns.into()),
376            pk: Set(pb_table.pk.into()),
377            distribution_key: Set(pb_table.distribution_key.into()),
378            stream_key: Set(pb_table.stream_key.into()),
379            append_only: Set(pb_table.append_only),
380            fragment_id,
381            vnode_col_index: Set(pb_table.vnode_col_index.map(|x| x as i32)),
382            row_id_index: Set(pb_table.row_id_index.map(|x| x as i32)),
383            value_indices: Set(pb_table.value_indices.into()),
384            definition: Set(pb_table.definition),
385            handle_pk_conflict_behavior: Set(handle_pk_conflict_behavior.into()),
386            version_column_indices: Set(Some(
387                pb_table
388                    .version_column_indices
389                    .iter()
390                    .map(|x| *x as i32)
391                    .collect::<Vec<_>>()
392                    .into(),
393            )),
394            read_prefix_len_hint: Set(pb_table.read_prefix_len_hint as _),
395            watermark_indices: Set(pb_table.watermark_indices.into()),
396            dist_key_in_pk: Set(pb_table.dist_key_in_pk.into()),
397            dml_fragment_id,
398            cardinality: Set(pb_table.cardinality.as_ref().map(|x| x.into())),
399            cleaned_by_watermark: Set(pb_table.cleaned_by_watermark),
400            description: Set(pb_table.description),
401            version: Set(pb_table.version.as_ref().map(|v| v.into())),
402            retention_seconds: Set(pb_table.retention_seconds.map(|i| i as _)),
403            cdc_table_id: Set(pb_table.cdc_table_id),
404            vnode_count,
405            webhook_info: Set(pb_table.webhook_info.as_ref().map(WebhookSourceInfo::from)),
406            engine: Set(pb_table
407                .engine
408                .map(|engine| Engine::from(PbEngine::try_from(engine).expect("Invalid engine")))),
409            clean_watermark_index_in_pk: Set(pb_table.clean_watermark_index_in_pk),
410            refreshable: Set(pb_table.refreshable),
411            vector_index_info: Set(pb_table
412                .vector_index_info
413                .as_ref()
414                .map(VectorIndexInfo::from)),
415            cdc_table_type: Set(pb_table.cdc_table_type.map(|cdc_table_type| {
416                match cdc_table_type {
417                    0 => CdcTableType::Postgres, // Map Unspecified to Postgres as default
418                    1 => CdcTableType::Postgres,
419                    2 => CdcTableType::Mysql,
420                    3 => CdcTableType::Sqlserver,
421                    4 => CdcTableType::Mongo,
422                    _ => panic!("Invalid CDC table type: {cdc_table_type}"),
423                }
424            })),
425            refresh_state: Set(Some(refresh_state.into())),
426        }
427    }
428}