1use risingwave_common::catalog::OBJECT_ID_PLACEHOLDER;
16use risingwave_common::hash::VnodeCountCompat;
17use risingwave_pb::catalog::table::{OptionalAssociatedSourceId, PbEngine, PbTableType};
18use risingwave_pb::catalog::{PbHandleConflictBehavior, PbTable, PbVectorIndexInfo};
19use sea_orm::ActiveValue::Set;
20use sea_orm::NotSet;
21use sea_orm::entity::prelude::*;
22use serde::{Deserialize, Serialize};
23
24use crate::{
25 Cardinality, ColumnCatalogArray, ColumnOrderArray, FragmentId, I32Array, ObjectId, SourceId,
26 TableId, TableVersion, WebhookSourceInfo,
27};
28
/// Kind of relation stored in the `table` catalog table.
///
/// Persisted as a string column; each variant's stored text is given by its
/// `string_value` attribute. Converts to and from `PbTableType` through the
/// `From` impls in this file.
#[derive(
    Clone, Debug, PartialEq, Hash, Copy, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum TableType {
    /// Stored as `"TABLE"`.
    #[sea_orm(string_value = "TABLE")]
    Table,
    /// Stored as `"MATERIALIZED_VIEW"`.
    #[sea_orm(string_value = "MATERIALIZED_VIEW")]
    MaterializedView,
    /// Stored as `"INDEX"`.
    #[sea_orm(string_value = "INDEX")]
    Index,
    /// Stored as `"INTERNAL"`.
    #[sea_orm(string_value = "INTERNAL")]
    Internal,
}
43
44impl From<TableType> for PbTableType {
45 fn from(table_type: TableType) -> Self {
46 match table_type {
47 TableType::Table => Self::Table,
48 TableType::MaterializedView => Self::MaterializedView,
49 TableType::Index => Self::Index,
50 TableType::Internal => Self::Internal,
51 }
52 }
53}
54
55impl From<PbTableType> for TableType {
56 fn from(table_type: PbTableType) -> Self {
57 match table_type {
58 PbTableType::Table => Self::Table,
59 PbTableType::MaterializedView => Self::MaterializedView,
60 PbTableType::Index => Self::Index,
61 PbTableType::Internal => Self::Internal,
62 PbTableType::Unspecified => unreachable!("Unspecified table type"),
63 }
64 }
65}
66
/// Primary-key conflict handling mode recorded for a table
/// (see `Model::handle_pk_conflict_behavior`).
///
/// Persisted as a string column; each variant's stored text is given by its
/// `string_value` attribute. Converts to and from `PbHandleConflictBehavior`
/// through the `From` impls in this file.
#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum HandleConflictBehavior {
    /// Stored as `"OVERWRITE"`.
    #[sea_orm(string_value = "OVERWRITE")]
    Overwrite,
    /// Stored as `"IGNORE"`.
    #[sea_orm(string_value = "IGNORE")]
    Ignore,
    /// Stored as `"NO_CHECK"`.
    #[sea_orm(string_value = "NO_CHECK")]
    NoCheck,
    /// Stored as `"DO_UPDATE_IF_NOT_NULL"`.
    #[sea_orm(string_value = "DO_UPDATE_IF_NOT_NULL")]
    DoUpdateIfNotNull,
}
79
80impl From<HandleConflictBehavior> for PbHandleConflictBehavior {
81 fn from(handle_conflict_behavior: HandleConflictBehavior) -> Self {
82 match handle_conflict_behavior {
83 HandleConflictBehavior::Overwrite => Self::Overwrite,
84 HandleConflictBehavior::Ignore => Self::Ignore,
85 HandleConflictBehavior::NoCheck => Self::NoCheck,
86 HandleConflictBehavior::DoUpdateIfNotNull => Self::DoUpdateIfNotNull,
87 }
88 }
89}
90
91impl From<PbHandleConflictBehavior> for HandleConflictBehavior {
92 fn from(handle_conflict_behavior: PbHandleConflictBehavior) -> Self {
93 match handle_conflict_behavior {
94 PbHandleConflictBehavior::Overwrite => Self::Overwrite,
95 PbHandleConflictBehavior::Ignore => Self::Ignore,
96 PbHandleConflictBehavior::NoCheck => Self::NoCheck,
97 PbHandleConflictBehavior::DoUpdateIfNotNull => Self::DoUpdateIfNotNull,
98 PbHandleConflictBehavior::Unspecified => {
99 unreachable!("Unspecified handle conflict behavior")
100 }
101 }
102 }
103}
104
/// Engine recorded for a table (see `Model::engine`).
///
/// Persisted as a string column; each variant's stored text is given by its
/// `string_value` attribute. Converts to and from `PbEngine` through the
/// `From` impls in this file.
#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum Engine {
    /// Stored as `"HUMMOCK"`.
    #[sea_orm(string_value = "HUMMOCK")]
    Hummock,
    /// Stored as `"ICEBERG"`.
    #[sea_orm(string_value = "ICEBERG")]
    Iceberg,
}
113
114impl From<Engine> for PbEngine {
115 fn from(engine: Engine) -> Self {
116 match engine {
117 Engine::Hummock => Self::Hummock,
118 Engine::Iceberg => Self::Iceberg,
119 }
120 }
121}
122
123impl From<PbEngine> for Engine {
124 fn from(engine: PbEngine) -> Self {
125 match engine {
126 PbEngine::Hummock => Self::Hummock,
127 PbEngine::Iceberg => Self::Iceberg,
128 PbEngine::Unspecified => Self::Hummock,
129 }
130 }
131}
132
// Declares `VectorIndexInfo`, used below as the type of the `vector_index_info`
// column and built from `&PbVectorIndexInfo` via `From`.
// NOTE(review): the macro is defined elsewhere in this crate; presumably it
// stores the protobuf message as a blob column — confirm against its definition.
crate::derive_from_blob!(VectorIndexInfo, PbVectorIndexInfo);
134
/// One row of the `table` database table.
///
/// The matching `ActiveModel` can be built from a `PbTable` through the
/// `From<PbTable>` impl at the end of this file; the field notes below refer
/// to that conversion and to the foreign keys declared in `Relation`.
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "table")]
pub struct Model {
    /// Primary key, not auto-incremented. Also a foreign key to `object.oid`.
    #[sea_orm(primary_key, auto_increment = false)]
    pub table_id: TableId,
    pub name: String,
    /// Foreign key to `source.source_id`, when present.
    pub optional_associated_source_id: Option<SourceId>,
    pub table_type: TableType,
    /// Foreign key to `object.oid`, when present.
    pub belongs_to_job_id: Option<ObjectId>,
    pub columns: ColumnCatalogArray,
    pub pk: ColumnOrderArray,
    pub distribution_key: I32Array,
    pub stream_key: I32Array,
    pub append_only: bool,
    /// Foreign key to `fragment.fragment_id`, when present.
    pub fragment_id: Option<FragmentId>,
    pub vnode_col_index: Option<i32>,
    pub row_id_index: Option<i32>,
    pub value_indices: I32Array,
    pub definition: String,
    pub handle_pk_conflict_behavior: HandleConflictBehavior,
    pub version_column_index: Option<i32>,
    pub read_prefix_len_hint: i32,
    pub watermark_indices: I32Array,
    pub dist_key_in_pk: I32Array,
    /// Foreign key to `fragment.fragment_id`, when present.
    pub dml_fragment_id: Option<FragmentId>,
    pub cardinality: Option<Cardinality>,
    pub cleaned_by_watermark: bool,
    pub description: Option<String>,
    pub version: Option<TableVersion>,
    pub retention_seconds: Option<i32>,
    pub incoming_sinks: I32Array,
    pub cdc_table_id: Option<String>,
    pub vnode_count: i32,
    pub webhook_info: Option<WebhookSourceInfo>,
    pub engine: Option<Engine>,
    pub clean_watermark_index_in_pk: Option<i32>,
    pub refreshable: bool,
    pub vector_index_info: Option<VectorIndexInfo>,
}
174
/// Relations from the `table` entity to other entities.
///
/// Each variant names the local column, the target column, and the
/// update/delete actions in its `sea_orm` attribute.
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
    /// `dml_fragment_id` -> `fragment.fragment_id`; no action on update or delete.
    #[sea_orm(
        belongs_to = "super::fragment::Entity",
        from = "Column::DmlFragmentId",
        to = "super::fragment::Column::FragmentId",
        on_update = "NoAction",
        on_delete = "NoAction"
    )]
    Fragment2,
    /// `fragment_id` -> `fragment.fragment_id`; no action on update or delete.
    #[sea_orm(
        belongs_to = "super::fragment::Entity",
        from = "Column::FragmentId",
        to = "super::fragment::Column::FragmentId",
        on_update = "NoAction",
        on_delete = "NoAction"
    )]
    Fragment1,
    /// `belongs_to_job_id` -> `object.oid`; cascades on delete.
    #[sea_orm(
        belongs_to = "super::object::Entity",
        from = "Column::BelongsToJobId",
        to = "super::object::Column::Oid",
        on_update = "NoAction",
        on_delete = "Cascade"
    )]
    Object2,
    /// `table_id` -> `object.oid`; cascades on delete.
    /// This is the relation returned by `Related<super::object::Entity>`.
    #[sea_orm(
        belongs_to = "super::object::Entity",
        from = "Column::TableId",
        to = "super::object::Column::Oid",
        on_update = "NoAction",
        on_delete = "Cascade"
    )]
    Object1,
    /// `optional_associated_source_id` -> `source.source_id`; no action on
    /// update or delete. This is the relation returned by
    /// `Related<super::source::Entity>`.
    #[sea_orm(
        belongs_to = "super::source::Entity",
        from = "Column::OptionalAssociatedSourceId",
        to = "super::source::Column::SourceId",
        on_update = "NoAction",
        on_delete = "NoAction"
    )]
    Source,

    /// `table_id` -> `object_dependency.used_by`; cascades on delete.
    /// Joins `object_dependency` on its `used_by` column.
    #[sea_orm(
        belongs_to = "super::object_dependency::Entity",
        from = "Column::TableId",
        to = "super::object_dependency::Column::UsedBy",
        on_update = "NoAction",
        on_delete = "Cascade"
    )]
    ObjectDependency,
}
228
229impl Related<super::object::Entity> for Entity {
230 fn to() -> RelationDef {
231 Relation::Object1.def()
232 }
233}
234
235impl Related<super::source::Entity> for Entity {
236 fn to() -> RelationDef {
237 Relation::Source.def()
238 }
239}
240
// No hooks are overridden: the trait's default behavior is used as-is.
impl ActiveModelBehavior for ActiveModel {}
242
243impl From<PbTable> for ActiveModel {
244 fn from(pb_table: PbTable) -> Self {
245 let table_type = pb_table.table_type();
246 let handle_pk_conflict_behavior = pb_table.handle_pk_conflict_behavior();
247
248 let vnode_count = pb_table
252 .vnode_count_inner()
253 .value_opt()
254 .map(|v| v as _)
255 .map_or(NotSet, Set);
256 let fragment_id = if pb_table.fragment_id == OBJECT_ID_PLACEHOLDER {
257 NotSet
258 } else {
259 Set(Some(pb_table.fragment_id as FragmentId))
260 };
261 let dml_fragment_id = pb_table
262 .dml_fragment_id
263 .map(|x| Set(Some(x as FragmentId)))
264 .unwrap_or_default();
265 let optional_associated_source_id =
266 if let Some(OptionalAssociatedSourceId::AssociatedSourceId(src_id)) =
267 pb_table.optional_associated_source_id
268 {
269 Set(Some(src_id as SourceId))
270 } else {
271 NotSet
272 };
273
274 Self {
275 table_id: Set(pb_table.id as _),
276 name: Set(pb_table.name),
277 optional_associated_source_id,
278 table_type: Set(table_type.into()),
279 belongs_to_job_id: Set(pb_table.job_id.map(|x| x as _)),
280 columns: Set(pb_table.columns.into()),
281 pk: Set(pb_table.pk.into()),
282 distribution_key: Set(pb_table.distribution_key.into()),
283 stream_key: Set(pb_table.stream_key.into()),
284 append_only: Set(pb_table.append_only),
285 fragment_id,
286 vnode_col_index: Set(pb_table.vnode_col_index.map(|x| x as i32)),
287 row_id_index: Set(pb_table.row_id_index.map(|x| x as i32)),
288 value_indices: Set(pb_table.value_indices.into()),
289 definition: Set(pb_table.definition),
290 handle_pk_conflict_behavior: Set(handle_pk_conflict_behavior.into()),
291 version_column_index: Set(pb_table.version_column_index.map(|x| x as i32)),
292 read_prefix_len_hint: Set(pb_table.read_prefix_len_hint as _),
293 watermark_indices: Set(pb_table.watermark_indices.into()),
294 dist_key_in_pk: Set(pb_table.dist_key_in_pk.into()),
295 dml_fragment_id,
296 cardinality: Set(pb_table.cardinality.as_ref().map(|x| x.into())),
297 cleaned_by_watermark: Set(pb_table.cleaned_by_watermark),
298 description: Set(pb_table.description),
299 version: Set(pb_table.version.as_ref().map(|v| v.into())),
300 retention_seconds: Set(pb_table.retention_seconds.map(|i| i as _)),
301 incoming_sinks: Set(pb_table.incoming_sinks.into()),
302 cdc_table_id: Set(pb_table.cdc_table_id),
303 vnode_count,
304 webhook_info: Set(pb_table.webhook_info.as_ref().map(WebhookSourceInfo::from)),
305 engine: Set(pb_table
306 .engine
307 .map(|engine| Engine::from(PbEngine::try_from(engine).expect("Invalid engine")))),
308 clean_watermark_index_in_pk: Set(pb_table.clean_watermark_index_in_pk),
309 refreshable: Set(pb_table.refreshable),
310 vector_index_info: Set(pb_table
311 .vector_index_info
312 .as_ref()
313 .map(VectorIndexInfo::from)),
314 }
315 }
316}