risingwave_meta_model/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus};
18use risingwave_pb::meta::table_fragments::PbState as PbStreamJobState;
19use risingwave_pb::secret::PbSecretRef;
20use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamNode};
21use sea_orm::entity::prelude::*;
22use sea_orm::{DeriveActiveEnum, EnumIter, FromJsonQueryResult};
23use serde::{Deserialize, Serialize};
24
25pub mod prelude;
26
27pub mod actor;
28pub mod catalog_version;
29pub mod cluster;
30pub mod compaction_config;
31pub mod compaction_status;
32pub mod compaction_task;
33pub mod connection;
34pub mod database;
35pub mod exactly_once_iceberg_sink;
36pub mod fragment;
37pub mod fragment_relation;
38pub mod function;
39pub mod hummock_epoch_to_version;
40pub mod hummock_gc_history;
41pub mod hummock_pinned_snapshot;
42pub mod hummock_pinned_version;
43pub mod hummock_sequence;
44pub mod hummock_sstable_info;
45pub mod hummock_time_travel_delta;
46pub mod hummock_time_travel_version;
47pub mod hummock_version_delta;
48pub mod hummock_version_stats;
49pub mod iceberg_namespace_properties;
50pub mod iceberg_tables;
51pub mod index;
52pub mod object;
53pub mod object_dependency;
54pub mod schema;
55pub mod secret;
56pub mod serde_seaql_migration;
57pub mod session_parameter;
58pub mod sink;
59pub mod source;
60pub mod streaming_job;
61pub mod subscription;
62pub mod system_parameter;
63pub mod table;
64pub mod user;
65pub mod user_default_privilege;
66pub mod user_privilege;
67pub mod view;
68pub mod worker;
69pub mod worker_property;
70
71pub type WorkerId = i32;
72
73pub type TransactionId = i32;
74
75pub type ObjectId = i32;
76pub type DatabaseId = ObjectId;
77pub type SchemaId = ObjectId;
78pub type TableId = ObjectId;
79pub type SourceId = ObjectId;
80pub type SinkId = ObjectId;
81pub type SubscriptionId = ObjectId;
82pub type IndexId = ObjectId;
83pub type ViewId = ObjectId;
84pub type FunctionId = ObjectId;
85pub type ConnectionId = ObjectId;
86pub type SecretId = ObjectId;
87pub type UserId = i32;
88pub type PrivilegeId = i32;
89pub type DefaultPrivilegeId = i32;
90
91pub type HummockVersionId = i64;
92pub type Epoch = i64;
93pub type CompactionGroupId = i64;
94pub type CompactionTaskId = i64;
95pub type HummockSstableObjectId = i64;
96
97pub type FragmentId = i32;
98pub type ActorId = i32;
99
100#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
101#[sea_orm(rs_type = "String", db_type = "string(None)")]
102pub enum JobStatus {
103    #[sea_orm(string_value = "INITIAL")]
104    Initial,
105    #[sea_orm(string_value = "CREATING")]
106    Creating,
107    #[sea_orm(string_value = "CREATED")]
108    Created,
109}
110
111impl From<JobStatus> for PbStreamJobStatus {
112    fn from(job_status: JobStatus) -> Self {
113        match job_status {
114            JobStatus::Initial => Self::Unspecified,
115            JobStatus::Creating => Self::Creating,
116            JobStatus::Created => Self::Created,
117        }
118    }
119}
120
121// todo: deprecate job status in catalog and unify with this one.
122impl From<JobStatus> for PbStreamJobState {
123    fn from(status: JobStatus) -> Self {
124        match status {
125            JobStatus::Initial => PbStreamJobState::Initial,
126            JobStatus::Creating => PbStreamJobState::Creating,
127            JobStatus::Created => PbStreamJobState::Created,
128        }
129    }
130}
131
132#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
133#[sea_orm(rs_type = "String", db_type = "string(None)")]
134pub enum CreateType {
135    #[sea_orm(string_value = "BACKGROUND")]
136    Background,
137    #[sea_orm(string_value = "FOREGROUND")]
138    Foreground,
139}
140
141impl From<CreateType> for PbCreateType {
142    fn from(create_type: CreateType) -> Self {
143        match create_type {
144            CreateType::Background => Self::Background,
145            CreateType::Foreground => Self::Foreground,
146        }
147    }
148}
149
150impl From<PbCreateType> for CreateType {
151    fn from(create_type: PbCreateType) -> Self {
152        match create_type {
153            PbCreateType::Background => Self::Background,
154            PbCreateType::Foreground => Self::Foreground,
155            PbCreateType::Unspecified => unreachable!("Unspecified create type"),
156        }
157    }
158}
159
160impl CreateType {
161    pub fn as_str(&self) -> &'static str {
162        match self {
163            CreateType::Background => "BACKGROUND",
164            CreateType::Foreground => "FOREGROUND",
165        }
166    }
167}
168
169/// Defines struct with a single pb field that derives `FromJsonQueryResult`, it will helps to map json value stored in database to Pb struct.
170macro_rules! derive_from_json_struct {
171    ($struct_name:ident, $field_type:ty) => {
172        #[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)]
173        pub struct $struct_name(pub $field_type);
174        impl Eq for $struct_name {}
175        impl From<$field_type> for $struct_name {
176            fn from(value: $field_type) -> Self {
177                Self(value)
178            }
179        }
180
181        impl $struct_name {
182            pub fn into_inner(self) -> $field_type {
183                self.0
184            }
185
186            pub fn inner_ref(&self) -> &$field_type {
187                &self.0
188            }
189        }
190    };
191}
192
193/// Defines struct with a byte array that derives `DeriveValueType`, it will helps to map blob stored in database to Pb struct.
194macro_rules! derive_from_blob {
195    ($struct_name:ident, $field_type:ty) => {
196        #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, DeriveValueType)]
197        pub struct $struct_name(#[sea_orm] Vec<u8>);
198
199        impl $struct_name {
200            pub fn to_protobuf(&self) -> $field_type {
201                prost::Message::decode(self.0.as_slice()).unwrap()
202            }
203
204            fn from_protobuf(val: &$field_type) -> Self {
205                Self(prost::Message::encode_to_vec(val))
206            }
207        }
208
209        impl sea_orm::sea_query::Nullable for $struct_name {
210            fn null() -> Value {
211                Value::Bytes(None)
212            }
213        }
214
215        impl From<&$field_type> for $struct_name {
216            fn from(value: &$field_type) -> Self {
217                Self::from_protobuf(value)
218            }
219        }
220
221        impl std::fmt::Debug for $struct_name {
222            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
223                self.to_protobuf().fmt(f)
224            }
225        }
226
227        impl Default for $struct_name {
228            fn default() -> Self {
229                Self::from_protobuf(&<$field_type>::default())
230            }
231        }
232    };
233}
234
235/// Defines struct with a byte array that derives `DeriveValueType`, it will helps to map blob stored in database to Pb struct array.
236macro_rules! derive_array_from_blob {
237    ($struct_name:ident, $field_type:ty, $field_array_name:ident) => {
238        #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
239        pub struct $struct_name(#[sea_orm] Vec<u8>);
240
241        #[derive(Clone, PartialEq, ::prost::Message)]
242        pub struct $field_array_name {
243            #[prost(message, repeated, tag = "1")]
244            inner: Vec<$field_type>,
245        }
246        impl Eq for $field_array_name {}
247
248        impl $struct_name {
249            pub fn to_protobuf(&self) -> Vec<$field_type> {
250                let data: $field_array_name = prost::Message::decode(self.0.as_slice()).unwrap();
251                data.inner
252            }
253
254            fn from_protobuf(val: Vec<$field_type>) -> Self {
255                Self(prost::Message::encode_to_vec(&$field_array_name {
256                    inner: val,
257                }))
258            }
259        }
260
261        impl From<Vec<$field_type>> for $struct_name {
262            fn from(value: Vec<$field_type>) -> Self {
263                Self::from_protobuf(value)
264            }
265        }
266
267        impl std::fmt::Debug for $struct_name {
268            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
269                self.to_protobuf().fmt(f)
270            }
271        }
272
273        impl Default for $struct_name {
274            fn default() -> Self {
275                Self(vec![])
276            }
277        }
278
279        impl sea_orm::sea_query::Nullable for $struct_name {
280            fn null() -> Value {
281                Value::Bytes(None)
282            }
283        }
284    };
285}
286
287macro_rules! derive_btreemap_from_blob {
288    ($struct_name:ident, $key_type:ty, $value_type:ty, $field_type:ident) => {
289        #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
290        pub struct $struct_name(#[sea_orm] Vec<u8>);
291
292        #[derive(Clone, PartialEq, ::prost::Message)]
293        pub struct $field_type {
294            #[prost(btree_map = "string, message")]
295            inner: BTreeMap<$key_type, $value_type>,
296        }
297        impl Eq for $field_type {}
298
299        impl $struct_name {
300            pub fn to_protobuf(&self) -> BTreeMap<$key_type, $value_type> {
301                let data: $field_type = prost::Message::decode(self.0.as_slice()).unwrap();
302                data.inner
303            }
304
305            fn from_protobuf(val: BTreeMap<$key_type, $value_type>) -> Self {
306                Self(prost::Message::encode_to_vec(&$field_type { inner: val }))
307            }
308        }
309
310        impl From<BTreeMap<$key_type, $value_type>> for $struct_name {
311            fn from(value: BTreeMap<$key_type, $value_type>) -> Self {
312                Self::from_protobuf(value)
313            }
314        }
315
316        impl std::fmt::Debug for $struct_name {
317            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
318                self.to_protobuf().fmt(f)
319            }
320        }
321
322        impl Default for $struct_name {
323            fn default() -> Self {
324                Self(vec![])
325            }
326        }
327
328        impl sea_orm::sea_query::Nullable for $struct_name {
329            fn null() -> Value {
330                Value::Bytes(None)
331            }
332        }
333    };
334}
335
336pub(crate) use {derive_array_from_blob, derive_from_blob};
337
338derive_from_json_struct!(I32Array, Vec<i32>);
339
340impl From<Vec<u32>> for I32Array {
341    fn from(value: Vec<u32>) -> Self {
342        Self(value.into_iter().map(|id| id as _).collect())
343    }
344}
345
346impl I32Array {
347    pub fn into_u32_array(self) -> Vec<u32> {
348        self.0.into_iter().map(|id| id as _).collect()
349    }
350}
351
352derive_from_json_struct!(ActorUpstreamActors, BTreeMap<FragmentId, Vec<ActorId>>);
353
354impl From<BTreeMap<u32, Vec<u32>>> for ActorUpstreamActors {
355    fn from(val: BTreeMap<u32, Vec<u32>>) -> Self {
356        let mut map = BTreeMap::new();
357        for (k, v) in val {
358            map.insert(k as _, v.into_iter().map(|a| a as _).collect());
359        }
360        Self(map)
361    }
362}
363
364derive_btreemap_from_blob!(SecretRef, String, PbSecretRef, PbSecretRefMap);
365
366derive_from_blob!(StreamNode, PbStreamNode);
367derive_from_blob!(DataType, risingwave_pb::data::PbDataType);
368derive_array_from_blob!(
369    DataTypeArray,
370    risingwave_pb::data::PbDataType,
371    PbDataTypeArray
372);
373derive_array_from_blob!(
374    FieldArray,
375    risingwave_pb::plan_common::PbField,
376    PbFieldArray
377);
378derive_from_json_struct!(Property, BTreeMap<String, String>);
379derive_from_blob!(ColumnCatalog, risingwave_pb::plan_common::PbColumnCatalog);
380derive_array_from_blob!(
381    ColumnCatalogArray,
382    risingwave_pb::plan_common::PbColumnCatalog,
383    PbColumnCatalogArray
384);
385derive_from_blob!(StreamSourceInfo, risingwave_pb::catalog::PbStreamSourceInfo);
386derive_from_blob!(
387    WebhookSourceInfo,
388    risingwave_pb::catalog::PbWebhookSourceInfo
389);
390derive_from_blob!(WatermarkDesc, risingwave_pb::catalog::PbWatermarkDesc);
391derive_array_from_blob!(
392    WatermarkDescArray,
393    risingwave_pb::catalog::PbWatermarkDesc,
394    PbWatermarkDescArray
395);
396derive_array_from_blob!(
397    ExprNodeArray,
398    risingwave_pb::expr::PbExprNode,
399    PbExprNodeArray
400);
401derive_array_from_blob!(
402    ColumnOrderArray,
403    risingwave_pb::common::PbColumnOrder,
404    PbColumnOrderArray
405);
406derive_array_from_blob!(
407    IndexColumnPropertiesArray,
408    risingwave_pb::catalog::PbIndexColumnProperties,
409    PbIndexColumnPropertiesArray
410);
411derive_from_blob!(SinkFormatDesc, risingwave_pb::catalog::PbSinkFormatDesc);
412derive_from_blob!(Cardinality, risingwave_pb::plan_common::PbCardinality);
413derive_from_blob!(TableVersion, risingwave_pb::catalog::table::PbTableVersion);
414derive_from_blob!(
415    PrivateLinkService,
416    risingwave_pb::catalog::connection::PbPrivateLinkService
417);
418derive_from_blob!(ConnectionParams, risingwave_pb::catalog::ConnectionParams);
419derive_from_blob!(AuthInfo, risingwave_pb::user::PbAuthInfo);
420
421derive_from_blob!(ConnectorSplits, risingwave_pb::source::ConnectorSplits);
422derive_from_blob!(VnodeBitmap, risingwave_pb::common::Buffer);
423derive_from_blob!(ActorMapping, risingwave_pb::stream_plan::PbActorMapping);
424derive_from_blob!(ExprContext, risingwave_pb::plan_common::PbExprContext);
425
426derive_array_from_blob!(
427    TypePairArray,
428    risingwave_pb::stream_plan::dispatch_output_mapping::TypePair,
429    PbTypePairArray
430);
431
432derive_array_from_blob!(
433    HummockVersionDeltaArray,
434    risingwave_pb::hummock::PbHummockVersionDelta,
435    PbHummockVersionDeltaArray
436);
437
438#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize)]
439pub enum StreamingParallelism {
440    Adaptive,
441    Fixed(usize),
442    Custom,
443}
444
445impl Eq for StreamingParallelism {}
446
447#[derive(
448    Hash, Copy, Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
449)]
450#[sea_orm(rs_type = "String", db_type = "string(None)")]
451pub enum DispatcherType {
452    #[sea_orm(string_value = "HASH")]
453    Hash,
454    #[sea_orm(string_value = "BROADCAST")]
455    Broadcast,
456    #[sea_orm(string_value = "SIMPLE")]
457    Simple,
458    #[sea_orm(string_value = "NO_SHUFFLE")]
459    NoShuffle,
460}
461
462impl From<PbDispatcherType> for DispatcherType {
463    fn from(val: PbDispatcherType) -> Self {
464        match val {
465            PbDispatcherType::Unspecified => unreachable!(),
466            PbDispatcherType::Hash => DispatcherType::Hash,
467            PbDispatcherType::Broadcast => DispatcherType::Broadcast,
468            PbDispatcherType::Simple => DispatcherType::Simple,
469            PbDispatcherType::NoShuffle => DispatcherType::NoShuffle,
470        }
471    }
472}
473
474impl From<DispatcherType> for PbDispatcherType {
475    fn from(val: DispatcherType) -> Self {
476        match val {
477            DispatcherType::Hash => PbDispatcherType::Hash,
478            DispatcherType::Broadcast => PbDispatcherType::Broadcast,
479            DispatcherType::Simple => PbDispatcherType::Simple,
480            DispatcherType::NoShuffle => PbDispatcherType::NoShuffle,
481        }
482    }
483}