risingwave_meta_model/
table.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::OBJECT_ID_PLACEHOLDER;
16use risingwave_common::hash::VnodeCountCompat;
17use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, PbEngine, PbTableType};
18use risingwave_pb::catalog::{PbHandleConflictBehavior, PbTable, PbVectorIndexInfo};
19use sea_orm::ActiveValue::Set;
20use sea_orm::NotSet;
21use sea_orm::entity::prelude::*;
22use serde::{Deserialize, Serialize};
23
24use crate::{
25    Cardinality, ColumnCatalogArray, ColumnOrderArray, FragmentId, I32Array, ObjectId, SourceId,
26    TableId, TableVersion, WebhookSourceInfo,
27};
28
/// Kind of catalog entry recorded in the `table_type` column of the `table` entity.
///
/// Persisted as a string column; each variant is stored as the `string_value` literal attached
/// to it below.
#[derive(
    Clone, Debug, PartialEq, Hash, Copy, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum TableType {
    /// Stored as `"TABLE"`.
    #[sea_orm(string_value = "TABLE")]
    Table,
    /// Stored as `"MATERIALIZED_VIEW"`.
    #[sea_orm(string_value = "MATERIALIZED_VIEW")]
    MaterializedView,
    /// Stored as `"INDEX"`.
    #[sea_orm(string_value = "INDEX")]
    Index,
    /// Stored as `"INTERNAL"`.
    #[sea_orm(string_value = "INTERNAL")]
    Internal,
}
43
44impl From<TableType> for PbTableType {
45    fn from(table_type: TableType) -> Self {
46        match table_type {
47            TableType::Table => Self::Table,
48            TableType::MaterializedView => Self::MaterializedView,
49            TableType::Index => Self::Index,
50            TableType::Internal => Self::Internal,
51        }
52    }
53}
54
55impl From<PbTableType> for TableType {
56    fn from(table_type: PbTableType) -> Self {
57        match table_type {
58            PbTableType::Table => Self::Table,
59            PbTableType::MaterializedView => Self::MaterializedView,
60            PbTableType::Index => Self::Index,
61            PbTableType::Internal => Self::Internal,
62            PbTableType::Unspecified => unreachable!("Unspecified table type"),
63        }
64    }
65}
66
/// Value of the `handle_pk_conflict_behavior` column of the `table` entity.
///
/// Persisted as a string column; each variant is stored as the `string_value` literal attached
/// to it below.
#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum HandleConflictBehavior {
    /// Stored as `"OVERWRITE"`.
    #[sea_orm(string_value = "OVERWRITE")]
    Overwrite,
    /// Stored as `"IGNORE"`.
    #[sea_orm(string_value = "IGNORE")]
    Ignore,
    /// Stored as `"NO_CHECK"`.
    #[sea_orm(string_value = "NO_CHECK")]
    NoCheck,
    /// Stored as `"DO_UPDATE_IF_NOT_NULL"`.
    #[sea_orm(string_value = "DO_UPDATE_IF_NOT_NULL")]
    DoUpdateIfNotNull,
}
79
80impl From<HandleConflictBehavior> for PbHandleConflictBehavior {
81    fn from(handle_conflict_behavior: HandleConflictBehavior) -> Self {
82        match handle_conflict_behavior {
83            HandleConflictBehavior::Overwrite => Self::Overwrite,
84            HandleConflictBehavior::Ignore => Self::Ignore,
85            HandleConflictBehavior::NoCheck => Self::NoCheck,
86            HandleConflictBehavior::DoUpdateIfNotNull => Self::DoUpdateIfNotNull,
87        }
88    }
89}
90
91impl From<PbHandleConflictBehavior> for HandleConflictBehavior {
92    fn from(handle_conflict_behavior: PbHandleConflictBehavior) -> Self {
93        match handle_conflict_behavior {
94            PbHandleConflictBehavior::Overwrite => Self::Overwrite,
95            PbHandleConflictBehavior::Ignore => Self::Ignore,
96            PbHandleConflictBehavior::NoCheck => Self::NoCheck,
97            PbHandleConflictBehavior::DoUpdateIfNotNull => Self::DoUpdateIfNotNull,
98            PbHandleConflictBehavior::Unspecified => {
99                unreachable!("Unspecified handle conflict behavior")
100            }
101        }
102    }
103}
104
/// Value of the optional `engine` column of the `table` entity.
///
/// Persisted as a string column; each variant is stored as the `string_value` literal attached
/// to it below.
#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum Engine {
    /// Stored as `"HUMMOCK"`.
    #[sea_orm(string_value = "HUMMOCK")]
    Hummock,
    /// Stored as `"ICEBERG"`.
    #[sea_orm(string_value = "ICEBERG")]
    Iceberg,
}
113
114impl From<Engine> for PbEngine {
115    fn from(engine: Engine) -> Self {
116        match engine {
117            Engine::Hummock => Self::Hummock,
118            Engine::Iceberg => Self::Iceberg,
119        }
120    }
121}
122
123impl From<PbEngine> for Engine {
124    fn from(engine: PbEngine) -> Self {
125        match engine {
126            PbEngine::Hummock => Self::Hummock,
127            PbEngine::Iceberg => Self::Iceberg,
128            PbEngine::Unspecified => Self::Hummock,
129        }
130    }
131}
132
// Invokes the crate-level `derive_from_blob!` macro for the pair
// (`VectorIndexInfo`, `PbVectorIndexInfo`). `VectorIndexInfo` is not imported in this file yet is
// used by `Model::vector_index_info`, so this invocation is presumably what defines it as a
// wrapper around `PbVectorIndexInfo` — NOTE(review): confirm against the macro definition.
crate::derive_from_blob!(VectorIndexInfo, PbVectorIndexInfo);
134
/// One row of the `table` database table.
///
/// `Option` fields are nullable columns. An `ActiveModel` for this entity can be built from a
/// protobuf `PbTable` through the `From<PbTable>` implementation in this file.
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "table")]
pub struct Model {
    /// Primary key; supplied by the caller rather than auto-incremented.
    #[sea_orm(primary_key, auto_increment = false)]
    pub table_id: TableId,
    pub name: String,
    /// Joined to the source entity's `SourceId` by `Relation::Source`.
    pub optional_associated_source_id: Option<SourceId>,
    pub table_type: TableType,
    /// Joined to the object entity's `Oid` by `Relation::Object2`.
    pub belongs_to_job_id: Option<ObjectId>,
    pub columns: ColumnCatalogArray,
    pub pk: ColumnOrderArray,
    pub distribution_key: I32Array,
    pub stream_key: I32Array,
    pub append_only: bool,
    /// Joined to the fragment entity's `FragmentId` by `Relation::Fragment1`.
    pub fragment_id: Option<FragmentId>,
    pub vnode_col_index: Option<i32>,
    pub row_id_index: Option<i32>,
    pub value_indices: I32Array,
    pub definition: String,
    pub handle_pk_conflict_behavior: HandleConflictBehavior,
    pub version_column_index: Option<i32>,
    pub read_prefix_len_hint: i32,
    pub watermark_indices: I32Array,
    pub dist_key_in_pk: I32Array,
    /// Joined to the fragment entity's `FragmentId` by `Relation::Fragment2`.
    pub dml_fragment_id: Option<FragmentId>,
    pub cardinality: Option<Cardinality>,
    pub cleaned_by_watermark: bool,
    pub description: Option<String>,
    pub version: Option<TableVersion>,
    pub retention_seconds: Option<i32>,
    pub incoming_sinks: I32Array,
    pub cdc_table_id: Option<String>,
    pub vnode_count: i32,
    pub webhook_info: Option<WebhookSourceInfo>,
    pub engine: Option<Engine>,
    pub clean_watermark_index_in_pk: Option<i32>,
    pub refreshable: bool,
    pub vector_index_info: Option<VectorIndexInfo>,
}
174
/// Relations from the `table` entity to other entities.
///
/// Each variant declares the local column, the referenced column, and the update/delete actions
/// in its `#[sea_orm(...)]` attribute.
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
    // `dml_fragment_id` -> `fragment.fragment_id`; no action on update or delete.
    #[sea_orm(
        belongs_to = "super::fragment::Entity",
        from = "Column::DmlFragmentId",
        to = "super::fragment::Column::FragmentId",
        on_update = "NoAction",
        on_delete = "NoAction"
    )]
    Fragment2,
    // `fragment_id` -> `fragment.fragment_id`; no action on update or delete.
    #[sea_orm(
        belongs_to = "super::fragment::Entity",
        from = "Column::FragmentId",
        to = "super::fragment::Column::FragmentId",
        on_update = "NoAction",
        on_delete = "NoAction"
    )]
    Fragment1,
    // `belongs_to_job_id` -> `object.oid`; cascades on delete.
    #[sea_orm(
        belongs_to = "super::object::Entity",
        from = "Column::BelongsToJobId",
        to = "super::object::Column::Oid",
        on_update = "NoAction",
        on_delete = "Cascade"
    )]
    Object2,
    // `table_id` -> `object.oid`; cascades on delete. This is the relation returned by
    // `Related<super::object::Entity>`.
    #[sea_orm(
        belongs_to = "super::object::Entity",
        from = "Column::TableId",
        to = "super::object::Column::Oid",
        on_update = "NoAction",
        on_delete = "Cascade"
    )]
    Object1,
    // `optional_associated_source_id` -> `source.source_id`; no action on update or delete.
    #[sea_orm(
        belongs_to = "super::source::Entity",
        from = "Column::OptionalAssociatedSourceId",
        to = "super::source::Column::SourceId",
        on_update = "NoAction",
        on_delete = "NoAction"
    )]
    Source,

    // To join object_dependency on the used_by column
    // (`table_id` -> `object_dependency.used_by`; cascades on delete).
    #[sea_orm(
        belongs_to = "super::object_dependency::Entity",
        from = "Column::TableId",
        to = "super::object_dependency::Column::UsedBy",
        on_update = "NoAction",
        on_delete = "Cascade"
    )]
    ObjectDependency,
}
228
229impl Related<super::object::Entity> for Entity {
230    fn to() -> RelationDef {
231        Relation::Object1.def()
232    }
233}
234
235impl Related<super::source::Entity> for Entity {
236    fn to() -> RelationDef {
237        Relation::Source.def()
238    }
239}
240
// No hooks are overridden: the trait's default behavior is used as-is.
impl ActiveModelBehavior for ActiveModel {}
242
243impl From<PbTable> for ActiveModel {
244    fn from(pb_table: PbTable) -> Self {
245        let table_type = pb_table.table_type();
246        let handle_pk_conflict_behavior = pb_table.handle_pk_conflict_behavior();
247
248        // `PbTable` here should be sourced from the wire, not from persistence.
249        // A placeholder `maybe_vnode_count` field should be treated as `NotSet`, instead of calling
250        // the compatibility code.
251        let vnode_count = pb_table
252            .vnode_count_inner()
253            .value_opt()
254            .map(|v| v as _)
255            .map_or(NotSet, Set);
256        let fragment_id = if pb_table.fragment_id == OBJECT_ID_PLACEHOLDER {
257            NotSet
258        } else {
259            Set(Some(pb_table.fragment_id as FragmentId))
260        };
261        let dml_fragment_id = pb_table
262            .dml_fragment_id
263            .map(|x| Set(Some(x as FragmentId)))
264            .unwrap_or_default();
265        let optional_associated_source_id =
266            if let Some(OptionalAssociatedSourceId::AssociatedSourceId(src_id)) =
267                pb_table.optional_associated_source_id
268            {
269                Set(Some(src_id as SourceId))
270            } else {
271                NotSet
272            };
273
274        Self {
275            table_id: Set(pb_table.id as _),
276            name: Set(pb_table.name),
277            optional_associated_source_id,
278            table_type: Set(table_type.into()),
279            belongs_to_job_id: Set(pb_table.job_id.map(|x| x as _)),
280            columns: Set(pb_table.columns.into()),
281            pk: Set(pb_table.pk.into()),
282            distribution_key: Set(pb_table.distribution_key.into()),
283            stream_key: Set(pb_table.stream_key.into()),
284            append_only: Set(pb_table.append_only),
285            fragment_id,
286            vnode_col_index: Set(pb_table.vnode_col_index.map(|x| x as i32)),
287            row_id_index: Set(pb_table.row_id_index.map(|x| x as i32)),
288            value_indices: Set(pb_table.value_indices.into()),
289            definition: Set(pb_table.definition),
290            handle_pk_conflict_behavior: Set(handle_pk_conflict_behavior.into()),
291            version_column_index: Set(pb_table.version_column_index.map(|x| x as i32)),
292            read_prefix_len_hint: Set(pb_table.read_prefix_len_hint as _),
293            watermark_indices: Set(pb_table.watermark_indices.into()),
294            dist_key_in_pk: Set(pb_table.dist_key_in_pk.into()),
295            dml_fragment_id,
296            cardinality: Set(pb_table.cardinality.as_ref().map(|x| x.into())),
297            cleaned_by_watermark: Set(pb_table.cleaned_by_watermark),
298            description: Set(pb_table.description),
299            version: Set(pb_table.version.as_ref().map(|v| v.into())),
300            retention_seconds: Set(pb_table.retention_seconds.map(|i| i as _)),
301            incoming_sinks: Set(pb_table.incoming_sinks.into()),
302            cdc_table_id: Set(pb_table.cdc_table_id),
303            vnode_count,
304            webhook_info: Set(pb_table.webhook_info.as_ref().map(WebhookSourceInfo::from)),
305            engine: Set(pb_table
306                .engine
307                .map(|engine| Engine::from(PbEngine::try_from(engine).expect("Invalid engine")))),
308            clean_watermark_index_in_pk: Set(pb_table.clean_watermark_index_in_pk),
309            refreshable: Set(pb_table.refreshable),
310            vector_index_info: Set(pb_table
311                .vector_index_info
312                .as_ref()
313                .map(VectorIndexInfo::from)),
314        }
315    }
316}