risingwave_meta_model/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus};
18use risingwave_pb::meta::table_fragments::PbState as PbStreamJobState;
19use risingwave_pb::secret::PbSecretRef;
20use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamNode};
21use sea_orm::entity::prelude::*;
22use sea_orm::{DeriveActiveEnum, EnumIter, FromJsonQueryResult};
23use serde::{Deserialize, Serialize};
24
25pub mod prelude;
26
27pub mod actor;
28pub mod catalog_version;
29pub mod cdc_table_snapshot_split;
30pub mod cluster;
31pub mod compaction_config;
32pub mod compaction_status;
33pub mod compaction_task;
34pub mod connection;
35pub mod database;
36pub mod exactly_once_iceberg_sink;
37pub mod fragment;
38pub mod fragment_relation;
39pub mod function;
40pub mod hummock_epoch_to_version;
41pub mod hummock_gc_history;
42pub mod hummock_pinned_snapshot;
43pub mod hummock_pinned_version;
44pub mod hummock_sequence;
45pub mod hummock_sstable_info;
46pub mod hummock_time_travel_delta;
47pub mod hummock_time_travel_version;
48pub mod hummock_version_delta;
49pub mod hummock_version_stats;
50pub mod iceberg_namespace_properties;
51pub mod iceberg_tables;
52pub mod index;
53pub mod object;
54pub mod object_dependency;
55pub mod schema;
56pub mod secret;
57pub mod serde_seaql_migration;
58pub mod session_parameter;
59pub mod sink;
60pub mod source;
61pub mod source_splits;
62pub mod streaming_job;
63pub mod subscription;
64pub mod system_parameter;
65pub mod table;
66pub mod user;
67pub mod user_default_privilege;
68pub mod user_privilege;
69pub mod view;
70pub mod worker;
71pub mod worker_property;
72
73pub type WorkerId = i32;
74
75pub type TransactionId = i32;
76
77pub type ObjectId = i32;
78pub type DatabaseId = ObjectId;
79pub type SchemaId = ObjectId;
80pub type TableId = ObjectId;
81pub type SourceId = ObjectId;
82pub type SinkId = ObjectId;
83pub type SubscriptionId = ObjectId;
84pub type IndexId = ObjectId;
85pub type ViewId = ObjectId;
86pub type FunctionId = ObjectId;
87pub type ConnectionId = ObjectId;
88pub type SecretId = ObjectId;
89pub type UserId = i32;
90pub type PrivilegeId = i32;
91pub type DefaultPrivilegeId = i32;
92
93pub type HummockVersionId = i64;
94pub type Epoch = i64;
95pub type CompactionGroupId = i64;
96pub type CompactionTaskId = i64;
97pub type HummockSstableObjectId = i64;
98
99pub type FragmentId = i32;
100pub type ActorId = i32;
101
102#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
103#[sea_orm(rs_type = "String", db_type = "string(None)")]
104pub enum JobStatus {
105    #[sea_orm(string_value = "INITIAL")]
106    Initial,
107    #[sea_orm(string_value = "CREATING")]
108    Creating,
109    #[sea_orm(string_value = "CREATED")]
110    Created,
111}
112
113impl From<JobStatus> for PbStreamJobStatus {
114    fn from(job_status: JobStatus) -> Self {
115        match job_status {
116            JobStatus::Initial => Self::Unspecified,
117            JobStatus::Creating => Self::Creating,
118            JobStatus::Created => Self::Created,
119        }
120    }
121}
122
123// todo: deprecate job status in catalog and unify with this one.
124impl From<JobStatus> for PbStreamJobState {
125    fn from(status: JobStatus) -> Self {
126        match status {
127            JobStatus::Initial => PbStreamJobState::Initial,
128            JobStatus::Creating => PbStreamJobState::Creating,
129            JobStatus::Created => PbStreamJobState::Created,
130        }
131    }
132}
133
134#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
135#[sea_orm(rs_type = "String", db_type = "string(None)")]
136pub enum CreateType {
137    #[sea_orm(string_value = "BACKGROUND")]
138    Background,
139    #[sea_orm(string_value = "FOREGROUND")]
140    Foreground,
141}
142
143impl From<CreateType> for PbCreateType {
144    fn from(create_type: CreateType) -> Self {
145        match create_type {
146            CreateType::Background => Self::Background,
147            CreateType::Foreground => Self::Foreground,
148        }
149    }
150}
151
152impl From<PbCreateType> for CreateType {
153    fn from(create_type: PbCreateType) -> Self {
154        match create_type {
155            PbCreateType::Background => Self::Background,
156            PbCreateType::Foreground => Self::Foreground,
157            PbCreateType::Unspecified => unreachable!("Unspecified create type"),
158        }
159    }
160}
161
162impl CreateType {
163    pub fn as_str(&self) -> &'static str {
164        match self {
165            CreateType::Background => "BACKGROUND",
166            CreateType::Foreground => "FOREGROUND",
167        }
168    }
169}
170
171/// Defines struct with a single pb field that derives `FromJsonQueryResult`, it will helps to map json value stored in database to Pb struct.
172macro_rules! derive_from_json_struct {
173    ($struct_name:ident, $field_type:ty) => {
174        #[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)]
175        pub struct $struct_name(pub $field_type);
176        impl Eq for $struct_name {}
177        impl From<$field_type> for $struct_name {
178            fn from(value: $field_type) -> Self {
179                Self(value)
180            }
181        }
182
183        impl $struct_name {
184            pub fn into_inner(self) -> $field_type {
185                self.0
186            }
187
188            pub fn inner_ref(&self) -> &$field_type {
189                &self.0
190            }
191        }
192    };
193}
194
195/// Defines struct with a byte array that derives `DeriveValueType`, it will helps to map blob stored in database to Pb struct.
196macro_rules! derive_from_blob {
197    ($struct_name:ident, $field_type:ty) => {
198        #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, DeriveValueType)]
199        pub struct $struct_name(#[sea_orm] Vec<u8>);
200
201        impl $struct_name {
202            pub fn to_protobuf(&self) -> $field_type {
203                prost::Message::decode(self.0.as_slice()).unwrap()
204            }
205
206            fn from_protobuf(val: &$field_type) -> Self {
207                Self(prost::Message::encode_to_vec(val))
208            }
209        }
210
211        impl sea_orm::sea_query::Nullable for $struct_name {
212            fn null() -> Value {
213                Value::Bytes(None)
214            }
215        }
216
217        impl From<&$field_type> for $struct_name {
218            fn from(value: &$field_type) -> Self {
219                Self::from_protobuf(value)
220            }
221        }
222
223        impl std::fmt::Debug for $struct_name {
224            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
225                self.to_protobuf().fmt(f)
226            }
227        }
228
229        impl Default for $struct_name {
230            fn default() -> Self {
231                Self::from_protobuf(&<$field_type>::default())
232            }
233        }
234    };
235}
236
237/// Defines struct with a byte array that derives `DeriveValueType`, it will helps to map blob stored in database to Pb struct array.
238macro_rules! derive_array_from_blob {
239    ($struct_name:ident, $field_type:ty, $field_array_name:ident) => {
240        #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
241        pub struct $struct_name(#[sea_orm] Vec<u8>);
242
243        #[derive(Clone, PartialEq, ::prost::Message)]
244        pub struct $field_array_name {
245            #[prost(message, repeated, tag = "1")]
246            inner: Vec<$field_type>,
247        }
248        impl Eq for $field_array_name {}
249
250        impl $struct_name {
251            pub fn to_protobuf(&self) -> Vec<$field_type> {
252                let data: $field_array_name = prost::Message::decode(self.0.as_slice()).unwrap();
253                data.inner
254            }
255
256            fn from_protobuf(val: Vec<$field_type>) -> Self {
257                Self(prost::Message::encode_to_vec(&$field_array_name {
258                    inner: val,
259                }))
260            }
261        }
262
263        impl From<Vec<$field_type>> for $struct_name {
264            fn from(value: Vec<$field_type>) -> Self {
265                Self::from_protobuf(value)
266            }
267        }
268
269        impl std::fmt::Debug for $struct_name {
270            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
271                self.to_protobuf().fmt(f)
272            }
273        }
274
275        impl Default for $struct_name {
276            fn default() -> Self {
277                Self(vec![])
278            }
279        }
280
281        impl sea_orm::sea_query::Nullable for $struct_name {
282            fn null() -> Value {
283                Value::Bytes(None)
284            }
285        }
286    };
287}
288
289macro_rules! derive_btreemap_from_blob {
290    ($struct_name:ident, $key_type:ty, $value_type:ty, $field_type:ident) => {
291        #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
292        pub struct $struct_name(#[sea_orm] Vec<u8>);
293
294        #[derive(Clone, PartialEq, ::prost::Message)]
295        pub struct $field_type {
296            #[prost(btree_map = "string, message")]
297            inner: BTreeMap<$key_type, $value_type>,
298        }
299        impl Eq for $field_type {}
300
301        impl $struct_name {
302            pub fn to_protobuf(&self) -> BTreeMap<$key_type, $value_type> {
303                let data: $field_type = prost::Message::decode(self.0.as_slice()).unwrap();
304                data.inner
305            }
306
307            fn from_protobuf(val: BTreeMap<$key_type, $value_type>) -> Self {
308                Self(prost::Message::encode_to_vec(&$field_type { inner: val }))
309            }
310        }
311
312        impl From<BTreeMap<$key_type, $value_type>> for $struct_name {
313            fn from(value: BTreeMap<$key_type, $value_type>) -> Self {
314                Self::from_protobuf(value)
315            }
316        }
317
318        impl std::fmt::Debug for $struct_name {
319            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
320                self.to_protobuf().fmt(f)
321            }
322        }
323
324        impl Default for $struct_name {
325            fn default() -> Self {
326                Self(vec![])
327            }
328        }
329
330        impl sea_orm::sea_query::Nullable for $struct_name {
331            fn null() -> Value {
332                Value::Bytes(None)
333            }
334        }
335    };
336}
337
338pub(crate) use {derive_array_from_blob, derive_from_blob};
339
340derive_from_json_struct!(I32Array, Vec<i32>);
341
342impl From<Vec<u32>> for I32Array {
343    fn from(value: Vec<u32>) -> Self {
344        Self(value.into_iter().map(|id| id as _).collect())
345    }
346}
347
348impl I32Array {
349    pub fn into_u32_array(self) -> Vec<u32> {
350        self.0.into_iter().map(|id| id as _).collect()
351    }
352}
353
354derive_from_json_struct!(ActorUpstreamActors, BTreeMap<FragmentId, Vec<ActorId>>);
355
356impl From<BTreeMap<u32, Vec<u32>>> for ActorUpstreamActors {
357    fn from(val: BTreeMap<u32, Vec<u32>>) -> Self {
358        let mut map = BTreeMap::new();
359        for (k, v) in val {
360            map.insert(k as _, v.into_iter().map(|a| a as _).collect());
361        }
362        Self(map)
363    }
364}
365
366derive_btreemap_from_blob!(SecretRef, String, PbSecretRef, PbSecretRefMap);
367
368derive_from_blob!(StreamNode, PbStreamNode);
369derive_from_blob!(DataType, risingwave_pb::data::PbDataType);
370derive_array_from_blob!(
371    DataTypeArray,
372    risingwave_pb::data::PbDataType,
373    PbDataTypeArray
374);
375derive_array_from_blob!(
376    FieldArray,
377    risingwave_pb::plan_common::PbField,
378    PbFieldArray
379);
380derive_from_json_struct!(Property, BTreeMap<String, String>);
381derive_from_blob!(ColumnCatalog, risingwave_pb::plan_common::PbColumnCatalog);
382derive_array_from_blob!(
383    ColumnCatalogArray,
384    risingwave_pb::plan_common::PbColumnCatalog,
385    PbColumnCatalogArray
386);
387derive_from_blob!(StreamSourceInfo, risingwave_pb::catalog::PbStreamSourceInfo);
388derive_from_blob!(
389    WebhookSourceInfo,
390    risingwave_pb::catalog::PbWebhookSourceInfo
391);
392derive_from_blob!(WatermarkDesc, risingwave_pb::catalog::PbWatermarkDesc);
393derive_array_from_blob!(
394    WatermarkDescArray,
395    risingwave_pb::catalog::PbWatermarkDesc,
396    PbWatermarkDescArray
397);
398derive_array_from_blob!(
399    ExprNodeArray,
400    risingwave_pb::expr::PbExprNode,
401    PbExprNodeArray
402);
403derive_array_from_blob!(
404    ColumnOrderArray,
405    risingwave_pb::common::PbColumnOrder,
406    PbColumnOrderArray
407);
408derive_array_from_blob!(
409    IndexColumnPropertiesArray,
410    risingwave_pb::catalog::PbIndexColumnProperties,
411    PbIndexColumnPropertiesArray
412);
413derive_from_blob!(SinkFormatDesc, risingwave_pb::catalog::PbSinkFormatDesc);
414derive_from_blob!(Cardinality, risingwave_pb::plan_common::PbCardinality);
415derive_from_blob!(TableVersion, risingwave_pb::catalog::table::PbTableVersion);
416derive_from_blob!(
417    PrivateLinkService,
418    risingwave_pb::catalog::connection::PbPrivateLinkService
419);
420derive_from_blob!(ConnectionParams, risingwave_pb::catalog::ConnectionParams);
421derive_from_blob!(AuthInfo, risingwave_pb::user::PbAuthInfo);
422
423derive_from_blob!(ConnectorSplits, risingwave_pb::source::ConnectorSplits);
424derive_from_blob!(VnodeBitmap, risingwave_pb::common::Buffer);
425derive_from_blob!(ActorMapping, risingwave_pb::stream_plan::PbActorMapping);
426derive_from_blob!(ExprContext, risingwave_pb::plan_common::PbExprContext);
427
428derive_array_from_blob!(
429    TypePairArray,
430    risingwave_pb::stream_plan::dispatch_output_mapping::TypePair,
431    PbTypePairArray
432);
433
434derive_array_from_blob!(
435    HummockVersionDeltaArray,
436    risingwave_pb::hummock::PbHummockVersionDelta,
437    PbHummockVersionDeltaArray
438);
439
440#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize)]
441pub enum StreamingParallelism {
442    Adaptive,
443    Fixed(usize),
444    Custom,
445}
446
447impl Eq for StreamingParallelism {}
448
449#[derive(
450    Hash, Copy, Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
451)]
452#[sea_orm(rs_type = "String", db_type = "string(None)")]
453pub enum DispatcherType {
454    #[sea_orm(string_value = "HASH")]
455    Hash,
456    #[sea_orm(string_value = "BROADCAST")]
457    Broadcast,
458    #[sea_orm(string_value = "SIMPLE")]
459    Simple,
460    #[sea_orm(string_value = "NO_SHUFFLE")]
461    NoShuffle,
462}
463
464impl From<PbDispatcherType> for DispatcherType {
465    fn from(val: PbDispatcherType) -> Self {
466        match val {
467            PbDispatcherType::Unspecified => unreachable!(),
468            PbDispatcherType::Hash => DispatcherType::Hash,
469            PbDispatcherType::Broadcast => DispatcherType::Broadcast,
470            PbDispatcherType::Simple => DispatcherType::Simple,
471            PbDispatcherType::NoShuffle => DispatcherType::NoShuffle,
472        }
473    }
474}
475
476impl From<DispatcherType> for PbDispatcherType {
477    fn from(val: DispatcherType) -> Self {
478        match val {
479            DispatcherType::Hash => PbDispatcherType::Hash,
480            DispatcherType::Broadcast => PbDispatcherType::Broadcast,
481            DispatcherType::Simple => PbDispatcherType::Simple,
482            DispatcherType::NoShuffle => PbDispatcherType::NoShuffle,
483        }
484    }
485}