risingwave_meta_model/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus};
18use risingwave_pb::meta::table_fragments::PbState as PbStreamJobState;
19use risingwave_pb::secret::PbSecretRef;
20use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamNode};
21use sea_orm::entity::prelude::*;
22use sea_orm::{DeriveActiveEnum, EnumIter, FromJsonQueryResult};
23use serde::{Deserialize, Serialize};
24
// Submodules of this crate. `prelude` is declared on its own; the remaining
// modules follow in alphabetical order.
pub mod prelude;

pub mod actor;
pub mod catalog_version;
pub mod cdc_table_snapshot_split;
pub mod cluster;
pub mod compaction_config;
pub mod compaction_status;
pub mod compaction_task;
pub mod connection;
pub mod database;
pub mod exactly_once_iceberg_sink;
pub mod fragment;
pub mod fragment_relation;
pub mod function;
pub mod hummock_epoch_to_version;
pub mod hummock_gc_history;
pub mod hummock_pinned_snapshot;
pub mod hummock_pinned_version;
pub mod hummock_sequence;
pub mod hummock_sstable_info;
pub mod hummock_time_travel_delta;
pub mod hummock_time_travel_version;
pub mod hummock_version_delta;
pub mod hummock_version_stats;
pub mod iceberg_namespace_properties;
pub mod iceberg_tables;
pub mod index;
pub mod object;
pub mod object_dependency;
pub mod schema;
pub mod secret;
pub mod serde_seaql_migration;
pub mod session_parameter;
pub mod sink;
pub mod source;
pub mod streaming_job;
pub mod subscription;
pub mod system_parameter;
pub mod table;
pub mod user;
pub mod user_default_privilege;
pub mod user_privilege;
pub mod view;
pub mod worker;
pub mod worker_property;
71
// Identifier aliases. These are plain type aliases, not newtypes, so the
// compiler does not distinguish between aliases that share an underlying
// integer type.

/// 32-bit signed worker identifier.
pub type WorkerId = i32;

/// 32-bit signed transaction identifier.
pub type TransactionId = i32;

/// Base identifier type shared by every `*Id` alias below that is defined in terms of it.
pub type ObjectId = i32;
pub type DatabaseId = ObjectId;
pub type SchemaId = ObjectId;
pub type TableId = ObjectId;
pub type SourceId = ObjectId;
pub type SinkId = ObjectId;
pub type SubscriptionId = ObjectId;
pub type IndexId = ObjectId;
pub type ViewId = ObjectId;
pub type FunctionId = ObjectId;
pub type ConnectionId = ObjectId;
pub type SecretId = ObjectId;
// The following ids are `i32` as well, but are not declared through `ObjectId`.
pub type UserId = i32;
pub type PrivilegeId = i32;
pub type DefaultPrivilegeId = i32;

// 64-bit signed aliases.
pub type HummockVersionId = i64;
pub type Epoch = i64;
pub type CompactionGroupId = i64;
pub type CompactionTaskId = i64;
pub type HummockSstableObjectId = i64;

// 32-bit signed fragment / actor identifiers.
pub type FragmentId = i32;
pub type ActorId = i32;
100
/// Status of a job, mapped by `DeriveActiveEnum` to a string column.
///
/// Each variant is persisted as the `string_value` given in its `sea_orm`
/// attribute. Conversions into `PbStreamJobStatus` and `PbStreamJobState` are
/// provided by the `From` impls in this file.
#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum JobStatus {
    /// Persisted as `"INITIAL"`.
    #[sea_orm(string_value = "INITIAL")]
    Initial,
    /// Persisted as `"CREATING"`.
    #[sea_orm(string_value = "CREATING")]
    Creating,
    /// Persisted as `"CREATED"`.
    #[sea_orm(string_value = "CREATED")]
    Created,
}
111
112impl From<JobStatus> for PbStreamJobStatus {
113    fn from(job_status: JobStatus) -> Self {
114        match job_status {
115            JobStatus::Initial => Self::Unspecified,
116            JobStatus::Creating => Self::Creating,
117            JobStatus::Created => Self::Created,
118        }
119    }
120}
121
122// todo: deprecate job status in catalog and unify with this one.
123impl From<JobStatus> for PbStreamJobState {
124    fn from(status: JobStatus) -> Self {
125        match status {
126            JobStatus::Initial => PbStreamJobState::Initial,
127            JobStatus::Creating => PbStreamJobState::Creating,
128            JobStatus::Created => PbStreamJobState::Created,
129        }
130    }
131}
132
/// Create type of a job, mapped by `DeriveActiveEnum` to a string column.
///
/// Each variant is persisted as the `string_value` given in its `sea_orm`
/// attribute; `CreateType::as_str` returns the same strings.
#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum CreateType {
    /// Persisted as `"BACKGROUND"`.
    #[sea_orm(string_value = "BACKGROUND")]
    Background,
    /// Persisted as `"FOREGROUND"`.
    #[sea_orm(string_value = "FOREGROUND")]
    Foreground,
}
141
142impl From<CreateType> for PbCreateType {
143    fn from(create_type: CreateType) -> Self {
144        match create_type {
145            CreateType::Background => Self::Background,
146            CreateType::Foreground => Self::Foreground,
147        }
148    }
149}
150
151impl From<PbCreateType> for CreateType {
152    fn from(create_type: PbCreateType) -> Self {
153        match create_type {
154            PbCreateType::Background => Self::Background,
155            PbCreateType::Foreground => Self::Foreground,
156            PbCreateType::Unspecified => unreachable!("Unspecified create type"),
157        }
158    }
159}
160
161impl CreateType {
162    pub fn as_str(&self) -> &'static str {
163        match self {
164            CreateType::Background => "BACKGROUND",
165            CreateType::Foreground => "FOREGROUND",
166        }
167    }
168}
169
/// Defines a tuple struct with a single public field that derives `FromJsonQueryResult`, which
/// helps to map a JSON value stored in the database to the wrapped type.
///
/// * `$struct_name` - name of the generated struct.
/// * `$field_type` - type of its single field.
///
/// Besides the derives, the expansion provides `Eq`, `From<$field_type>` and the accessors
/// `into_inner` / `inner_ref`. The derive names are unqualified, so they must be in scope
/// where the macro is invoked.
macro_rules! derive_from_json_struct {
    ($struct_name:ident, $field_type:ty) => {
        #[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)]
        pub struct $struct_name(pub $field_type);
        // Written by hand instead of derived, so `$field_type` itself is not required to be `Eq`.
        impl Eq for $struct_name {}
        impl From<$field_type> for $struct_name {
            fn from(value: $field_type) -> Self {
                Self(value)
            }
        }

        impl $struct_name {
            // Consumes the wrapper and returns the wrapped value.
            pub fn into_inner(self) -> $field_type {
                self.0
            }

            // Borrows the wrapped value.
            pub fn inner_ref(&self) -> &$field_type {
                &self.0
            }
        }
    };
}
193
/// Defines a struct wrapping a byte array that derives `DeriveValueType`, which helps to map a
/// blob stored in the database to a Pb struct.
///
/// * `$struct_name` - name of the generated wrapper; its `Vec<u8>` field is private.
/// * `$field_type` - message type the bytes are encoded from / decoded into via `prost::Message`.
///
/// The expansion refers to `Value` and `DeriveValueType` by unqualified name, so both must be
/// in scope where the macro is invoked.
///
/// # Panics
///
/// The generated `to_protobuf` unwraps the decode result and therefore panics when the stored
/// bytes cannot be decoded as `$field_type`. The generated `Debug` impl goes through
/// `to_protobuf` and panics in the same situation.
macro_rules! derive_from_blob {
    ($struct_name:ident, $field_type:ty) => {
        #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, DeriveValueType)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        impl $struct_name {
            // Decodes the stored bytes; panics if decoding fails.
            pub fn to_protobuf(&self) -> $field_type {
                prost::Message::decode(self.0.as_slice()).unwrap()
            }

            // Private: external construction goes through `From<&$field_type>` or `Default`.
            fn from_protobuf(val: &$field_type) -> Self {
                Self(prost::Message::encode_to_vec(val))
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            // The NULL representation of this type is a NULL bytes value.
            fn null() -> Value {
                Value::Bytes(None)
            }
        }

        impl From<&$field_type> for $struct_name {
            fn from(value: &$field_type) -> Self {
                Self::from_protobuf(value)
            }
        }

        impl std::fmt::Debug for $struct_name {
            // Formats the decoded message rather than the raw bytes.
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                self.to_protobuf().fmt(f)
            }
        }

        impl Default for $struct_name {
            // The default wrapper holds the encoding of the default `$field_type`.
            fn default() -> Self {
                Self::from_protobuf(&<$field_type>::default())
            }
        }
    };
}
235
/// Defines a struct wrapping a byte array that derives `DeriveValueType`, which helps to map a
/// blob stored in the database to an array of Pb structs.
///
/// * `$struct_name` - name of the generated wrapper; its `Vec<u8>` field is private.
/// * `$field_type` - element message type.
/// * `$field_array_name` - name of the generated helper message that carries the elements as a
///   repeated field with tag 1; the bytes stored in `$struct_name` are its encoding.
///
/// The expansion refers to `Value` and `DeriveValueType` by unqualified name, so both must be
/// in scope where the macro is invoked.
///
/// # Panics
///
/// The generated `to_protobuf` unwraps the decode result and therefore panics when the stored
/// bytes cannot be decoded. The generated `Debug` impl goes through `to_protobuf` and panics in
/// the same situation.
macro_rules! derive_array_from_blob {
    ($struct_name:ident, $field_type:ty, $field_array_name:ident) => {
        #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        #[derive(Clone, PartialEq, ::prost::Message)]
        pub struct $field_array_name {
            #[prost(message, repeated, tag = "1")]
            inner: Vec<$field_type>,
        }
        // Written by hand instead of derived, so `$field_type` itself is not required to be `Eq`.
        impl Eq for $field_array_name {}

        impl $struct_name {
            // Decodes the stored bytes into the helper message and returns its elements;
            // panics if decoding fails.
            pub fn to_protobuf(&self) -> Vec<$field_type> {
                let data: $field_array_name = prost::Message::decode(self.0.as_slice()).unwrap();
                data.inner
            }

            // Private: external construction goes through `From<Vec<$field_type>>` or `Default`.
            fn from_protobuf(val: Vec<$field_type>) -> Self {
                Self(prost::Message::encode_to_vec(&$field_array_name {
                    inner: val,
                }))
            }
        }

        impl From<Vec<$field_type>> for $struct_name {
            fn from(value: Vec<$field_type>) -> Self {
                Self::from_protobuf(value)
            }
        }

        impl std::fmt::Debug for $struct_name {
            // Formats the decoded elements rather than the raw bytes.
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                self.to_protobuf().fmt(f)
            }
        }

        impl Default for $struct_name {
            // The default wrapper holds an empty byte buffer.
            fn default() -> Self {
                Self(vec![])
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            // The NULL representation of this type is a NULL bytes value.
            fn null() -> Value {
                Value::Bytes(None)
            }
        }
    };
}
287
/// Defines a struct wrapping a byte array that derives `DeriveValueType`, mapping a blob stored
/// in the database to a `BTreeMap` of Pb structs.
///
/// * `$struct_name` - name of the generated wrapper; its `Vec<u8>` field is private.
/// * `$key_type` / `$value_type` - key and value types of the map.
/// * `$field_type` - name of the generated helper message that carries the map; the bytes
///   stored in `$struct_name` are its encoding.
///
/// NOTE(review): the `prost` attribute on the helper message is hard-coded to
/// `btree_map = "string, message"`, so `$key_type` presumably has to be `String` and
/// `$value_type` a message type -- confirm before using other key/value types.
///
/// The expansion refers to `BTreeMap`, `Value` and `DeriveValueType` by unqualified name, so
/// they must be in scope where the macro is invoked.
///
/// # Panics
///
/// The generated `to_protobuf` unwraps the decode result and therefore panics when the stored
/// bytes cannot be decoded. The generated `Debug` impl goes through `to_protobuf` and panics in
/// the same situation.
macro_rules! derive_btreemap_from_blob {
    ($struct_name:ident, $key_type:ty, $value_type:ty, $field_type:ident) => {
        #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        #[derive(Clone, PartialEq, ::prost::Message)]
        pub struct $field_type {
            #[prost(btree_map = "string, message")]
            inner: BTreeMap<$key_type, $value_type>,
        }
        // Written by hand instead of derived, so `$value_type` itself is not required to be `Eq`.
        impl Eq for $field_type {}

        impl $struct_name {
            // Decodes the stored bytes into the helper message and returns its map;
            // panics if decoding fails.
            pub fn to_protobuf(&self) -> BTreeMap<$key_type, $value_type> {
                let data: $field_type = prost::Message::decode(self.0.as_slice()).unwrap();
                data.inner
            }

            // Private: external construction goes through `From<BTreeMap<..>>` or `Default`.
            fn from_protobuf(val: BTreeMap<$key_type, $value_type>) -> Self {
                Self(prost::Message::encode_to_vec(&$field_type { inner: val }))
            }
        }

        impl From<BTreeMap<$key_type, $value_type>> for $struct_name {
            fn from(value: BTreeMap<$key_type, $value_type>) -> Self {
                Self::from_protobuf(value)
            }
        }

        impl std::fmt::Debug for $struct_name {
            // Formats the decoded map rather than the raw bytes.
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                self.to_protobuf().fmt(f)
            }
        }

        impl Default for $struct_name {
            // The default wrapper holds an empty byte buffer.
            fn default() -> Self {
                Self(vec![])
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            // The NULL representation of this type is a NULL bytes value.
            fn null() -> Value {
                Value::Bytes(None)
            }
        }
    };
}
336
337pub(crate) use {derive_array_from_blob, derive_from_blob};
338
339derive_from_json_struct!(I32Array, Vec<i32>);
340
341impl From<Vec<u32>> for I32Array {
342    fn from(value: Vec<u32>) -> Self {
343        Self(value.into_iter().map(|id| id as _).collect())
344    }
345}
346
347impl I32Array {
348    pub fn into_u32_array(self) -> Vec<u32> {
349        self.0.into_iter().map(|id| id as _).collect()
350    }
351}
352
353derive_from_json_struct!(ActorUpstreamActors, BTreeMap<FragmentId, Vec<ActorId>>);
354
355impl From<BTreeMap<u32, Vec<u32>>> for ActorUpstreamActors {
356    fn from(val: BTreeMap<u32, Vec<u32>>) -> Self {
357        let mut map = BTreeMap::new();
358        for (k, v) in val {
359            map.insert(k as _, v.into_iter().map(|a| a as _).collect());
360        }
361        Self(map)
362    }
363}
364
// Wrapper types generated by the macros defined in this file:
// - `derive_from_blob!(Name, Pb)`: `Name` stores the encoded bytes of one `Pb` message.
// - `derive_array_from_blob!(Name, Pb, PbArray)`: `Name` stores the encoded bytes of the
//   helper message `PbArray`, which holds a repeated `Pb`.
// - `derive_btreemap_from_blob!(Name, K, V, PbMap)`: `Name` stores the encoded bytes of the
//   helper message `PbMap`, which holds a `BTreeMap<K, V>`.
// - `derive_from_json_struct!(Name, T)`: `Name` wraps a `T` and derives `FromJsonQueryResult`.

derive_btreemap_from_blob!(SecretRef, String, PbSecretRef, PbSecretRefMap);

derive_from_blob!(StreamNode, PbStreamNode);
derive_from_blob!(DataType, risingwave_pb::data::PbDataType);
derive_array_from_blob!(
    DataTypeArray,
    risingwave_pb::data::PbDataType,
    PbDataTypeArray
);
derive_array_from_blob!(
    FieldArray,
    risingwave_pb::plan_common::PbField,
    PbFieldArray
);
derive_from_json_struct!(Property, BTreeMap<String, String>);
derive_from_blob!(ColumnCatalog, risingwave_pb::plan_common::PbColumnCatalog);
derive_array_from_blob!(
    ColumnCatalogArray,
    risingwave_pb::plan_common::PbColumnCatalog,
    PbColumnCatalogArray
);
derive_from_blob!(StreamSourceInfo, risingwave_pb::catalog::PbStreamSourceInfo);
derive_from_blob!(
    WebhookSourceInfo,
    risingwave_pb::catalog::PbWebhookSourceInfo
);
derive_from_blob!(WatermarkDesc, risingwave_pb::catalog::PbWatermarkDesc);
derive_array_from_blob!(
    WatermarkDescArray,
    risingwave_pb::catalog::PbWatermarkDesc,
    PbWatermarkDescArray
);
derive_array_from_blob!(
    ExprNodeArray,
    risingwave_pb::expr::PbExprNode,
    PbExprNodeArray
);
derive_array_from_blob!(
    ColumnOrderArray,
    risingwave_pb::common::PbColumnOrder,
    PbColumnOrderArray
);
derive_array_from_blob!(
    IndexColumnPropertiesArray,
    risingwave_pb::catalog::PbIndexColumnProperties,
    PbIndexColumnPropertiesArray
);
derive_from_blob!(SinkFormatDesc, risingwave_pb::catalog::PbSinkFormatDesc);
derive_from_blob!(Cardinality, risingwave_pb::plan_common::PbCardinality);
derive_from_blob!(TableVersion, risingwave_pb::catalog::table::PbTableVersion);
derive_from_blob!(
    PrivateLinkService,
    risingwave_pb::catalog::connection::PbPrivateLinkService
);
derive_from_blob!(ConnectionParams, risingwave_pb::catalog::ConnectionParams);
derive_from_blob!(AuthInfo, risingwave_pb::user::PbAuthInfo);

derive_from_blob!(ConnectorSplits, risingwave_pb::source::ConnectorSplits);
derive_from_blob!(VnodeBitmap, risingwave_pb::common::Buffer);
derive_from_blob!(ActorMapping, risingwave_pb::stream_plan::PbActorMapping);
derive_from_blob!(ExprContext, risingwave_pb::plan_common::PbExprContext);

derive_array_from_blob!(
    TypePairArray,
    risingwave_pb::stream_plan::dispatch_output_mapping::TypePair,
    PbTypePairArray
);

derive_array_from_blob!(
    HummockVersionDeltaArray,
    risingwave_pb::hummock::PbHummockVersionDelta,
    PbHummockVersionDeltaArray
);
438
439#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize)]
440pub enum StreamingParallelism {
441    Adaptive,
442    Fixed(usize),
443    Custom,
444}
445
446impl Eq for StreamingParallelism {}
447
/// Dispatcher type, mapped by `DeriveActiveEnum` to a string column.
///
/// Each variant is persisted as the `string_value` given in its `sea_orm` attribute.
/// Conversions from and into `PbDispatcherType` are provided by the `From` impls in this file;
/// `PbDispatcherType::Unspecified` has no counterpart here.
#[derive(
    Hash, Copy, Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum DispatcherType {
    /// Persisted as `"HASH"`.
    #[sea_orm(string_value = "HASH")]
    Hash,
    /// Persisted as `"BROADCAST"`.
    #[sea_orm(string_value = "BROADCAST")]
    Broadcast,
    /// Persisted as `"SIMPLE"`.
    #[sea_orm(string_value = "SIMPLE")]
    Simple,
    /// Persisted as `"NO_SHUFFLE"`.
    #[sea_orm(string_value = "NO_SHUFFLE")]
    NoShuffle,
}
462
463impl From<PbDispatcherType> for DispatcherType {
464    fn from(val: PbDispatcherType) -> Self {
465        match val {
466            PbDispatcherType::Unspecified => unreachable!(),
467            PbDispatcherType::Hash => DispatcherType::Hash,
468            PbDispatcherType::Broadcast => DispatcherType::Broadcast,
469            PbDispatcherType::Simple => DispatcherType::Simple,
470            PbDispatcherType::NoShuffle => DispatcherType::NoShuffle,
471        }
472    }
473}
474
475impl From<DispatcherType> for PbDispatcherType {
476    fn from(val: DispatcherType) -> Self {
477        match val {
478            DispatcherType::Hash => PbDispatcherType::Hash,
479            DispatcherType::Broadcast => PbDispatcherType::Broadcast,
480            DispatcherType::Simple => PbDispatcherType::Simple,
481            DispatcherType::NoShuffle => PbDispatcherType::NoShuffle,
482        }
483    }
484}