risingwave_meta_model/
table.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::OBJECT_ID_PLACEHOLDER;
16use risingwave_common::hash::VnodeCountCompat;
17use risingwave_common::id::JobId;
18use risingwave_pb::catalog::table::{CdcTableType as PbCdcTableType, PbEngine, PbTableType};
19use risingwave_pb::catalog::{PbHandleConflictBehavior, PbTable, PbVectorIndexInfo};
20use sea_orm::ActiveValue::Set;
21use sea_orm::NotSet;
22use sea_orm::entity::prelude::*;
23use serde::{Deserialize, Serialize};
24
25use crate::{
26    Cardinality, ColumnCatalogArray, ColumnOrderArray, FragmentId, I32Array, SourceId, TableId,
27    TableVersion, WebhookSourceInfo,
28};
29
/// Kind of relation stored in the `table` catalog (the `Model::table_type` column).
///
/// Persisted as a string column: each `string_value` below is the exact text written to the
/// database, so changing one would make previously persisted rows undecodable. Converts to and
/// from [`PbTableType`]; the protobuf `Unspecified` value has no counterpart here.
#[derive(
    Clone, Debug, PartialEq, Hash, Copy, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum TableType {
    #[sea_orm(string_value = "TABLE")]
    Table,
    #[sea_orm(string_value = "MATERIALIZED_VIEW")]
    MaterializedView,
    #[sea_orm(string_value = "INDEX")]
    Index,
    #[sea_orm(string_value = "INTERNAL")]
    Internal,
    #[sea_orm(string_value = "VECTOR_INDEX")]
    VectorIndex,
}
46
47impl From<TableType> for PbTableType {
48    fn from(table_type: TableType) -> Self {
49        match table_type {
50            TableType::Table => Self::Table,
51            TableType::MaterializedView => Self::MaterializedView,
52            TableType::Index => Self::Index,
53            TableType::Internal => Self::Internal,
54            TableType::VectorIndex => Self::VectorIndex,
55        }
56    }
57}
58
59impl From<PbTableType> for TableType {
60    fn from(table_type: PbTableType) -> Self {
61        match table_type {
62            PbTableType::Table => Self::Table,
63            PbTableType::MaterializedView => Self::MaterializedView,
64            PbTableType::Index => Self::Index,
65            PbTableType::Internal => Self::Internal,
66            PbTableType::VectorIndex => Self::VectorIndex,
67            PbTableType::Unspecified => unreachable!("Unspecified table type"),
68        }
69    }
70}
71
/// Type of the `Model::handle_pk_conflict_behavior` column: how writes whose primary key
/// conflicts with an existing row are handled.
///
/// Persisted as a string column using the `string_value`s below. The precise semantics of each
/// variant are defined by the protobuf `HandleConflictBehavior` it converts to and from.
#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum HandleConflictBehavior {
    #[sea_orm(string_value = "OVERWRITE")]
    Overwrite,
    #[sea_orm(string_value = "IGNORE")]
    Ignore,
    #[sea_orm(string_value = "NO_CHECK")]
    NoCheck,
    #[sea_orm(string_value = "DO_UPDATE_IF_NOT_NULL")]
    DoUpdateIfNotNull,
}
84
85impl From<HandleConflictBehavior> for PbHandleConflictBehavior {
86    fn from(handle_conflict_behavior: HandleConflictBehavior) -> Self {
87        match handle_conflict_behavior {
88            HandleConflictBehavior::Overwrite => Self::Overwrite,
89            HandleConflictBehavior::Ignore => Self::Ignore,
90            HandleConflictBehavior::NoCheck => Self::NoCheck,
91            HandleConflictBehavior::DoUpdateIfNotNull => Self::DoUpdateIfNotNull,
92        }
93    }
94}
95
96impl From<PbHandleConflictBehavior> for HandleConflictBehavior {
97    fn from(handle_conflict_behavior: PbHandleConflictBehavior) -> Self {
98        match handle_conflict_behavior {
99            PbHandleConflictBehavior::Overwrite => Self::Overwrite,
100            PbHandleConflictBehavior::Ignore => Self::Ignore,
101            PbHandleConflictBehavior::NoCheck => Self::NoCheck,
102            PbHandleConflictBehavior::DoUpdateIfNotNull => Self::DoUpdateIfNotNull,
103            PbHandleConflictBehavior::Unspecified => {
104                unreachable!("Unspecified handle conflict behavior")
105            }
106        }
107    }
108}
109
/// Storage engine recorded for a table (the `Model::engine` column).
///
/// Persisted as a string column using the `string_value`s below. When converting from
/// [`PbEngine`], an unspecified engine is read as `Hummock`.
#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum Engine {
    #[sea_orm(string_value = "HUMMOCK")]
    Hummock,
    #[sea_orm(string_value = "ICEBERG")]
    Iceberg,
}
118
119impl From<Engine> for PbEngine {
120    fn from(engine: Engine) -> Self {
121        match engine {
122            Engine::Hummock => Self::Hummock,
123            Engine::Iceberg => Self::Iceberg,
124        }
125    }
126}
127
128impl From<PbEngine> for Engine {
129    fn from(engine: PbEngine) -> Self {
130        match engine {
131            PbEngine::Hummock => Self::Hummock,
132            PbEngine::Iceberg => Self::Iceberg,
133            PbEngine::Unspecified => Self::Hummock,
134        }
135    }
136}
137
/// Kind of CDC source database recorded for a table (the `Model::cdc_table_type` column).
///
/// Persisted as a string column using the `string_value`s below. Unlike [`TableType`], this
/// enum has an explicit `Unspecified` variant, so every [`PbCdcTableType`] value has a
/// counterpart here.
#[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum CdcTableType {
    #[sea_orm(string_value = "UNSPECIFIED")]
    Unspecified,
    #[sea_orm(string_value = "POSTGRES")]
    Postgres,
    #[sea_orm(string_value = "MYSQL")]
    Mysql,
    #[sea_orm(string_value = "SQLSERVER")]
    Sqlserver,
    #[sea_orm(string_value = "MONGO")]
    Mongo,
    #[sea_orm(string_value = "CITUS")]
    Citus,
}
154
155impl From<CdcTableType> for PbCdcTableType {
156    fn from(cdc_table_type: CdcTableType) -> Self {
157        match cdc_table_type {
158            CdcTableType::Postgres => Self::Postgres,
159            CdcTableType::Mysql => Self::Mysql,
160            CdcTableType::Sqlserver => Self::Sqlserver,
161            CdcTableType::Mongo => Self::Mongo,
162            CdcTableType::Citus => Self::Citus,
163            CdcTableType::Unspecified => Self::Unspecified,
164        }
165    }
166}
167
168impl From<PbCdcTableType> for CdcTableType {
169    fn from(cdc_table_type: PbCdcTableType) -> Self {
170        match cdc_table_type {
171            PbCdcTableType::Postgres => Self::Postgres,
172            PbCdcTableType::Mysql => Self::Mysql,
173            PbCdcTableType::Sqlserver => Self::Sqlserver,
174            PbCdcTableType::Mongo => Self::Mongo,
175            PbCdcTableType::Citus => Self::Citus,
176            PbCdcTableType::Unspecified => Self::Unspecified,
177        }
178    }
179}
180
// Defines the `VectorIndexInfo` column type used by `Model::vector_index_info`, wrapping
// `PbVectorIndexInfo` (converted from `&PbVectorIndexInfo` in `From<PbTable>`).
// NOTE(review): presumably stored as an encoded protobuf blob, going by the macro name —
// confirm against the `derive_from_blob!` definition.
crate::derive_from_blob!(VectorIndexInfo, PbVectorIndexInfo);
182
/// A row of the `table` catalog table, i.e. the persisted form of a [`PbTable`].
///
/// Enum-typed columns ([`TableType`], [`HandleConflictBehavior`], [`Engine`],
/// [`CdcTableType`]) are stored as strings via their sea-orm `string_value`s.
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "table")]
pub struct Model {
    // Primary key; `auto_increment = false`, so the id is supplied by the caller. Also
    // references `object.oid` (`Relation::Object1`).
    #[sea_orm(primary_key, auto_increment = false)]
    pub table_id: TableId,
    pub name: String,
    // References `source.source_id` (`Relation::Source`).
    pub optional_associated_source_id: Option<SourceId>,
    pub table_type: TableType,
    // References `object.oid` (`Relation::Object2`). When `None`, `Model::job_id` falls back
    // to `table_id`.
    pub belongs_to_job_id: Option<JobId>,
    pub columns: ColumnCatalogArray,
    pub pk: ColumnOrderArray,
    pub distribution_key: I32Array,
    pub stream_key: I32Array,
    pub append_only: bool,
    // References `fragment.fragment_id` (`Relation::Fragment1`).
    pub fragment_id: Option<FragmentId>,
    pub vnode_col_index: Option<i32>,
    pub row_id_index: Option<i32>,
    pub value_indices: I32Array,
    pub definition: String,
    pub handle_pk_conflict_behavior: HandleConflictBehavior,
    pub version_column_indices: Option<I32Array>,
    pub read_prefix_len_hint: i32,
    pub watermark_indices: I32Array,
    pub dist_key_in_pk: I32Array,
    // References `fragment.fragment_id` (`Relation::Fragment2`).
    pub dml_fragment_id: Option<FragmentId>,
    pub cardinality: Option<Cardinality>,
    // Mirrors a deprecated `PbTable` field.
    pub cleaned_by_watermark: bool,
    pub description: Option<String>,
    pub version: Option<TableVersion>,
    pub retention_seconds: Option<i32>,
    pub cdc_table_id: Option<String>,
    pub vnode_count: i32,
    pub webhook_info: Option<WebhookSourceInfo>,
    pub engine: Option<Engine>,
    // Mirrors a deprecated `PbTable` field. NOTE(review): presumably superseded by
    // `clean_watermark_indices` — confirm.
    pub clean_watermark_index_in_pk: Option<i32>,
    pub clean_watermark_indices: Option<I32Array>,
    pub refreshable: bool,
    pub vector_index_info: Option<VectorIndexInfo>,
    pub cdc_table_type: Option<CdcTableType>,
}
223
/// Relations from the `table` entity to other catalog entities, used when building joins.
///
/// Where two columns point at the same target entity, the variants are distinguished by a
/// numeric suffix (`Fragment1`/`Fragment2`, `Object1`/`Object2`).
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
    // `dml_fragment_id` -> `fragment.fragment_id`.
    #[sea_orm(
        belongs_to = "super::fragment::Entity",
        from = "Column::DmlFragmentId",
        to = "super::fragment::Column::FragmentId",
        on_update = "NoAction",
        on_delete = "NoAction"
    )]
    Fragment2,
    // `fragment_id` -> `fragment.fragment_id`.
    #[sea_orm(
        belongs_to = "super::fragment::Entity",
        from = "Column::FragmentId",
        to = "super::fragment::Column::FragmentId",
        on_update = "NoAction",
        on_delete = "NoAction"
    )]
    Fragment1,
    // `belongs_to_job_id` -> `object.oid`, declared with `on_delete = "Cascade"`.
    #[sea_orm(
        belongs_to = "super::object::Entity",
        from = "Column::BelongsToJobId",
        to = "super::object::Column::Oid",
        on_update = "NoAction",
        on_delete = "Cascade"
    )]
    Object2,
    // `table_id` -> `object.oid`, declared with `on_delete = "Cascade"`. This is the relation
    // returned by `Related<super::object::Entity>`.
    #[sea_orm(
        belongs_to = "super::object::Entity",
        from = "Column::TableId",
        to = "super::object::Column::Oid",
        on_update = "NoAction",
        on_delete = "Cascade"
    )]
    Object1,
    // `optional_associated_source_id` -> `source.source_id`.
    #[sea_orm(
        belongs_to = "super::source::Entity",
        from = "Column::OptionalAssociatedSourceId",
        to = "super::source::Column::SourceId",
        on_update = "NoAction",
        on_delete = "NoAction"
    )]
    Source,

    // To join object_dependency on the used_by column:
    // `table_id` -> `object_dependency.used_by`.
    #[sea_orm(
        belongs_to = "super::object_dependency::Entity",
        from = "Column::TableId",
        to = "super::object_dependency::Column::UsedBy",
        on_update = "NoAction",
        on_delete = "Cascade"
    )]
    ObjectDependency,
}
277
278impl Related<super::object::Entity> for Entity {
279    fn to() -> RelationDef {
280        Relation::Object1.def()
281    }
282}
283
284impl Related<super::source::Entity> for Entity {
285    fn to() -> RelationDef {
286        Relation::Source.def()
287    }
288}
289
// Uses sea-orm's default (no-op) `ActiveModel` lifecycle hooks; tables need no custom
// before/after save or delete logic here.
impl ActiveModelBehavior for ActiveModel {}
291
292impl Model {
293    pub fn job_id(&self) -> JobId {
294        self.belongs_to_job_id
295            .unwrap_or_else(|| self.table_id.as_job_id())
296    }
297}
298
299impl From<PbTable> for ActiveModel {
300    fn from(pb_table: PbTable) -> Self {
301        let table_type = pb_table.table_type();
302        let handle_pk_conflict_behavior = pb_table.handle_pk_conflict_behavior();
303
304        // `PbTable` here should be sourced from the wire, not from persistence.
305        // A placeholder `maybe_vnode_count` field should be treated as `NotSet`, instead of calling
306        // the compatibility code.
307        let vnode_count = pb_table
308            .vnode_count_inner()
309            .value_opt()
310            .map(|v| v as _)
311            .map_or(NotSet, Set);
312        let fragment_id = if pb_table.fragment_id == OBJECT_ID_PLACEHOLDER {
313            NotSet
314        } else {
315            Set(Some(pb_table.fragment_id))
316        };
317        let dml_fragment_id = pb_table
318            .dml_fragment_id
319            .map(|x| Set(Some(x)))
320            .unwrap_or_default();
321        let optional_associated_source_id =
322            if let Some(src_id) = pb_table.optional_associated_source_id {
323                Set(Some(src_id.into()))
324            } else {
325                NotSet
326            };
327
328        Self {
329            table_id: Set(pb_table.id),
330            name: Set(pb_table.name),
331            optional_associated_source_id,
332            table_type: Set(table_type.into()),
333            belongs_to_job_id: Set(pb_table.job_id),
334            columns: Set(pb_table.columns.into()),
335            pk: Set(pb_table.pk.into()),
336            distribution_key: Set(pb_table.distribution_key.into()),
337            stream_key: Set(pb_table.stream_key.into()),
338            append_only: Set(pb_table.append_only),
339            fragment_id,
340            vnode_col_index: Set(pb_table.vnode_col_index.map(|x| x as i32)),
341            row_id_index: Set(pb_table.row_id_index.map(|x| x as i32)),
342            value_indices: Set(pb_table.value_indices.into()),
343            definition: Set(pb_table.definition),
344            handle_pk_conflict_behavior: Set(handle_pk_conflict_behavior.into()),
345            version_column_indices: Set(Some(
346                pb_table
347                    .version_column_indices
348                    .iter()
349                    .map(|x| *x as i32)
350                    .collect::<Vec<_>>()
351                    .into(),
352            )),
353            read_prefix_len_hint: Set(pb_table.read_prefix_len_hint as _),
354            watermark_indices: Set(pb_table.watermark_indices.into()),
355            dist_key_in_pk: Set(pb_table.dist_key_in_pk.into()),
356            dml_fragment_id,
357            cardinality: Set(pb_table.cardinality.as_ref().map(|x| x.into())),
358            #[expect(deprecated)]
359            cleaned_by_watermark: Set(pb_table.cleaned_by_watermark),
360            description: Set(pb_table.description),
361            version: Set(pb_table.version.as_ref().map(|v| v.into())),
362            retention_seconds: Set(pb_table.retention_seconds.map(|i| i as _)),
363            cdc_table_id: Set(pb_table.cdc_table_id),
364            vnode_count,
365            webhook_info: Set(pb_table.webhook_info.as_ref().map(WebhookSourceInfo::from)),
366            engine: Set(pb_table
367                .engine
368                .map(|engine| Engine::from(PbEngine::try_from(engine).expect("Invalid engine")))),
369            #[expect(deprecated)]
370            clean_watermark_index_in_pk: Set(pb_table.clean_watermark_index_in_pk),
371            clean_watermark_indices: Set(if pb_table.clean_watermark_indices.is_empty() {
372                None
373            } else {
374                Some(
375                    pb_table
376                        .clean_watermark_indices
377                        .iter()
378                        .map(|x| *x as i32)
379                        .collect::<Vec<_>>()
380                        .into(),
381                )
382            }),
383            refreshable: Set(pb_table.refreshable),
384            vector_index_info: Set(pb_table
385                .vector_index_info
386                .as_ref()
387                .map(VectorIndexInfo::from)),
388            cdc_table_type: Set(pb_table.cdc_table_type.map(|cdc_table_type| {
389                match cdc_table_type {
390                    0 => CdcTableType::Postgres, // Map Unspecified to Postgres as default
391                    1 => CdcTableType::Postgres,
392                    2 => CdcTableType::Mysql,
393                    3 => CdcTableType::Sqlserver,
394                    4 => CdcTableType::Mongo,
395                    _ => panic!("Invalid CDC table type: {cdc_table_type}"),
396                }
397            })),
398        }
399    }
400}