risingwave_meta_model/
table.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::OBJECT_ID_PLACEHOLDER;
16use risingwave_common::hash::VnodeCountCompat;
17use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, PbEngine, PbTableType};
18use risingwave_pb::catalog::{PbHandleConflictBehavior, PbTable};
19use sea_orm::ActiveValue::Set;
20use sea_orm::NotSet;
21use sea_orm::entity::prelude::*;
22use serde::{Deserialize, Serialize};
23
24use crate::{
25    Cardinality, ColumnCatalogArray, ColumnOrderArray, FragmentId, I32Array, ObjectId, SourceId,
26    TableId, TableVersion, WebhookSourceInfo,
27};
28
29#[derive(
30    Clone, Debug, PartialEq, Hash, Copy, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
31)]
32#[sea_orm(rs_type = "String", db_type = "string(None)")]
33pub enum TableType {
34    #[sea_orm(string_value = "TABLE")]
35    Table,
36    #[sea_orm(string_value = "MATERIALIZED_VIEW")]
37    MaterializedView,
38    #[sea_orm(string_value = "INDEX")]
39    Index,
40    #[sea_orm(string_value = "INTERNAL")]
41    Internal,
42}
43
44impl From<TableType> for PbTableType {
45    fn from(table_type: TableType) -> Self {
46        match table_type {
47            TableType::Table => Self::Table,
48            TableType::MaterializedView => Self::MaterializedView,
49            TableType::Index => Self::Index,
50            TableType::Internal => Self::Internal,
51        }
52    }
53}
54
55impl From<PbTableType> for TableType {
56    fn from(table_type: PbTableType) -> Self {
57        match table_type {
58            PbTableType::Table => Self::Table,
59            PbTableType::MaterializedView => Self::MaterializedView,
60            PbTableType::Index => Self::Index,
61            PbTableType::Internal => Self::Internal,
62            PbTableType::Unspecified => unreachable!("Unspecified table type"),
63        }
64    }
65}
66
67#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
68#[sea_orm(rs_type = "String", db_type = "string(None)")]
69pub enum HandleConflictBehavior {
70    #[sea_orm(string_value = "OVERWRITE")]
71    Overwrite,
72    #[sea_orm(string_value = "IGNORE")]
73    Ignore,
74    #[sea_orm(string_value = "NO_CHECK")]
75    NoCheck,
76    #[sea_orm(string_value = "DO_UPDATE_IF_NOT_NULL")]
77    DoUpdateIfNotNull,
78}
79
80impl From<HandleConflictBehavior> for PbHandleConflictBehavior {
81    fn from(handle_conflict_behavior: HandleConflictBehavior) -> Self {
82        match handle_conflict_behavior {
83            HandleConflictBehavior::Overwrite => Self::Overwrite,
84            HandleConflictBehavior::Ignore => Self::Ignore,
85            HandleConflictBehavior::NoCheck => Self::NoCheck,
86            HandleConflictBehavior::DoUpdateIfNotNull => Self::DoUpdateIfNotNull,
87        }
88    }
89}
90
91impl From<PbHandleConflictBehavior> for HandleConflictBehavior {
92    fn from(handle_conflict_behavior: PbHandleConflictBehavior) -> Self {
93        match handle_conflict_behavior {
94            PbHandleConflictBehavior::Overwrite => Self::Overwrite,
95            PbHandleConflictBehavior::Ignore => Self::Ignore,
96            PbHandleConflictBehavior::NoCheck => Self::NoCheck,
97            PbHandleConflictBehavior::DoUpdateIfNotNull => Self::DoUpdateIfNotNull,
98            PbHandleConflictBehavior::Unspecified => {
99                unreachable!("Unspecified handle conflict behavior")
100            }
101        }
102    }
103}
104
105#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
106#[sea_orm(rs_type = "String", db_type = "string(None)")]
107pub enum Engine {
108    #[sea_orm(string_value = "HUMMOCK")]
109    Hummock,
110    #[sea_orm(string_value = "ICEBERG")]
111    Iceberg,
112}
113
114impl From<Engine> for PbEngine {
115    fn from(engine: Engine) -> Self {
116        match engine {
117            Engine::Hummock => Self::Hummock,
118            Engine::Iceberg => Self::Iceberg,
119        }
120    }
121}
122
123impl From<PbEngine> for Engine {
124    fn from(engine: PbEngine) -> Self {
125        match engine {
126            PbEngine::Hummock => Self::Hummock,
127            PbEngine::Iceberg => Self::Iceberg,
128            PbEngine::Unspecified => Self::Hummock,
129        }
130    }
131}
132
133#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
134#[sea_orm(table_name = "table")]
135pub struct Model {
136    #[sea_orm(primary_key, auto_increment = false)]
137    pub table_id: TableId,
138    pub name: String,
139    pub optional_associated_source_id: Option<SourceId>,
140    pub table_type: TableType,
141    pub belongs_to_job_id: Option<ObjectId>,
142    pub columns: ColumnCatalogArray,
143    pub pk: ColumnOrderArray,
144    pub distribution_key: I32Array,
145    pub stream_key: I32Array,
146    pub append_only: bool,
147    pub fragment_id: Option<FragmentId>,
148    pub vnode_col_index: Option<i32>,
149    pub row_id_index: Option<i32>,
150    pub value_indices: I32Array,
151    pub definition: String,
152    pub handle_pk_conflict_behavior: HandleConflictBehavior,
153    pub version_column_index: Option<i32>,
154    pub read_prefix_len_hint: i32,
155    pub watermark_indices: I32Array,
156    pub dist_key_in_pk: I32Array,
157    pub dml_fragment_id: Option<FragmentId>,
158    pub cardinality: Option<Cardinality>,
159    pub cleaned_by_watermark: bool,
160    pub description: Option<String>,
161    pub version: Option<TableVersion>,
162    pub retention_seconds: Option<i32>,
163    pub incoming_sinks: I32Array,
164    pub cdc_table_id: Option<String>,
165    pub vnode_count: i32,
166    pub webhook_info: Option<WebhookSourceInfo>,
167    pub engine: Option<Engine>,
168    pub clean_watermark_index_in_pk: Option<i32>,
169}
170
171#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
172pub enum Relation {
173    #[sea_orm(
174        belongs_to = "super::fragment::Entity",
175        from = "Column::DmlFragmentId",
176        to = "super::fragment::Column::FragmentId",
177        on_update = "NoAction",
178        on_delete = "NoAction"
179    )]
180    Fragment2,
181    #[sea_orm(
182        belongs_to = "super::fragment::Entity",
183        from = "Column::FragmentId",
184        to = "super::fragment::Column::FragmentId",
185        on_update = "NoAction",
186        on_delete = "NoAction"
187    )]
188    Fragment1,
189    #[sea_orm(
190        belongs_to = "super::object::Entity",
191        from = "Column::BelongsToJobId",
192        to = "super::object::Column::Oid",
193        on_update = "NoAction",
194        on_delete = "Cascade"
195    )]
196    Object2,
197    #[sea_orm(
198        belongs_to = "super::object::Entity",
199        from = "Column::TableId",
200        to = "super::object::Column::Oid",
201        on_update = "NoAction",
202        on_delete = "Cascade"
203    )]
204    Object1,
205    #[sea_orm(
206        belongs_to = "super::source::Entity",
207        from = "Column::OptionalAssociatedSourceId",
208        to = "super::source::Column::SourceId",
209        on_update = "NoAction",
210        on_delete = "NoAction"
211    )]
212    Source,
213
214    // To join object_dependency on the used_by column
215    #[sea_orm(
216        belongs_to = "super::object_dependency::Entity",
217        from = "Column::TableId",
218        to = "super::object_dependency::Column::UsedBy",
219        on_update = "NoAction",
220        on_delete = "Cascade"
221    )]
222    ObjectDependency,
223}
224
225impl Related<super::object::Entity> for Entity {
226    fn to() -> RelationDef {
227        Relation::Object1.def()
228    }
229}
230
231impl Related<super::source::Entity> for Entity {
232    fn to() -> RelationDef {
233        Relation::Source.def()
234    }
235}
236
237impl ActiveModelBehavior for ActiveModel {}
238
239impl From<PbTable> for ActiveModel {
240    fn from(pb_table: PbTable) -> Self {
241        let table_type = pb_table.table_type();
242        let handle_pk_conflict_behavior = pb_table.handle_pk_conflict_behavior();
243
244        // `PbTable` here should be sourced from the wire, not from persistence.
245        // A placeholder `maybe_vnode_count` field should be treated as `NotSet`, instead of calling
246        // the compatibility code.
247        let vnode_count = pb_table
248            .vnode_count_inner()
249            .value_opt()
250            .map(|v| v as _)
251            .map_or(NotSet, Set);
252        let fragment_id = if pb_table.fragment_id == OBJECT_ID_PLACEHOLDER {
253            NotSet
254        } else {
255            Set(Some(pb_table.fragment_id as FragmentId))
256        };
257        let dml_fragment_id = pb_table
258            .dml_fragment_id
259            .map(|x| Set(Some(x as FragmentId)))
260            .unwrap_or_default();
261        let optional_associated_source_id =
262            if let Some(OptionalAssociatedSourceId::AssociatedSourceId(src_id)) =
263                pb_table.optional_associated_source_id
264            {
265                Set(Some(src_id as SourceId))
266            } else {
267                NotSet
268            };
269
270        Self {
271            table_id: Set(pb_table.id as _),
272            name: Set(pb_table.name),
273            optional_associated_source_id,
274            table_type: Set(table_type.into()),
275            belongs_to_job_id: Set(pb_table.job_id.map(|x| x as _)),
276            columns: Set(pb_table.columns.into()),
277            pk: Set(pb_table.pk.into()),
278            distribution_key: Set(pb_table.distribution_key.into()),
279            stream_key: Set(pb_table.stream_key.into()),
280            append_only: Set(pb_table.append_only),
281            fragment_id,
282            vnode_col_index: Set(pb_table.vnode_col_index.map(|x| x as i32)),
283            row_id_index: Set(pb_table.row_id_index.map(|x| x as i32)),
284            value_indices: Set(pb_table.value_indices.into()),
285            definition: Set(pb_table.definition),
286            handle_pk_conflict_behavior: Set(handle_pk_conflict_behavior.into()),
287            version_column_index: Set(pb_table.version_column_index.map(|x| x as i32)),
288            read_prefix_len_hint: Set(pb_table.read_prefix_len_hint as _),
289            watermark_indices: Set(pb_table.watermark_indices.into()),
290            dist_key_in_pk: Set(pb_table.dist_key_in_pk.into()),
291            dml_fragment_id,
292            cardinality: Set(pb_table.cardinality.as_ref().map(|x| x.into())),
293            cleaned_by_watermark: Set(pb_table.cleaned_by_watermark),
294            description: Set(pb_table.description),
295            version: Set(pb_table.version.as_ref().map(|v| v.into())),
296            retention_seconds: Set(pb_table.retention_seconds.map(|i| i as _)),
297            incoming_sinks: Set(pb_table.incoming_sinks.into()),
298            cdc_table_id: Set(pb_table.cdc_table_id),
299            vnode_count,
300            webhook_info: Set(pb_table.webhook_info.as_ref().map(WebhookSourceInfo::from)),
301            engine: Set(pb_table
302                .engine
303                .map(|engine| Engine::from(PbEngine::try_from(engine).expect("Invalid engine")))),
304            clean_watermark_index_in_pk: Set(pb_table.clean_watermark_index_in_pk),
305        }
306    }
307}