risingwave_meta_model/
table.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::OBJECT_ID_PLACEHOLDER;
16use risingwave_common::hash::VnodeCountCompat;
17use risingwave_pb::catalog::table::{
18    CdcTableType as PbCdcTableType, OptionalAssociatedSourceId, PbEngine, PbTableType,
19};
20use risingwave_pb::catalog::{PbHandleConflictBehavior, PbTable, PbVectorIndexInfo};
21use sea_orm::ActiveValue::Set;
22use sea_orm::NotSet;
23use sea_orm::entity::prelude::*;
24use serde::{Deserialize, Serialize};
25
26use crate::{
27    Cardinality, ColumnCatalogArray, ColumnOrderArray, FragmentId, I32Array, ObjectId, SourceId,
28    TableId, TableVersion, WebhookSourceInfo,
29};
30
/// Kind of a catalog table as persisted in the meta store.
///
/// The value is stored as a string column; each variant's `string_value` is the exact text
/// written to and read from the database. The variants mirror `PbTableType`, except that there
/// is no counterpart for the protobuf `Unspecified` variant.
#[derive(
    Clone, Debug, PartialEq, Hash, Copy, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum TableType {
    /// Counterpart of `PbTableType::Table`.
    #[sea_orm(string_value = "TABLE")]
    Table,
    /// Counterpart of `PbTableType::MaterializedView`.
    #[sea_orm(string_value = "MATERIALIZED_VIEW")]
    MaterializedView,
    /// Counterpart of `PbTableType::Index`.
    #[sea_orm(string_value = "INDEX")]
    Index,
    /// Counterpart of `PbTableType::Internal`.
    #[sea_orm(string_value = "INTERNAL")]
    Internal,
    /// Counterpart of `PbTableType::VectorIndex`.
    #[sea_orm(string_value = "VECTOR_INDEX")]
    VectorIndex,
}
47
48impl From<TableType> for PbTableType {
49    fn from(table_type: TableType) -> Self {
50        match table_type {
51            TableType::Table => Self::Table,
52            TableType::MaterializedView => Self::MaterializedView,
53            TableType::Index => Self::Index,
54            TableType::Internal => Self::Internal,
55            TableType::VectorIndex => Self::VectorIndex,
56        }
57    }
58}
59
60impl From<PbTableType> for TableType {
61    fn from(table_type: PbTableType) -> Self {
62        match table_type {
63            PbTableType::Table => Self::Table,
64            PbTableType::MaterializedView => Self::MaterializedView,
65            PbTableType::Index => Self::Index,
66            PbTableType::Internal => Self::Internal,
67            PbTableType::VectorIndex => Self::VectorIndex,
68            PbTableType::Unspecified => unreachable!("Unspecified table type"),
69        }
70    }
71}
72
/// Primary-key conflict handling mode as persisted in the meta store.
///
/// Stored as a string column; each variant's `string_value` is the exact text kept in the
/// database. The variants mirror `PbHandleConflictBehavior`, except that there is no counterpart
/// for the protobuf `Unspecified` variant.
#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum HandleConflictBehavior {
    /// Counterpart of `PbHandleConflictBehavior::Overwrite`.
    #[sea_orm(string_value = "OVERWRITE")]
    Overwrite,
    /// Counterpart of `PbHandleConflictBehavior::Ignore`.
    #[sea_orm(string_value = "IGNORE")]
    Ignore,
    /// Counterpart of `PbHandleConflictBehavior::NoCheck`.
    #[sea_orm(string_value = "NO_CHECK")]
    NoCheck,
    /// Counterpart of `PbHandleConflictBehavior::DoUpdateIfNotNull`.
    #[sea_orm(string_value = "DO_UPDATE_IF_NOT_NULL")]
    DoUpdateIfNotNull,
}
85
86impl From<HandleConflictBehavior> for PbHandleConflictBehavior {
87    fn from(handle_conflict_behavior: HandleConflictBehavior) -> Self {
88        match handle_conflict_behavior {
89            HandleConflictBehavior::Overwrite => Self::Overwrite,
90            HandleConflictBehavior::Ignore => Self::Ignore,
91            HandleConflictBehavior::NoCheck => Self::NoCheck,
92            HandleConflictBehavior::DoUpdateIfNotNull => Self::DoUpdateIfNotNull,
93        }
94    }
95}
96
97impl From<PbHandleConflictBehavior> for HandleConflictBehavior {
98    fn from(handle_conflict_behavior: PbHandleConflictBehavior) -> Self {
99        match handle_conflict_behavior {
100            PbHandleConflictBehavior::Overwrite => Self::Overwrite,
101            PbHandleConflictBehavior::Ignore => Self::Ignore,
102            PbHandleConflictBehavior::NoCheck => Self::NoCheck,
103            PbHandleConflictBehavior::DoUpdateIfNotNull => Self::DoUpdateIfNotNull,
104            PbHandleConflictBehavior::Unspecified => {
105                unreachable!("Unspecified handle conflict behavior")
106            }
107        }
108    }
109}
110
/// Refresh state of a refreshable table as persisted in the meta store.
///
/// Stored as a string column; each variant's `string_value` is the exact text kept in the
/// database, and is also what the `Display` impl prints.
#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum RefreshState {
    /// The table is refreshable and the current state is `Idle`.
    #[sea_orm(string_value = "IDLE")]
    Idle,
    /// The table is refreshable and the current state is `Refreshing` (`RefreshStart` barrier passed).
    #[sea_orm(string_value = "REFRESHING")]
    Refreshing,
    /// The table is refreshable and the current state is `Finishing` (`LoadFinish` barrier passed).
    #[sea_orm(string_value = "FINISHING")]
    Finishing,
}
124
125impl From<RefreshState> for risingwave_pb::catalog::RefreshState {
126    fn from(refresh_state: RefreshState) -> Self {
127        match refresh_state {
128            RefreshState::Idle => Self::Idle,
129            RefreshState::Refreshing => Self::Refreshing,
130            RefreshState::Finishing => Self::Finishing,
131        }
132    }
133}
134
135impl From<risingwave_pb::catalog::RefreshState> for RefreshState {
136    fn from(refresh_state: risingwave_pb::catalog::RefreshState) -> Self {
137        match refresh_state {
138            risingwave_pb::catalog::RefreshState::Idle => Self::Idle,
139            risingwave_pb::catalog::RefreshState::Refreshing => Self::Refreshing,
140            risingwave_pb::catalog::RefreshState::Finishing => Self::Finishing,
141            risingwave_pb::catalog::RefreshState::Unspecified => {
142                unreachable!("Unspecified refresh state")
143            }
144        }
145    }
146}
147
148impl std::fmt::Display for RefreshState {
149    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150        match self {
151            RefreshState::Idle => write!(f, "IDLE"),
152            RefreshState::Refreshing => write!(f, "REFRESHING"),
153            RefreshState::Finishing => write!(f, "FINISHING"),
154        }
155    }
156}
157
/// Table engine as persisted in the meta store.
///
/// Stored as a string column; each variant's `string_value` is the exact text kept in the
/// database. The variants mirror `PbEngine`, except that there is no counterpart for the
/// protobuf `Unspecified` variant.
#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum Engine {
    /// Counterpart of `PbEngine::Hummock`.
    #[sea_orm(string_value = "HUMMOCK")]
    Hummock,
    /// Counterpart of `PbEngine::Iceberg`.
    #[sea_orm(string_value = "ICEBERG")]
    Iceberg,
}
166
167impl From<Engine> for PbEngine {
168    fn from(engine: Engine) -> Self {
169        match engine {
170            Engine::Hummock => Self::Hummock,
171            Engine::Iceberg => Self::Iceberg,
172        }
173    }
174}
175
176impl From<PbEngine> for Engine {
177    fn from(engine: PbEngine) -> Self {
178        match engine {
179            PbEngine::Hummock => Self::Hummock,
180            PbEngine::Iceberg => Self::Iceberg,
181            PbEngine::Unspecified => Self::Hummock,
182        }
183    }
184}
185
/// CDC table type as persisted in the meta store.
///
/// Stored as a string column; each variant's `string_value` is the exact text kept in the
/// database. The variants mirror `PbCdcTableType` one-to-one, including `Unspecified`.
#[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum CdcTableType {
    /// Counterpart of `PbCdcTableType::Unspecified`.
    #[sea_orm(string_value = "UNSPECIFIED")]
    Unspecified,
    /// Counterpart of `PbCdcTableType::Postgres`.
    #[sea_orm(string_value = "POSTGRES")]
    Postgres,
    /// Counterpart of `PbCdcTableType::Mysql`.
    #[sea_orm(string_value = "MYSQL")]
    Mysql,
    /// Counterpart of `PbCdcTableType::Sqlserver`.
    #[sea_orm(string_value = "SQLSERVER")]
    Sqlserver,
    /// Counterpart of `PbCdcTableType::Mongo`.
    #[sea_orm(string_value = "MONGO")]
    Mongo,
    /// Counterpart of `PbCdcTableType::Citus`.
    #[sea_orm(string_value = "CITUS")]
    Citus,
}
202
203impl From<CdcTableType> for PbCdcTableType {
204    fn from(cdc_table_type: CdcTableType) -> Self {
205        match cdc_table_type {
206            CdcTableType::Postgres => Self::Postgres,
207            CdcTableType::Mysql => Self::Mysql,
208            CdcTableType::Sqlserver => Self::Sqlserver,
209            CdcTableType::Mongo => Self::Mongo,
210            CdcTableType::Citus => Self::Citus,
211            CdcTableType::Unspecified => Self::Unspecified,
212        }
213    }
214}
215
216impl From<PbCdcTableType> for CdcTableType {
217    fn from(cdc_table_type: PbCdcTableType) -> Self {
218        match cdc_table_type {
219            PbCdcTableType::Postgres => Self::Postgres,
220            PbCdcTableType::Mysql => Self::Mysql,
221            PbCdcTableType::Sqlserver => Self::Sqlserver,
222            PbCdcTableType::Mongo => Self::Mongo,
223            PbCdcTableType::Citus => Self::Citus,
224            PbCdcTableType::Unspecified => Self::Unspecified,
225        }
226    }
227}
228
// Declares `VectorIndexInfo` through the crate's `derive_from_blob!` macro, paired with
// `PbVectorIndexInfo`. It is used in this file as the type of the `vector_index_info` column and
// is built from a `&PbVectorIndexInfo` via `VectorIndexInfo::from`.
// NOTE(review): presumably the protobuf message is stored as an encoded blob — confirm against
// the macro definition.
crate::derive_from_blob!(VectorIndexInfo, PbVectorIndexInfo);
230
// sea-orm entity model for one row of the `table` table.
//
// `table_id` is the primary key and is supplied by the caller (`auto_increment = false`).
// The `From<PbTable> for ActiveModel` impl in this file shows how each column is populated from
// the protobuf catalog; the `Relation` enum lists the foreign keys.
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "table")]
pub struct Model {
    // Primary key; also references `object.oid` (`Relation::Object1`).
    #[sea_orm(primary_key, auto_increment = false)]
    pub table_id: TableId,
    pub name: String,
    // References `source.source_id` (`Relation::Source`).
    pub optional_associated_source_id: Option<SourceId>,
    pub table_type: TableType,
    // References `object.oid` (`Relation::Object2`).
    pub belongs_to_job_id: Option<ObjectId>,
    pub columns: ColumnCatalogArray,
    pub pk: ColumnOrderArray,
    pub distribution_key: I32Array,
    pub stream_key: I32Array,
    pub append_only: bool,
    // References `fragment.fragment_id` (`Relation::Fragment1`).
    pub fragment_id: Option<FragmentId>,
    pub vnode_col_index: Option<i32>,
    pub row_id_index: Option<i32>,
    pub value_indices: I32Array,
    pub definition: String,
    pub handle_pk_conflict_behavior: HandleConflictBehavior,
    // Nullable column, but `From<PbTable>` always writes `Some(..)`.
    pub version_column_indices: Option<I32Array>,
    pub read_prefix_len_hint: i32,
    pub watermark_indices: I32Array,
    pub dist_key_in_pk: I32Array,
    // References `fragment.fragment_id` (`Relation::Fragment2`).
    pub dml_fragment_id: Option<FragmentId>,
    pub cardinality: Option<Cardinality>,
    pub cleaned_by_watermark: bool,
    pub description: Option<String>,
    pub version: Option<TableVersion>,
    pub retention_seconds: Option<i32>,
    pub cdc_table_id: Option<String>,
    pub vnode_count: i32,
    pub webhook_info: Option<WebhookSourceInfo>,
    pub engine: Option<Engine>,
    pub clean_watermark_index_in_pk: Option<i32>,
    pub refreshable: bool,
    pub vector_index_info: Option<VectorIndexInfo>,
    pub cdc_table_type: Option<CdcTableType>,
    // Nullable column, but `From<PbTable>` always writes `Some(..)`.
    pub refresh_state: Option<RefreshState>,
}
271
// Relations from the `table` entity to other entities.
//
// Two entities are referenced twice (`fragment` and `object`), hence the numbered variant names.
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
    // `dml_fragment_id` -> `fragment.fragment_id`
    #[sea_orm(
        belongs_to = "super::fragment::Entity",
        from = "Column::DmlFragmentId",
        to = "super::fragment::Column::FragmentId",
        on_update = "NoAction",
        on_delete = "NoAction"
    )]
    Fragment2,
    // `fragment_id` -> `fragment.fragment_id`
    #[sea_orm(
        belongs_to = "super::fragment::Entity",
        from = "Column::FragmentId",
        to = "super::fragment::Column::FragmentId",
        on_update = "NoAction",
        on_delete = "NoAction"
    )]
    Fragment1,
    // `belongs_to_job_id` -> `object.oid`, cascading on delete
    #[sea_orm(
        belongs_to = "super::object::Entity",
        from = "Column::BelongsToJobId",
        to = "super::object::Column::Oid",
        on_update = "NoAction",
        on_delete = "Cascade"
    )]
    Object2,
    // `table_id` -> `object.oid`, cascading on delete
    #[sea_orm(
        belongs_to = "super::object::Entity",
        from = "Column::TableId",
        to = "super::object::Column::Oid",
        on_update = "NoAction",
        on_delete = "Cascade"
    )]
    Object1,
    // `optional_associated_source_id` -> `source.source_id`
    #[sea_orm(
        belongs_to = "super::source::Entity",
        from = "Column::OptionalAssociatedSourceId",
        to = "super::source::Column::SourceId",
        on_update = "NoAction",
        on_delete = "NoAction"
    )]
    Source,

    // To join object_dependency on the used_by column
    #[sea_orm(
        belongs_to = "super::object_dependency::Entity",
        from = "Column::TableId",
        to = "super::object_dependency::Column::UsedBy",
        on_update = "NoAction",
        on_delete = "Cascade"
    )]
    ObjectDependency,
}
325
/// Default relation from `table` to `object`.
///
/// Uses `Relation::Object1`, i.e. the link keyed on `table_id`, not the `belongs_to_job_id` link
/// (`Relation::Object2`).
impl Related<super::object::Entity> for Entity {
    fn to() -> RelationDef {
        Relation::Object1.def()
    }
}
331
/// Default relation from `table` to `source`, keyed on `optional_associated_source_id`
/// (`Relation::Source`).
impl Related<super::source::Entity> for Entity {
    fn to() -> RelationDef {
        Relation::Source.def()
    }
}
337
// No methods are overridden: the trait's default behavior is used as-is.
impl ActiveModelBehavior for ActiveModel {}
339
340impl From<PbTable> for ActiveModel {
341    fn from(pb_table: PbTable) -> Self {
342        let table_type = pb_table.table_type();
343        let handle_pk_conflict_behavior = pb_table.handle_pk_conflict_behavior();
344        let refresh_state = pb_table.refresh_state();
345
346        // `PbTable` here should be sourced from the wire, not from persistence.
347        // A placeholder `maybe_vnode_count` field should be treated as `NotSet`, instead of calling
348        // the compatibility code.
349        let vnode_count = pb_table
350            .vnode_count_inner()
351            .value_opt()
352            .map(|v| v as _)
353            .map_or(NotSet, Set);
354        let fragment_id = if pb_table.fragment_id == OBJECT_ID_PLACEHOLDER {
355            NotSet
356        } else {
357            Set(Some(pb_table.fragment_id as FragmentId))
358        };
359        let dml_fragment_id = pb_table
360            .dml_fragment_id
361            .map(|x| Set(Some(x as FragmentId)))
362            .unwrap_or_default();
363        let optional_associated_source_id =
364            if let Some(OptionalAssociatedSourceId::AssociatedSourceId(src_id)) =
365                pb_table.optional_associated_source_id
366            {
367                Set(Some(src_id as SourceId))
368            } else {
369                NotSet
370            };
371
372        Self {
373            table_id: Set(pb_table.id as _),
374            name: Set(pb_table.name),
375            optional_associated_source_id,
376            table_type: Set(table_type.into()),
377            belongs_to_job_id: Set(pb_table.job_id.map(|x| x as _)),
378            columns: Set(pb_table.columns.into()),
379            pk: Set(pb_table.pk.into()),
380            distribution_key: Set(pb_table.distribution_key.into()),
381            stream_key: Set(pb_table.stream_key.into()),
382            append_only: Set(pb_table.append_only),
383            fragment_id,
384            vnode_col_index: Set(pb_table.vnode_col_index.map(|x| x as i32)),
385            row_id_index: Set(pb_table.row_id_index.map(|x| x as i32)),
386            value_indices: Set(pb_table.value_indices.into()),
387            definition: Set(pb_table.definition),
388            handle_pk_conflict_behavior: Set(handle_pk_conflict_behavior.into()),
389            version_column_indices: Set(Some(
390                pb_table
391                    .version_column_indices
392                    .iter()
393                    .map(|x| *x as i32)
394                    .collect::<Vec<_>>()
395                    .into(),
396            )),
397            read_prefix_len_hint: Set(pb_table.read_prefix_len_hint as _),
398            watermark_indices: Set(pb_table.watermark_indices.into()),
399            dist_key_in_pk: Set(pb_table.dist_key_in_pk.into()),
400            dml_fragment_id,
401            cardinality: Set(pb_table.cardinality.as_ref().map(|x| x.into())),
402            cleaned_by_watermark: Set(pb_table.cleaned_by_watermark),
403            description: Set(pb_table.description),
404            version: Set(pb_table.version.as_ref().map(|v| v.into())),
405            retention_seconds: Set(pb_table.retention_seconds.map(|i| i as _)),
406            cdc_table_id: Set(pb_table.cdc_table_id),
407            vnode_count,
408            webhook_info: Set(pb_table.webhook_info.as_ref().map(WebhookSourceInfo::from)),
409            engine: Set(pb_table
410                .engine
411                .map(|engine| Engine::from(PbEngine::try_from(engine).expect("Invalid engine")))),
412            clean_watermark_index_in_pk: Set(pb_table.clean_watermark_index_in_pk),
413            refreshable: Set(pb_table.refreshable),
414            vector_index_info: Set(pb_table
415                .vector_index_info
416                .as_ref()
417                .map(VectorIndexInfo::from)),
418            cdc_table_type: Set(pb_table.cdc_table_type.map(|cdc_table_type| {
419                match cdc_table_type {
420                    0 => CdcTableType::Postgres, // Map Unspecified to Postgres as default
421                    1 => CdcTableType::Postgres,
422                    2 => CdcTableType::Mysql,
423                    3 => CdcTableType::Sqlserver,
424                    4 => CdcTableType::Mongo,
425                    _ => panic!("Invalid CDC table type: {cdc_table_type}"),
426                }
427            })),
428            refresh_state: Set(Some(refresh_state.into())),
429        }
430    }
431}