risingwave_meta_model/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17pub use risingwave_common::id::*;
18use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus};
19use risingwave_pb::meta::table_fragments::PbState as PbStreamJobState;
20use risingwave_pb::secret::PbSecretRef;
21use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamNode};
22use sea_orm::entity::prelude::*;
23use sea_orm::{DeriveActiveEnum, EnumIter, FromJsonQueryResult, Value};
24use serde::{Deserialize, Serialize};
25
26pub mod prelude;
27
28pub mod catalog_version;
29pub mod cdc_table_snapshot_split;
30pub mod cluster;
31pub mod compaction_config;
32pub mod compaction_status;
33pub mod compaction_task;
34pub mod connection;
35pub mod database;
36pub mod exactly_once_iceberg_sink;
37pub mod fragment;
38pub mod fragment_relation;
39pub mod fragment_splits;
40pub mod function;
41pub mod hummock_epoch_to_version;
42pub mod hummock_gc_history;
43pub mod hummock_pinned_snapshot;
44pub mod hummock_pinned_version;
45pub mod hummock_sequence;
46pub mod hummock_sstable_info;
47pub mod hummock_time_travel_delta;
48pub mod hummock_time_travel_version;
49pub mod hummock_version_delta;
50pub mod hummock_version_stats;
51pub mod iceberg_namespace_properties;
52pub mod iceberg_tables;
53pub mod index;
54pub mod object;
55pub mod object_dependency;
56pub mod pending_sink_state;
57pub mod refresh_job;
58pub mod schema;
59pub mod secret;
60pub mod serde_seaql_migration;
61pub mod session_parameter;
62pub mod sink;
63pub mod source;
64pub mod streaming_job;
65pub mod subscription;
66pub mod system_parameter;
67pub mod table;
68pub mod user;
69pub mod user_default_privilege;
70pub mod user_privilege;
71pub mod view;
72pub mod worker;
73pub mod worker_property;
74
// Identifiers represented as signed 32-bit integers.

/// Transaction id, an alias of `i32`.
pub type TransactionId = i32;

/// User id, an alias of `i32`.
pub type UserId = i32;
/// Privilege id, an alias of `i32`.
pub type PrivilegeId = i32;
/// Default-privilege id, an alias of `i32`.
pub type DefaultPrivilegeId = i32;

// Hummock-related values represented as signed 64-bit integers.

/// Hummock version id, an alias of `i64`.
pub type HummockVersionId = i64;
/// Epoch value, an alias of `i64`.
pub type Epoch = i64;
/// Compaction group id, an alias of `i64`.
pub type CompactionGroupId = i64;
/// Compaction task id, an alias of `i64`.
pub type CompactionTaskId = i64;
/// Hummock SSTable object id, an alias of `i64`.
pub type HummockSstableObjectId = i64;
86
87#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
88#[sea_orm(rs_type = "String", db_type = "string(None)")]
89pub enum JobStatus {
90    #[sea_orm(string_value = "INITIAL")]
91    Initial,
92    #[sea_orm(string_value = "CREATING")]
93    Creating,
94    #[sea_orm(string_value = "CREATED")]
95    Created,
96}
97
98impl From<JobStatus> for PbStreamJobStatus {
99    fn from(job_status: JobStatus) -> Self {
100        match job_status {
101            JobStatus::Initial => Self::Unspecified,
102            JobStatus::Creating => Self::Creating,
103            JobStatus::Created => Self::Created,
104        }
105    }
106}
107
108// todo: deprecate job status in catalog and unify with this one.
109impl From<JobStatus> for PbStreamJobState {
110    fn from(status: JobStatus) -> Self {
111        match status {
112            JobStatus::Initial => PbStreamJobState::Initial,
113            JobStatus::Creating => PbStreamJobState::Creating,
114            JobStatus::Created => PbStreamJobState::Created,
115        }
116    }
117}
118
119#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
120#[sea_orm(rs_type = "String", db_type = "string(None)")]
121pub enum CreateType {
122    #[sea_orm(string_value = "BACKGROUND")]
123    Background,
124    #[sea_orm(string_value = "FOREGROUND")]
125    Foreground,
126}
127
128impl From<CreateType> for PbCreateType {
129    fn from(create_type: CreateType) -> Self {
130        match create_type {
131            CreateType::Background => Self::Background,
132            CreateType::Foreground => Self::Foreground,
133        }
134    }
135}
136
137impl From<PbCreateType> for CreateType {
138    fn from(create_type: PbCreateType) -> Self {
139        match create_type {
140            PbCreateType::Background => Self::Background,
141            PbCreateType::Foreground => Self::Foreground,
142            PbCreateType::Unspecified => unreachable!("Unspecified create type"),
143        }
144    }
145}
146
147impl CreateType {
148    pub fn as_str(&self) -> &'static str {
149        match self {
150            CreateType::Background => "BACKGROUND",
151            CreateType::Foreground => "FOREGROUND",
152        }
153    }
154}
155
/// Defines a newtype around a single public value that derives `FromJsonQueryResult`, which
/// helps to map a JSON value stored in the database to the wrapped type.
macro_rules! derive_from_json_struct {
    ($name:ident, $inner:ty) => {
        #[derive(Clone, Debug, Default, PartialEq, FromJsonQueryResult, Serialize, Deserialize)]
        pub struct $name(pub $inner);

        // `Eq` is asserted by hand, so the wrapped type only has to provide `PartialEq`.
        impl Eq for $name {}

        impl From<$inner> for $name {
            fn from(inner: $inner) -> Self {
                $name(inner)
            }
        }

        impl $name {
            /// Borrows the wrapped value.
            pub fn inner_ref(&self) -> &$inner {
                let $name(inner) = self;
                inner
            }

            /// Consumes the wrapper and returns the wrapped value.
            pub fn into_inner(self) -> $inner {
                let $name(inner) = self;
                inner
            }
        }
    };
}
179
/// Defines a newtype over a byte array that derives `DeriveValueType`, which helps to map a blob
/// stored in the database to a single protobuf message of type `$field_type`.
macro_rules! derive_from_blob {
    ($struct_name:ident, $field_type:ty) => {
        #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, DeriveValueType)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        impl $struct_name {
            /// Decodes the stored bytes into the protobuf message.
            ///
            /// # Panics
            ///
            /// Panics if the stored bytes are not a valid encoding of the message.
            pub fn to_protobuf(&self) -> $field_type {
                prost::Message::decode(self.0.as_slice()).expect(concat!(
                    "failed to decode `",
                    stringify!($struct_name),
                    "` blob as protobuf"
                ))
            }

            /// Encodes the protobuf message into the stored byte representation.
            fn from_protobuf(val: &$field_type) -> Self {
                Self(prost::Message::encode_to_vec(val))
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }

        impl From<&$field_type> for $struct_name {
            fn from(value: &$field_type) -> Self {
                Self::from_protobuf(value)
            }
        }

        impl std::fmt::Debug for $struct_name {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                // Formatting must not panic (it is typically used while reporting errors), so
                // decode fallibly here instead of going through `to_protobuf`.
                match <$field_type as prost::Message>::decode(self.0.as_slice()) {
                    Ok(message) => std::fmt::Debug::fmt(&message, f),
                    Err(err) => write!(
                        f,
                        "{}(<undecodable protobuf: {}>)",
                        stringify!($struct_name),
                        err
                    ),
                }
            }
        }

        impl Default for $struct_name {
            /// Encodes the default value of the protobuf message.
            fn default() -> Self {
                Self::from_protobuf(&<$field_type>::default())
            }
        }
    };
}
221
/// Defines a newtype over a byte array that derives `DeriveValueType`, which helps to map a blob
/// stored in the database to an array of protobuf messages of type `$field_type`.
///
/// `$field_array_name` names the generated prost wrapper message whose single repeated field
/// (tag 1) carries the array on the wire.
macro_rules! derive_array_from_blob {
    ($struct_name:ident, $field_type:ty, $field_array_name:ident) => {
        #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        #[derive(Clone, PartialEq, ::prost::Message)]
        pub struct $field_array_name {
            #[prost(message, repeated, tag = "1")]
            inner: Vec<$field_type>,
        }
        impl Eq for $field_array_name {}

        impl $struct_name {
            /// Decodes the stored bytes into the list of protobuf messages.
            ///
            /// # Panics
            ///
            /// Panics if the stored bytes are not a valid encoding of the wrapper message.
            pub fn to_protobuf(&self) -> Vec<$field_type> {
                let data: $field_array_name =
                    prost::Message::decode(self.0.as_slice()).expect(concat!(
                        "failed to decode `",
                        stringify!($struct_name),
                        "` blob as protobuf"
                    ));
                data.inner
            }

            /// Wraps the messages into the wrapper message and encodes it to bytes.
            fn from_protobuf(val: Vec<$field_type>) -> Self {
                Self(prost::Message::encode_to_vec(&$field_array_name {
                    inner: val,
                }))
            }
        }

        impl From<Vec<$field_type>> for $struct_name {
            fn from(value: Vec<$field_type>) -> Self {
                Self::from_protobuf(value)
            }
        }

        impl std::fmt::Debug for $struct_name {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                // Formatting must not panic (it is typically used while reporting errors), so
                // decode fallibly here instead of going through `to_protobuf`.
                match <$field_array_name as prost::Message>::decode(self.0.as_slice()) {
                    Ok(decoded) => std::fmt::Debug::fmt(&decoded.inner, f),
                    Err(err) => write!(
                        f,
                        "{}(<undecodable protobuf: {}>)",
                        stringify!($struct_name),
                        err
                    ),
                }
            }
        }

        impl Default for $struct_name {
            /// The default value holds no bytes at all.
            fn default() -> Self {
                Self(vec![])
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }
    };
}
273
/// Defines a newtype over a byte array that derives `DeriveValueType`, which helps to map a blob
/// stored in the database to a `BTreeMap` of protobuf messages.
///
/// `$field_type` names the generated prost wrapper message that holds the map. Its map field is
/// declared as `btree_map = "string, message"`, so `$key_type` has to be a string type and
/// `$value_type` a prost message.
macro_rules! derive_btreemap_from_blob {
    ($struct_name:ident, $key_type:ty, $value_type:ty, $field_type:ident) => {
        #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        #[derive(Clone, PartialEq, ::prost::Message)]
        pub struct $field_type {
            #[prost(btree_map = "string, message")]
            inner: BTreeMap<$key_type, $value_type>,
        }
        impl Eq for $field_type {}

        impl $struct_name {
            /// Decodes the stored bytes into the map of protobuf messages.
            ///
            /// # Panics
            ///
            /// Panics if the stored bytes are not a valid encoding of the wrapper message.
            pub fn to_protobuf(&self) -> BTreeMap<$key_type, $value_type> {
                let data: $field_type =
                    prost::Message::decode(self.0.as_slice()).expect(concat!(
                        "failed to decode `",
                        stringify!($struct_name),
                        "` blob as protobuf"
                    ));
                data.inner
            }

            /// Wraps the map into the wrapper message and encodes it to bytes.
            fn from_protobuf(val: BTreeMap<$key_type, $value_type>) -> Self {
                Self(prost::Message::encode_to_vec(&$field_type { inner: val }))
            }
        }

        impl From<BTreeMap<$key_type, $value_type>> for $struct_name {
            fn from(value: BTreeMap<$key_type, $value_type>) -> Self {
                Self::from_protobuf(value)
            }
        }

        impl std::fmt::Debug for $struct_name {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                // Formatting must not panic (it is typically used while reporting errors), so
                // decode fallibly here instead of going through `to_protobuf`.
                match <$field_type as prost::Message>::decode(self.0.as_slice()) {
                    Ok(decoded) => std::fmt::Debug::fmt(&decoded.inner, f),
                    Err(err) => write!(
                        f,
                        "{}(<undecodable protobuf: {}>)",
                        stringify!($struct_name),
                        err
                    ),
                }
            }
        }

        impl Default for $struct_name {
            /// The default value holds no bytes at all.
            fn default() -> Self {
                Self(vec![])
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }
    };
}
322
323pub(crate) use {derive_array_from_blob, derive_from_blob};
324
325derive_from_json_struct!(TableIdArray, Vec<TableId>);
326
327derive_from_json_struct!(I32Array, Vec<i32>);
328
329impl From<Vec<u32>> for I32Array {
330    fn from(value: Vec<u32>) -> Self {
331        Self(value.into_iter().map(|id| id as _).collect())
332    }
333}
334
335impl I32Array {
336    pub fn into_u32_array(self) -> Vec<u32> {
337        self.0.into_iter().map(|id| id as _).collect()
338    }
339}
340
341derive_btreemap_from_blob!(SecretRef, String, PbSecretRef, PbSecretRefMap);
342
343derive_from_blob!(StreamNode, PbStreamNode);
344derive_from_blob!(DataType, risingwave_pb::data::PbDataType);
345derive_array_from_blob!(
346    DataTypeArray,
347    risingwave_pb::data::PbDataType,
348    PbDataTypeArray
349);
350derive_array_from_blob!(
351    FieldArray,
352    risingwave_pb::plan_common::PbField,
353    PbFieldArray
354);
355derive_from_json_struct!(Property, BTreeMap<String, String>);
356derive_from_blob!(ColumnCatalog, risingwave_pb::plan_common::PbColumnCatalog);
357derive_array_from_blob!(
358    ColumnCatalogArray,
359    risingwave_pb::plan_common::PbColumnCatalog,
360    PbColumnCatalogArray
361);
362derive_from_blob!(StreamSourceInfo, risingwave_pb::catalog::PbStreamSourceInfo);
363derive_from_blob!(
364    WebhookSourceInfo,
365    risingwave_pb::catalog::PbWebhookSourceInfo
366);
367derive_from_blob!(WatermarkDesc, risingwave_pb::catalog::PbWatermarkDesc);
368derive_array_from_blob!(
369    WatermarkDescArray,
370    risingwave_pb::catalog::PbWatermarkDesc,
371    PbWatermarkDescArray
372);
373derive_array_from_blob!(
374    ExprNodeArray,
375    risingwave_pb::expr::PbExprNode,
376    PbExprNodeArray
377);
378derive_array_from_blob!(
379    ColumnOrderArray,
380    risingwave_pb::common::PbColumnOrder,
381    PbColumnOrderArray
382);
383derive_array_from_blob!(
384    IndexColumnPropertiesArray,
385    risingwave_pb::catalog::PbIndexColumnProperties,
386    PbIndexColumnPropertiesArray
387);
388derive_from_blob!(SinkFormatDesc, risingwave_pb::catalog::PbSinkFormatDesc);
389derive_from_blob!(Cardinality, risingwave_pb::plan_common::PbCardinality);
390derive_from_blob!(TableVersion, risingwave_pb::catalog::table::PbTableVersion);
391derive_from_blob!(
392    PrivateLinkService,
393    risingwave_pb::catalog::connection::PbPrivateLinkService
394);
395derive_from_blob!(ConnectionParams, risingwave_pb::catalog::ConnectionParams);
396derive_from_blob!(AuthInfo, risingwave_pb::user::PbAuthInfo);
397
398derive_from_blob!(ConnectorSplits, risingwave_pb::source::ConnectorSplits);
399derive_from_blob!(VnodeBitmap, risingwave_pb::common::Buffer);
400derive_from_blob!(ActorMapping, risingwave_pb::stream_plan::PbActorMapping);
401derive_from_blob!(ExprContext, risingwave_pb::plan_common::PbExprContext);
402derive_from_blob!(
403    SourceRefreshMode,
404    risingwave_pb::plan_common::PbSourceRefreshMode
405);
406
407derive_array_from_blob!(
408    TypePairArray,
409    risingwave_pb::stream_plan::dispatch_output_mapping::TypePair,
410    PbTypePairArray
411);
412
413derive_array_from_blob!(
414    HummockVersionDeltaArray,
415    risingwave_pb::hummock::PbHummockVersionDelta,
416    PbHummockVersionDeltaArray
417);
418
419#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize)]
420pub enum StreamingParallelism {
421    Adaptive,
422    Fixed(usize),
423    Custom,
424}
425
426impl Eq for StreamingParallelism {}
427
428#[derive(
429    Hash, Copy, Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
430)]
431#[sea_orm(rs_type = "String", db_type = "string(None)")]
432pub enum DispatcherType {
433    #[sea_orm(string_value = "HASH")]
434    Hash,
435    #[sea_orm(string_value = "BROADCAST")]
436    Broadcast,
437    #[sea_orm(string_value = "SIMPLE")]
438    Simple,
439    #[sea_orm(string_value = "NO_SHUFFLE")]
440    NoShuffle,
441}
442
443impl From<PbDispatcherType> for DispatcherType {
444    fn from(val: PbDispatcherType) -> Self {
445        match val {
446            PbDispatcherType::Unspecified => unreachable!(),
447            PbDispatcherType::Hash => DispatcherType::Hash,
448            PbDispatcherType::Broadcast => DispatcherType::Broadcast,
449            PbDispatcherType::Simple => DispatcherType::Simple,
450            PbDispatcherType::NoShuffle => DispatcherType::NoShuffle,
451        }
452    }
453}
454
455impl From<DispatcherType> for PbDispatcherType {
456    fn from(val: DispatcherType) -> Self {
457        match val {
458            DispatcherType::Hash => PbDispatcherType::Hash,
459            DispatcherType::Broadcast => PbDispatcherType::Broadcast,
460            DispatcherType::Simple => PbDispatcherType::Simple,
461            DispatcherType::NoShuffle => PbDispatcherType::NoShuffle,
462        }
463    }
464}