risingwave_meta_model/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus};
18use risingwave_pb::meta::table_fragments::PbState as PbStreamJobState;
19use risingwave_pb::secret::PbSecretRef;
20use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamNode};
21use sea_orm::entity::prelude::*;
22use sea_orm::{DeriveActiveEnum, EnumIter, FromJsonQueryResult};
23use serde::{Deserialize, Serialize};
24
25pub mod prelude;
26
27pub mod actor;
28pub mod catalog_version;
29pub mod cdc_table_snapshot_split;
30pub mod cluster;
31pub mod compaction_config;
32pub mod compaction_status;
33pub mod compaction_task;
34pub mod connection;
35pub mod database;
36pub mod exactly_once_iceberg_sink;
37pub mod fragment;
38pub mod fragment_relation;
39pub mod fragment_splits;
40pub mod function;
41pub mod hummock_epoch_to_version;
42pub mod hummock_gc_history;
43pub mod hummock_pinned_snapshot;
44pub mod hummock_pinned_version;
45pub mod hummock_sequence;
46pub mod hummock_sstable_info;
47pub mod hummock_time_travel_delta;
48pub mod hummock_time_travel_version;
49pub mod hummock_version_delta;
50pub mod hummock_version_stats;
51pub mod iceberg_namespace_properties;
52pub mod iceberg_tables;
53pub mod index;
54pub mod object;
55pub mod object_dependency;
56pub mod schema;
57pub mod secret;
58pub mod serde_seaql_migration;
59pub mod session_parameter;
60pub mod sink;
61pub mod source;
62pub mod source_splits;
63pub mod streaming_job;
64pub mod subscription;
65pub mod system_parameter;
66pub mod table;
67pub mod user;
68pub mod user_default_privilege;
69pub mod user_privilege;
70pub mod view;
71pub mod worker;
72pub mod worker_property;
73
74pub type WorkerId = i32;
75
76pub type TransactionId = i32;
77
78pub type ObjectId = i32;
79pub type DatabaseId = ObjectId;
80pub type SchemaId = ObjectId;
81pub type TableId = ObjectId;
82pub type SourceId = ObjectId;
83pub type SinkId = ObjectId;
84pub type SubscriptionId = ObjectId;
85pub type IndexId = ObjectId;
86pub type ViewId = ObjectId;
87pub type FunctionId = ObjectId;
88pub type ConnectionId = ObjectId;
89pub type SecretId = ObjectId;
90pub type UserId = i32;
91pub type PrivilegeId = i32;
92pub type DefaultPrivilegeId = i32;
93
94pub type HummockVersionId = i64;
95pub type Epoch = i64;
96pub type CompactionGroupId = i64;
97pub type CompactionTaskId = i64;
98pub type HummockSstableObjectId = i64;
99
100pub type FragmentId = i32;
101pub type ActorId = i32;
102
/// Status of a job, persisted as one of the strings `INITIAL`, `CREATING` or `CREATED`.
///
/// The enum converts into both `PbStreamJobStatus` and `PbStreamJobState` through the `From`
/// implementations in this file.
#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum JobStatus {
    /// Stored as `INITIAL`.
    #[sea_orm(string_value = "INITIAL")]
    Initial,
    /// Stored as `CREATING`.
    #[sea_orm(string_value = "CREATING")]
    Creating,
    /// Stored as `CREATED`.
    #[sea_orm(string_value = "CREATED")]
    Created,
}
113
114impl From<JobStatus> for PbStreamJobStatus {
115    fn from(job_status: JobStatus) -> Self {
116        match job_status {
117            JobStatus::Initial => Self::Unspecified,
118            JobStatus::Creating => Self::Creating,
119            JobStatus::Created => Self::Created,
120        }
121    }
122}
123
124// todo: deprecate job status in catalog and unify with this one.
125impl From<JobStatus> for PbStreamJobState {
126    fn from(status: JobStatus) -> Self {
127        match status {
128            JobStatus::Initial => PbStreamJobState::Initial,
129            JobStatus::Creating => PbStreamJobState::Creating,
130            JobStatus::Created => PbStreamJobState::Created,
131        }
132    }
133}
134
/// How a job is created, persisted as one of the strings `BACKGROUND` or `FOREGROUND`.
///
/// Converts to and from `PbCreateType` through the `From` implementations in this file.
#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum CreateType {
    /// Stored as `BACKGROUND`.
    #[sea_orm(string_value = "BACKGROUND")]
    Background,
    /// Stored as `FOREGROUND`.
    #[sea_orm(string_value = "FOREGROUND")]
    Foreground,
}
143
144impl From<CreateType> for PbCreateType {
145    fn from(create_type: CreateType) -> Self {
146        match create_type {
147            CreateType::Background => Self::Background,
148            CreateType::Foreground => Self::Foreground,
149        }
150    }
151}
152
153impl From<PbCreateType> for CreateType {
154    fn from(create_type: PbCreateType) -> Self {
155        match create_type {
156            PbCreateType::Background => Self::Background,
157            PbCreateType::Foreground => Self::Foreground,
158            PbCreateType::Unspecified => unreachable!("Unspecified create type"),
159        }
160    }
161}
162
163impl CreateType {
164    pub fn as_str(&self) -> &'static str {
165        match self {
166            CreateType::Background => "BACKGROUND",
167            CreateType::Foreground => "FOREGROUND",
168        }
169    }
170}
171
/// Defines a newtype struct around a single field and derives `FromJsonQueryResult` for it, which
/// helps to map a JSON value stored in the database to the wrapped type.
///
/// The generated struct exposes its inner field publicly, implements `From<$field_type>`, and
/// provides the `into_inner` / `inner_ref` accessors.
macro_rules! derive_from_json_struct {
    ($struct_name:ident, $field_type:ty) => {
        #[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)]
        pub struct $struct_name(pub $field_type);
        // `Eq` is implemented by hand instead of being derived, so the field type itself is not
        // required to implement `Eq`.
        impl Eq for $struct_name {}
        impl From<$field_type> for $struct_name {
            fn from(value: $field_type) -> Self {
                Self(value)
            }
        }

        impl $struct_name {
            /// Consumes the wrapper and returns the inner value.
            pub fn into_inner(self) -> $field_type {
                self.0
            }

            /// Borrows the inner value.
            pub fn inner_ref(&self) -> &$field_type {
                &self.0
            }
        }
    };
}
195
/// Defines a struct wrapping a byte array that derives `DeriveValueType`, which helps to map a
/// blob stored in the database to a Pb struct.
///
/// The wrapper holds the encoded bytes of a `$field_type` message. It can be built from
/// `&$field_type`, decoded with `to_protobuf`, and its `Default` is the encoding of
/// `$field_type::default()`.
macro_rules! derive_from_blob {
    ($struct_name:ident, $field_type:ty) => {
        #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, DeriveValueType)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        impl $struct_name {
            /// Decodes the stored bytes into the protobuf message.
            ///
            /// # Panics
            ///
            /// Panics if the stored bytes cannot be decoded as `$field_type`.
            pub fn to_protobuf(&self) -> $field_type {
                prost::Message::decode(self.0.as_slice()).unwrap()
            }

            // Private constructor; external callers go through the `From<&$field_type>` impl.
            fn from_protobuf(val: &$field_type) -> Self {
                Self(prost::Message::encode_to_vec(val))
            }
        }

        // The SQL NULL representation of this type is a missing bytes value.
        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }

        impl From<&$field_type> for $struct_name {
            fn from(value: &$field_type) -> Self {
                Self::from_protobuf(value)
            }
        }

        // `Debug` prints the decoded message rather than the raw bytes, so it panics in the same
        // situations as `to_protobuf`.
        impl std::fmt::Debug for $struct_name {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                self.to_protobuf().fmt(f)
            }
        }

        impl Default for $struct_name {
            fn default() -> Self {
                Self::from_protobuf(&<$field_type>::default())
            }
        }
    };
}
237
/// Defines a struct wrapping a byte array that derives `DeriveValueType`, which helps to map a
/// blob stored in the database to an array of Pb structs.
///
/// Besides the wrapper `$struct_name`, this also generates the helper message
/// `$field_array_name`, whose only field is a repeated `$field_type` with tag 1; the wrapper
/// stores the encoded bytes of that helper message.
macro_rules! derive_array_from_blob {
    ($struct_name:ident, $field_type:ty, $field_array_name:ident) => {
        #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        // Helper message used only as the on-disk envelope for the list of elements.
        #[derive(Clone, PartialEq, ::prost::Message)]
        pub struct $field_array_name {
            #[prost(message, repeated, tag = "1")]
            inner: Vec<$field_type>,
        }
        impl Eq for $field_array_name {}

        impl $struct_name {
            /// Decodes the stored bytes and returns the contained elements.
            ///
            /// # Panics
            ///
            /// Panics if the stored bytes cannot be decoded as `$field_array_name`.
            pub fn to_protobuf(&self) -> Vec<$field_type> {
                let data: $field_array_name = prost::Message::decode(self.0.as_slice()).unwrap();
                data.inner
            }

            // Private constructor; external callers go through the `From<Vec<$field_type>>` impl.
            fn from_protobuf(val: Vec<$field_type>) -> Self {
                Self(prost::Message::encode_to_vec(&$field_array_name {
                    inner: val,
                }))
            }
        }

        impl From<Vec<$field_type>> for $struct_name {
            fn from(value: Vec<$field_type>) -> Self {
                Self::from_protobuf(value)
            }
        }

        // `Debug` prints the decoded elements rather than the raw bytes, so it panics in the same
        // situations as `to_protobuf`.
        impl std::fmt::Debug for $struct_name {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                self.to_protobuf().fmt(f)
            }
        }

        // The default value is an empty byte vector, not an encoded helper message.
        impl Default for $struct_name {
            fn default() -> Self {
                Self(vec![])
            }
        }

        // The SQL NULL representation of this type is a missing bytes value.
        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }
    };
}
289
/// Defines a struct wrapping a byte array that derives `DeriveValueType`, which helps to map a
/// blob stored in the database to a `BTreeMap` of Pb structs.
///
/// Besides the wrapper `$struct_name`, this also generates the helper message `$field_type`,
/// whose only field is the map; the wrapper stores the encoded bytes of that helper message.
///
/// NOTE(review): the prost attribute on the map field is hard-coded to `"string, message"`, so
/// `$key_type` presumably has to be `String` and `$value_type` a message type — confirm before
/// instantiating the macro with other key or value types.
macro_rules! derive_btreemap_from_blob {
    ($struct_name:ident, $key_type:ty, $value_type:ty, $field_type:ident) => {
        #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        // Helper message used only as the on-disk envelope for the map.
        #[derive(Clone, PartialEq, ::prost::Message)]
        pub struct $field_type {
            #[prost(btree_map = "string, message")]
            inner: BTreeMap<$key_type, $value_type>,
        }
        impl Eq for $field_type {}

        impl $struct_name {
            /// Decodes the stored bytes and returns the contained map.
            ///
            /// # Panics
            ///
            /// Panics if the stored bytes cannot be decoded as `$field_type`.
            pub fn to_protobuf(&self) -> BTreeMap<$key_type, $value_type> {
                let data: $field_type = prost::Message::decode(self.0.as_slice()).unwrap();
                data.inner
            }

            // Private constructor; external callers go through the `From<BTreeMap<..>>` impl.
            fn from_protobuf(val: BTreeMap<$key_type, $value_type>) -> Self {
                Self(prost::Message::encode_to_vec(&$field_type { inner: val }))
            }
        }

        impl From<BTreeMap<$key_type, $value_type>> for $struct_name {
            fn from(value: BTreeMap<$key_type, $value_type>) -> Self {
                Self::from_protobuf(value)
            }
        }

        // `Debug` prints the decoded map rather than the raw bytes, so it panics in the same
        // situations as `to_protobuf`.
        impl std::fmt::Debug for $struct_name {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                self.to_protobuf().fmt(f)
            }
        }

        // The default value is an empty byte vector, not an encoded helper message.
        impl Default for $struct_name {
            fn default() -> Self {
                Self(vec![])
            }
        }

        // The SQL NULL representation of this type is a missing bytes value.
        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }
    };
}
338
339pub(crate) use {derive_array_from_blob, derive_from_blob};
340
// JSON-backed wrapper around a list of `i32` values.
derive_from_json_struct!(I32Array, Vec<i32>);
342
343impl From<Vec<u32>> for I32Array {
344    fn from(value: Vec<u32>) -> Self {
345        Self(value.into_iter().map(|id| id as _).collect())
346    }
347}
348
349impl I32Array {
350    pub fn into_u32_array(self) -> Vec<u32> {
351        self.0.into_iter().map(|id| id as _).collect()
352    }
353}
354
// JSON-backed wrapper around a map from fragment id to a list of actor ids.
derive_from_json_struct!(ActorUpstreamActors, BTreeMap<FragmentId, Vec<ActorId>>);
356
357impl From<BTreeMap<u32, Vec<u32>>> for ActorUpstreamActors {
358    fn from(val: BTreeMap<u32, Vec<u32>>) -> Self {
359        let mut map = BTreeMap::new();
360        for (k, v) in val {
361            map.insert(k as _, v.into_iter().map(|a| a as _).collect());
362        }
363        Self(map)
364    }
365}
366
// Blob-backed, string-keyed map of `PbSecretRef` messages.
derive_btreemap_from_blob!(SecretRef, String, PbSecretRef, PbSecretRefMap);

// Wrapper types for database columns. `derive_from_blob!` types store one encoded protobuf
// message, `derive_array_from_blob!` types store an encoded list of messages (the third macro
// argument names the generated helper message), and `derive_from_json_struct!` types are mapped
// from JSON values.
derive_from_blob!(StreamNode, PbStreamNode);
derive_from_blob!(DataType, risingwave_pb::data::PbDataType);
derive_array_from_blob!(
    DataTypeArray,
    risingwave_pb::data::PbDataType,
    PbDataTypeArray
);
derive_array_from_blob!(
    FieldArray,
    risingwave_pb::plan_common::PbField,
    PbFieldArray
);
// JSON-backed string-to-string map.
derive_from_json_struct!(Property, BTreeMap<String, String>);
derive_from_blob!(ColumnCatalog, risingwave_pb::plan_common::PbColumnCatalog);
derive_array_from_blob!(
    ColumnCatalogArray,
    risingwave_pb::plan_common::PbColumnCatalog,
    PbColumnCatalogArray
);
derive_from_blob!(StreamSourceInfo, risingwave_pb::catalog::PbStreamSourceInfo);
derive_from_blob!(
    WebhookSourceInfo,
    risingwave_pb::catalog::PbWebhookSourceInfo
);
derive_from_blob!(WatermarkDesc, risingwave_pb::catalog::PbWatermarkDesc);
derive_array_from_blob!(
    WatermarkDescArray,
    risingwave_pb::catalog::PbWatermarkDesc,
    PbWatermarkDescArray
);
derive_array_from_blob!(
    ExprNodeArray,
    risingwave_pb::expr::PbExprNode,
    PbExprNodeArray
);
derive_array_from_blob!(
    ColumnOrderArray,
    risingwave_pb::common::PbColumnOrder,
    PbColumnOrderArray
);
derive_array_from_blob!(
    IndexColumnPropertiesArray,
    risingwave_pb::catalog::PbIndexColumnProperties,
    PbIndexColumnPropertiesArray
);
derive_from_blob!(SinkFormatDesc, risingwave_pb::catalog::PbSinkFormatDesc);
derive_from_blob!(Cardinality, risingwave_pb::plan_common::PbCardinality);
derive_from_blob!(TableVersion, risingwave_pb::catalog::table::PbTableVersion);
derive_from_blob!(
    PrivateLinkService,
    risingwave_pb::catalog::connection::PbPrivateLinkService
);
derive_from_blob!(ConnectionParams, risingwave_pb::catalog::ConnectionParams);
derive_from_blob!(AuthInfo, risingwave_pb::user::PbAuthInfo);

derive_from_blob!(ConnectorSplits, risingwave_pb::source::ConnectorSplits);
// Note: the wrapped message here is the generic `Buffer` message.
derive_from_blob!(VnodeBitmap, risingwave_pb::common::Buffer);
derive_from_blob!(ActorMapping, risingwave_pb::stream_plan::PbActorMapping);
derive_from_blob!(ExprContext, risingwave_pb::plan_common::PbExprContext);

derive_array_from_blob!(
    TypePairArray,
    risingwave_pb::stream_plan::dispatch_output_mapping::TypePair,
    PbTypePairArray
);

derive_array_from_blob!(
    HummockVersionDeltaArray,
    risingwave_pb::hummock::PbHummockVersionDelta,
    PbHummockVersionDeltaArray
);
440
441#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize)]
442pub enum StreamingParallelism {
443    Adaptive,
444    Fixed(usize),
445    Custom,
446}
447
448impl Eq for StreamingParallelism {}
449
/// Dispatcher type, persisted as one of the strings `HASH`, `BROADCAST`, `SIMPLE` or
/// `NO_SHUFFLE`.
///
/// Converts to and from `PbDispatcherType` through the `From` implementations in this file.
#[derive(
    Hash, Copy, Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum DispatcherType {
    /// Stored as `HASH`.
    #[sea_orm(string_value = "HASH")]
    Hash,
    /// Stored as `BROADCAST`.
    #[sea_orm(string_value = "BROADCAST")]
    Broadcast,
    /// Stored as `SIMPLE`.
    #[sea_orm(string_value = "SIMPLE")]
    Simple,
    /// Stored as `NO_SHUFFLE`.
    #[sea_orm(string_value = "NO_SHUFFLE")]
    NoShuffle,
}
464
465impl From<PbDispatcherType> for DispatcherType {
466    fn from(val: PbDispatcherType) -> Self {
467        match val {
468            PbDispatcherType::Unspecified => unreachable!(),
469            PbDispatcherType::Hash => DispatcherType::Hash,
470            PbDispatcherType::Broadcast => DispatcherType::Broadcast,
471            PbDispatcherType::Simple => DispatcherType::Simple,
472            PbDispatcherType::NoShuffle => DispatcherType::NoShuffle,
473        }
474    }
475}
476
477impl From<DispatcherType> for PbDispatcherType {
478    fn from(val: DispatcherType) -> Self {
479        match val {
480            DispatcherType::Hash => PbDispatcherType::Hash,
481            DispatcherType::Broadcast => PbDispatcherType::Broadcast,
482            DispatcherType::Simple => PbDispatcherType::Simple,
483            DispatcherType::NoShuffle => PbDispatcherType::NoShuffle,
484        }
485    }
486}