risingwave_meta_model/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus};
18use risingwave_pb::meta::table_fragments::PbState as PbStreamJobState;
19use risingwave_pb::secret::PbSecretRef;
20use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamNode};
21use sea_orm::entity::prelude::*;
22use sea_orm::{DeriveActiveEnum, EnumIter, FromJsonQueryResult};
23use serde::{Deserialize, Serialize};
24
25pub mod prelude;
26
27pub mod actor;
28pub mod catalog_version;
29pub mod cluster;
30pub mod compaction_config;
31pub mod compaction_status;
32pub mod compaction_task;
33pub mod connection;
34pub mod database;
35pub mod exactly_once_iceberg_sink;
36pub mod fragment;
37pub mod fragment_relation;
38pub mod function;
39pub mod hummock_epoch_to_version;
40pub mod hummock_gc_history;
41pub mod hummock_pinned_snapshot;
42pub mod hummock_pinned_version;
43pub mod hummock_sequence;
44pub mod hummock_sstable_info;
45pub mod hummock_time_travel_delta;
46pub mod hummock_time_travel_version;
47pub mod hummock_version_delta;
48pub mod hummock_version_stats;
49pub mod iceberg_namespace_properties;
50pub mod iceberg_tables;
51pub mod index;
52pub mod object;
53pub mod object_dependency;
54pub mod schema;
55pub mod secret;
56pub mod serde_seaql_migration;
57pub mod session_parameter;
58pub mod sink;
59pub mod source;
60pub mod streaming_job;
61pub mod subscription;
62pub mod system_parameter;
63pub mod table;
64pub mod user;
65pub mod user_default_privilege;
66pub mod user_privilege;
67pub mod view;
68pub mod worker;
69pub mod worker_property;
70
// ---- 32-bit identifiers ----------------------------------------------------

/// Worker identifier, represented as an `i32`.
pub type WorkerId = i32;

/// Transaction identifier, represented as an `i32`.
pub type TransactionId = i32;

/// Base identifier type that the per-kind object aliases below resolve to.
pub type ObjectId = i32;
// Every alias in this group is just another name for `ObjectId`: the names
// document intent, but the compiler treats them all as the same type.
pub type DatabaseId = ObjectId;
pub type SchemaId = ObjectId;
pub type TableId = ObjectId;
pub type SourceId = ObjectId;
pub type SinkId = ObjectId;
pub type SubscriptionId = ObjectId;
pub type IndexId = ObjectId;
pub type ViewId = ObjectId;
pub type FunctionId = ObjectId;
pub type ConnectionId = ObjectId;
pub type SecretId = ObjectId;
// User and privilege identifiers are plain `i32`s, declared independently of `ObjectId`.
pub type UserId = i32;
pub type PrivilegeId = i32;
pub type DefaultPrivilegeId = i32;

// ---- 64-bit identifiers ----------------------------------------------------

pub type HummockVersionId = i64;
pub type Epoch = i64;
pub type CompactionGroupId = i64;
pub type CompactionTaskId = i64;
pub type HummockSstableObjectId = i64;

// ---- Fragment / actor identifiers (32-bit) ---------------------------------

pub type FragmentId = i32;
pub type ActorId = i32;
99
100#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
101#[sea_orm(rs_type = "String", db_type = "string(None)")]
102pub enum JobStatus {
103    #[sea_orm(string_value = "INITIAL")]
104    Initial,
105    #[sea_orm(string_value = "CREATING")]
106    Creating,
107    #[sea_orm(string_value = "CREATED")]
108    Created,
109}
110
111impl From<JobStatus> for PbStreamJobStatus {
112    fn from(job_status: JobStatus) -> Self {
113        match job_status {
114            JobStatus::Initial => Self::Unspecified,
115            JobStatus::Creating => Self::Creating,
116            JobStatus::Created => Self::Created,
117        }
118    }
119}
120
121// todo: deprecate job status in catalog and unify with this one.
122impl From<JobStatus> for PbStreamJobState {
123    fn from(status: JobStatus) -> Self {
124        match status {
125            JobStatus::Initial => PbStreamJobState::Initial,
126            JobStatus::Creating => PbStreamJobState::Creating,
127            JobStatus::Created => PbStreamJobState::Created,
128        }
129    }
130}
131
132#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
133#[sea_orm(rs_type = "String", db_type = "string(None)")]
134pub enum CreateType {
135    #[sea_orm(string_value = "BACKGROUND")]
136    Background,
137    #[sea_orm(string_value = "FOREGROUND")]
138    Foreground,
139}
140
141impl From<CreateType> for PbCreateType {
142    fn from(create_type: CreateType) -> Self {
143        match create_type {
144            CreateType::Background => Self::Background,
145            CreateType::Foreground => Self::Foreground,
146        }
147    }
148}
149
150impl From<PbCreateType> for CreateType {
151    fn from(create_type: PbCreateType) -> Self {
152        match create_type {
153            PbCreateType::Background => Self::Background,
154            PbCreateType::Foreground => Self::Foreground,
155            PbCreateType::Unspecified => unreachable!("Unspecified create type"),
156        }
157    }
158}
159
/// Declares a public newtype `$struct_name` that wraps a single `$field_type` value.
///
/// The generated type derives `FromJsonQueryResult` together with the serde traits, which helps
/// to map a JSON value stored in the database to the wrapped type. Besides the derives it gets:
/// - a manual `Eq` implementation,
/// - `From<$field_type>` to wrap a value,
/// - `into_inner` to take the wrapped value by move,
/// - `inner_ref` to borrow the wrapped value.
macro_rules! derive_from_json_struct {
    ($struct_name:ident, $field_type:ty) => {
        #[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)]
        pub struct $struct_name(pub $field_type);

        // `Eq` is implemented by hand instead of being part of the derive list.
        impl Eq for $struct_name {}

        impl $struct_name {
            pub fn inner_ref(&self) -> &$field_type {
                let Self(inner) = self;
                inner
            }

            pub fn into_inner(self) -> $field_type {
                let Self(inner) = self;
                inner
            }
        }

        impl From<$field_type> for $struct_name {
            fn from(value: $field_type) -> Self {
                $struct_name(value)
            }
        }
    };
}
183
/// Declares a newtype `$struct_name` over `Vec<u8>` that holds the prost encoding of one
/// `$field_type` message.
///
/// The wrapper derives `DeriveValueType`, which helps to map a blob stored in the database to
/// the protobuf struct. Generated API:
/// - `to_protobuf` decodes the stored bytes into `$field_type`; it panics if decoding fails;
/// - `From<&$field_type>` encodes a message into the wrapper;
/// - `Debug` prints the decoded message rather than the raw bytes, so it panics on bytes that
///   cannot be decoded as well;
/// - `Default` is the encoding of `<$field_type>::default()`;
/// - `Nullable::null()` yields `Value::Bytes(None)`.
macro_rules! derive_from_blob {
    ($struct_name:ident, $field_type:ty) => {
        #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, DeriveValueType)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        impl $struct_name {
            fn from_protobuf(val: &$field_type) -> Self {
                let encoded = <$field_type as prost::Message>::encode_to_vec(val);
                Self(encoded)
            }

            pub fn to_protobuf(&self) -> $field_type {
                let bytes: &[u8] = &self.0;
                <$field_type as prost::Message>::decode(bytes).unwrap()
            }
        }

        impl From<&$field_type> for $struct_name {
            fn from(value: &$field_type) -> Self {
                Self::from_protobuf(value)
            }
        }

        impl Default for $struct_name {
            fn default() -> Self {
                let empty_message = <$field_type>::default();
                Self::from_protobuf(&empty_message)
            }
        }

        impl std::fmt::Debug for $struct_name {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                // Format the decoded message, not the raw byte buffer.
                let decoded = self.to_protobuf();
                decoded.fmt(f)
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }
    };
}
225
/// Declares a newtype `$struct_name` over `Vec<u8>` that holds the prost encoding of a list of
/// `$field_type` messages.
///
/// The wrapper derives `DeriveValueType`, which helps to map a blob stored in the database to
/// an array of protobuf structs. The macro also declares the helper message
/// `$field_array_name`, whose only field (tag 1) is the repeated `$field_type`; the stored
/// bytes are the encoding of that helper message. Generated API:
/// - `to_protobuf` decodes the stored bytes into `Vec<$field_type>`; it panics if decoding fails;
/// - `From<Vec<$field_type>>` encodes a list into the wrapper;
/// - `Debug` prints the decoded list rather than the raw bytes;
/// - `Default` is an empty byte buffer;
/// - `Nullable::null()` yields `Value::Bytes(None)`.
macro_rules! derive_array_from_blob {
    ($struct_name:ident, $field_type:ty, $field_array_name:ident) => {
        #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        // Helper message that gives the list a protobuf envelope.
        #[derive(Clone, PartialEq, ::prost::Message)]
        pub struct $field_array_name {
            #[prost(message, repeated, tag = "1")]
            inner: Vec<$field_type>,
        }
        impl Eq for $field_array_name {}

        impl $struct_name {
            fn from_protobuf(val: Vec<$field_type>) -> Self {
                let envelope = $field_array_name { inner: val };
                Self(prost::Message::encode_to_vec(&envelope))
            }

            pub fn to_protobuf(&self) -> Vec<$field_type> {
                let bytes: &[u8] = &self.0;
                <$field_array_name as prost::Message>::decode(bytes)
                    .unwrap()
                    .inner
            }
        }

        impl From<Vec<$field_type>> for $struct_name {
            fn from(value: Vec<$field_type>) -> Self {
                Self::from_protobuf(value)
            }
        }

        impl Default for $struct_name {
            fn default() -> Self {
                Self(Vec::new())
            }
        }

        impl std::fmt::Debug for $struct_name {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                // Format the decoded list, not the raw byte buffer.
                let decoded = self.to_protobuf();
                decoded.fmt(f)
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }
    };
}
277
/// Declares a newtype `$struct_name` over `Vec<u8>` that holds the prost encoding of a
/// `BTreeMap<$key_type, $value_type>`.
///
/// The macro also declares the helper message `$field_type`, whose only field is the map; the
/// stored bytes are the encoding of that helper message. The prost attribute on the map field
/// is fixed to `"string, message"`, so `$key_type` and `$value_type` have to be compatible with
/// that encoding. Generated API:
/// - `to_protobuf` decodes the stored bytes into the map; it panics if decoding fails;
/// - `From<BTreeMap<$key_type, $value_type>>` encodes a map into the wrapper;
/// - `Debug` prints the decoded map rather than the raw bytes;
/// - `Default` is an empty byte buffer;
/// - `Nullable::null()` yields `Value::Bytes(None)`.
macro_rules! derive_btreemap_from_blob {
    ($struct_name:ident, $key_type:ty, $value_type:ty, $field_type:ident) => {
        #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        // Helper message that gives the map a protobuf envelope.
        #[derive(Clone, PartialEq, ::prost::Message)]
        pub struct $field_type {
            #[prost(btree_map = "string, message")]
            inner: BTreeMap<$key_type, $value_type>,
        }
        impl Eq for $field_type {}

        impl $struct_name {
            fn from_protobuf(val: BTreeMap<$key_type, $value_type>) -> Self {
                let envelope = $field_type { inner: val };
                Self(prost::Message::encode_to_vec(&envelope))
            }

            pub fn to_protobuf(&self) -> BTreeMap<$key_type, $value_type> {
                let bytes: &[u8] = &self.0;
                <$field_type as prost::Message>::decode(bytes)
                    .unwrap()
                    .inner
            }
        }

        impl From<BTreeMap<$key_type, $value_type>> for $struct_name {
            fn from(value: BTreeMap<$key_type, $value_type>) -> Self {
                Self::from_protobuf(value)
            }
        }

        impl Default for $struct_name {
            fn default() -> Self {
                Self(Vec::new())
            }
        }

        impl std::fmt::Debug for $struct_name {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                // Format the decoded map, not the raw byte buffer.
                let decoded = self.to_protobuf();
                decoded.fmt(f)
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }
    };
}
326
327pub(crate) use {derive_array_from_blob, derive_from_blob};
328
329derive_from_json_struct!(I32Array, Vec<i32>);
330
331impl From<Vec<u32>> for I32Array {
332    fn from(value: Vec<u32>) -> Self {
333        Self(value.into_iter().map(|id| id as _).collect())
334    }
335}
336
337impl I32Array {
338    pub fn into_u32_array(self) -> Vec<u32> {
339        self.0.into_iter().map(|id| id as _).collect()
340    }
341}
342
343derive_from_json_struct!(ActorUpstreamActors, BTreeMap<FragmentId, Vec<ActorId>>);
344
345impl From<BTreeMap<u32, Vec<u32>>> for ActorUpstreamActors {
346    fn from(val: BTreeMap<u32, Vec<u32>>) -> Self {
347        let mut map = BTreeMap::new();
348        for (k, v) in val {
349            map.insert(k as _, v.into_iter().map(|a| a as _).collect());
350        }
351        Self(map)
352    }
353}
354
355derive_btreemap_from_blob!(SecretRef, String, PbSecretRef, PbSecretRefMap);
356
357derive_from_blob!(StreamNode, PbStreamNode);
358derive_from_blob!(DataType, risingwave_pb::data::PbDataType);
359derive_array_from_blob!(
360    DataTypeArray,
361    risingwave_pb::data::PbDataType,
362    PbDataTypeArray
363);
364derive_array_from_blob!(
365    FieldArray,
366    risingwave_pb::plan_common::PbField,
367    PbFieldArray
368);
369derive_from_json_struct!(Property, BTreeMap<String, String>);
370derive_from_blob!(ColumnCatalog, risingwave_pb::plan_common::PbColumnCatalog);
371derive_array_from_blob!(
372    ColumnCatalogArray,
373    risingwave_pb::plan_common::PbColumnCatalog,
374    PbColumnCatalogArray
375);
376derive_from_blob!(StreamSourceInfo, risingwave_pb::catalog::PbStreamSourceInfo);
377derive_from_blob!(
378    WebhookSourceInfo,
379    risingwave_pb::catalog::PbWebhookSourceInfo
380);
381derive_from_blob!(WatermarkDesc, risingwave_pb::catalog::PbWatermarkDesc);
382derive_array_from_blob!(
383    WatermarkDescArray,
384    risingwave_pb::catalog::PbWatermarkDesc,
385    PbWatermarkDescArray
386);
387derive_array_from_blob!(
388    ExprNodeArray,
389    risingwave_pb::expr::PbExprNode,
390    PbExprNodeArray
391);
392derive_array_from_blob!(
393    ColumnOrderArray,
394    risingwave_pb::common::PbColumnOrder,
395    PbColumnOrderArray
396);
397derive_array_from_blob!(
398    IndexColumnPropertiesArray,
399    risingwave_pb::catalog::PbIndexColumnProperties,
400    PbIndexColumnPropertiesArray
401);
402derive_from_blob!(SinkFormatDesc, risingwave_pb::catalog::PbSinkFormatDesc);
403derive_from_blob!(Cardinality, risingwave_pb::plan_common::PbCardinality);
404derive_from_blob!(TableVersion, risingwave_pb::catalog::table::PbTableVersion);
405derive_from_blob!(
406    PrivateLinkService,
407    risingwave_pb::catalog::connection::PbPrivateLinkService
408);
409derive_from_blob!(ConnectionParams, risingwave_pb::catalog::ConnectionParams);
410derive_from_blob!(AuthInfo, risingwave_pb::user::PbAuthInfo);
411
412derive_from_blob!(ConnectorSplits, risingwave_pb::source::ConnectorSplits);
413derive_from_blob!(VnodeBitmap, risingwave_pb::common::Buffer);
414derive_from_blob!(ActorMapping, risingwave_pb::stream_plan::PbActorMapping);
415derive_from_blob!(ExprContext, risingwave_pb::plan_common::PbExprContext);
416
417derive_array_from_blob!(
418    TypePairArray,
419    risingwave_pb::stream_plan::dispatch_output_mapping::TypePair,
420    PbTypePairArray
421);
422
423derive_array_from_blob!(
424    HummockVersionDeltaArray,
425    risingwave_pb::hummock::PbHummockVersionDelta,
426    PbHummockVersionDeltaArray
427);
428
429#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize)]
430pub enum StreamingParallelism {
431    Adaptive,
432    Fixed(usize),
433    Custom,
434}
435
436impl Eq for StreamingParallelism {}
437
438#[derive(
439    Hash, Copy, Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
440)]
441#[sea_orm(rs_type = "String", db_type = "string(None)")]
442pub enum DispatcherType {
443    #[sea_orm(string_value = "HASH")]
444    Hash,
445    #[sea_orm(string_value = "BROADCAST")]
446    Broadcast,
447    #[sea_orm(string_value = "SIMPLE")]
448    Simple,
449    #[sea_orm(string_value = "NO_SHUFFLE")]
450    NoShuffle,
451}
452
453impl From<PbDispatcherType> for DispatcherType {
454    fn from(val: PbDispatcherType) -> Self {
455        match val {
456            PbDispatcherType::Unspecified => unreachable!(),
457            PbDispatcherType::Hash => DispatcherType::Hash,
458            PbDispatcherType::Broadcast => DispatcherType::Broadcast,
459            PbDispatcherType::Simple => DispatcherType::Simple,
460            PbDispatcherType::NoShuffle => DispatcherType::NoShuffle,
461        }
462    }
463}
464
465impl From<DispatcherType> for PbDispatcherType {
466    fn from(val: DispatcherType) -> Self {
467        match val {
468            DispatcherType::Hash => PbDispatcherType::Hash,
469            DispatcherType::Broadcast => PbDispatcherType::Broadcast,
470            DispatcherType::Simple => PbDispatcherType::Simple,
471            DispatcherType::NoShuffle => PbDispatcherType::NoShuffle,
472        }
473    }
474}