risingwave_meta_model/
lib.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17pub use risingwave_common::id::*;
18use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus};
19use risingwave_pb::meta::table_fragments::PbState as PbStreamJobState;
20use risingwave_pb::secret::PbSecretRef;
21use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamNode};
22use sea_orm::entity::prelude::*;
23use sea_orm::{DeriveActiveEnum, EnumIter, FromJsonQueryResult, Value};
24use serde::{Deserialize, Serialize};
25
26pub mod prelude;
27
28pub mod catalog_version;
29pub mod cdc_table_snapshot_split;
30pub mod cluster;
31pub mod compaction_config;
32pub mod compaction_status;
33pub mod compaction_task;
34pub mod connection;
35pub mod database;
36pub mod exactly_once_iceberg_sink;
37pub mod fragment;
38pub mod fragment_relation;
39pub mod fragment_splits;
40pub mod function;
41pub mod hummock_epoch_to_version;
42pub mod hummock_gc_history;
43pub mod hummock_pinned_snapshot;
44pub mod hummock_pinned_version;
45pub mod hummock_sequence;
46pub mod hummock_sstable_info;
47pub mod hummock_table_change_log;
48pub mod hummock_time_travel_delta;
49pub mod hummock_time_travel_version;
50pub mod hummock_version_delta;
51pub mod hummock_version_stats;
52pub mod iceberg_namespace_properties;
53pub mod iceberg_tables;
54pub mod index;
55pub mod object;
56pub mod object_dependency;
57pub mod pending_sink_state;
58pub mod refresh_job;
59pub mod schema;
60pub mod secret;
61pub mod serde_seaql_migration;
62pub mod session_parameter;
63pub mod sink;
64pub mod source;
65pub mod streaming_job;
66pub mod subscription;
67pub mod system_parameter;
68pub mod table;
69pub mod user;
70pub mod user_default_privilege;
71pub mod user_privilege;
72pub mod view;
73pub mod worker;
74pub mod worker_property;
75
76pub type TransactionId = i32;
77
78pub type PrivilegeId = i32;
79pub type DefaultPrivilegeId = i32;
80
81pub use risingwave_pb::id::{CompactionGroupId, HummockSstableObjectId, HummockVersionId};
82pub type Epoch = i64;
83pub type CompactionTaskId = i64;
84
85#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
86#[sea_orm(rs_type = "String", db_type = "string(None)")]
87pub enum JobStatus {
88    #[sea_orm(string_value = "INITIAL")]
89    Initial,
90    #[sea_orm(string_value = "CREATING")]
91    Creating,
92    #[sea_orm(string_value = "CREATED")]
93    Created,
94}
95
96impl From<JobStatus> for PbStreamJobStatus {
97    fn from(job_status: JobStatus) -> Self {
98        match job_status {
99            JobStatus::Initial => Self::Unspecified,
100            JobStatus::Creating => Self::Creating,
101            JobStatus::Created => Self::Created,
102        }
103    }
104}
105
106// todo: deprecate job status in catalog and unify with this one.
107impl From<JobStatus> for PbStreamJobState {
108    fn from(status: JobStatus) -> Self {
109        match status {
110            JobStatus::Initial => PbStreamJobState::Initial,
111            JobStatus::Creating => PbStreamJobState::Creating,
112            JobStatus::Created => PbStreamJobState::Created,
113        }
114    }
115}
116
117#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
118#[sea_orm(rs_type = "String", db_type = "string(None)")]
119pub enum CreateType {
120    #[sea_orm(string_value = "BACKGROUND")]
121    Background,
122    #[sea_orm(string_value = "FOREGROUND")]
123    Foreground,
124}
125
126impl From<CreateType> for PbCreateType {
127    fn from(create_type: CreateType) -> Self {
128        match create_type {
129            CreateType::Background => Self::Background,
130            CreateType::Foreground => Self::Foreground,
131        }
132    }
133}
134
135impl From<PbCreateType> for CreateType {
136    fn from(create_type: PbCreateType) -> Self {
137        match create_type {
138            PbCreateType::Background => Self::Background,
139            PbCreateType::Foreground => Self::Foreground,
140            PbCreateType::Unspecified => unreachable!("Unspecified create type"),
141        }
142    }
143}
144
145impl CreateType {
146    pub fn as_str(&self) -> &'static str {
147        match self {
148            CreateType::Background => "BACKGROUND",
149            CreateType::Foreground => "FOREGROUND",
150        }
151    }
152}
153
154/// Defines struct with a single pb field that derives `FromJsonQueryResult`, it will helps to map json value stored in database to Pb struct.
155#[macro_export]
156macro_rules! derive_from_json_struct {
157    ($struct_name:ident, $field_type:ty) => {
158        #[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)]
159        pub struct $struct_name(pub $field_type);
160        impl Eq for $struct_name {}
161        impl From<$field_type> for $struct_name {
162            fn from(value: $field_type) -> Self {
163                Self(value)
164            }
165        }
166
167        impl $struct_name {
168            pub fn into_inner(self) -> $field_type {
169                self.0
170            }
171
172            pub fn inner_ref(&self) -> &$field_type {
173                &self.0
174            }
175        }
176    };
177}
178
179/// Defines struct with a byte array that derives `DeriveValueType`, it will helps to map blob stored in database to Pb struct.
180macro_rules! derive_from_blob {
181    ($struct_name:ident, $field_type:ty) => {
182        #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, DeriveValueType)]
183        pub struct $struct_name(#[sea_orm] Vec<u8>);
184
185        impl $struct_name {
186            pub fn to_protobuf(&self) -> $field_type {
187                prost::Message::decode(self.0.as_slice()).unwrap()
188            }
189
190            fn from_protobuf(val: &$field_type) -> Self {
191                Self(prost::Message::encode_to_vec(val))
192            }
193        }
194
195        impl sea_orm::sea_query::Nullable for $struct_name {
196            fn null() -> Value {
197                Value::Bytes(None)
198            }
199        }
200
201        impl From<&$field_type> for $struct_name {
202            fn from(value: &$field_type) -> Self {
203                Self::from_protobuf(value)
204            }
205        }
206
207        impl std::fmt::Debug for $struct_name {
208            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209                self.to_protobuf().fmt(f)
210            }
211        }
212
213        impl Default for $struct_name {
214            fn default() -> Self {
215                Self::from_protobuf(&<$field_type>::default())
216            }
217        }
218    };
219}
220
221/// Defines struct with a byte array that derives `DeriveValueType`, it will helps to map blob stored in database to Pb struct array.
222macro_rules! derive_array_from_blob {
223    ($struct_name:ident, $field_type:ty, $field_array_name:ident) => {
224        #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
225        pub struct $struct_name(#[sea_orm] Vec<u8>);
226
227        #[derive(Clone, PartialEq, ::prost::Message)]
228        pub struct $field_array_name {
229            #[prost(message, repeated, tag = "1")]
230            inner: Vec<$field_type>,
231        }
232        impl Eq for $field_array_name {}
233
234        impl $struct_name {
235            pub fn to_protobuf(&self) -> Vec<$field_type> {
236                let data: $field_array_name = prost::Message::decode(self.0.as_slice()).unwrap();
237                data.inner
238            }
239
240            fn from_protobuf(val: Vec<$field_type>) -> Self {
241                Self(prost::Message::encode_to_vec(&$field_array_name {
242                    inner: val,
243                }))
244            }
245        }
246
247        impl From<Vec<$field_type>> for $struct_name {
248            fn from(value: Vec<$field_type>) -> Self {
249                Self::from_protobuf(value)
250            }
251        }
252
253        impl std::fmt::Debug for $struct_name {
254            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
255                self.to_protobuf().fmt(f)
256            }
257        }
258
259        impl Default for $struct_name {
260            fn default() -> Self {
261                Self(vec![])
262            }
263        }
264
265        impl sea_orm::sea_query::Nullable for $struct_name {
266            fn null() -> Value {
267                Value::Bytes(None)
268            }
269        }
270    };
271}
272
273macro_rules! derive_btreemap_from_blob {
274    ($struct_name:ident, $key_type:ty, $value_type:ty, $field_type:ident) => {
275        #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
276        pub struct $struct_name(#[sea_orm] Vec<u8>);
277
278        #[derive(Clone, PartialEq, ::prost::Message)]
279        pub struct $field_type {
280            #[prost(btree_map = "string, message")]
281            inner: BTreeMap<$key_type, $value_type>,
282        }
283        impl Eq for $field_type {}
284
285        impl $struct_name {
286            pub fn to_protobuf(&self) -> BTreeMap<$key_type, $value_type> {
287                let data: $field_type = prost::Message::decode(self.0.as_slice()).unwrap();
288                data.inner
289            }
290
291            fn from_protobuf(val: BTreeMap<$key_type, $value_type>) -> Self {
292                Self(prost::Message::encode_to_vec(&$field_type { inner: val }))
293            }
294        }
295
296        impl From<BTreeMap<$key_type, $value_type>> for $struct_name {
297            fn from(value: BTreeMap<$key_type, $value_type>) -> Self {
298                Self::from_protobuf(value)
299            }
300        }
301
302        impl std::fmt::Debug for $struct_name {
303            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
304                self.to_protobuf().fmt(f)
305            }
306        }
307
308        impl Default for $struct_name {
309            fn default() -> Self {
310                Self(vec![])
311            }
312        }
313
314        impl sea_orm::sea_query::Nullable for $struct_name {
315            fn null() -> Value {
316                Value::Bytes(None)
317            }
318        }
319    };
320}
321
322pub(crate) use {derive_array_from_blob, derive_from_blob};
323
324derive_from_json_struct!(TableIdArray, Vec<TableId>);
325
326derive_from_json_struct!(EpochArray, Vec<Epoch>);
327
328derive_from_json_struct!(I32Array, Vec<i32>);
329
330impl From<Vec<u32>> for I32Array {
331    fn from(value: Vec<u32>) -> Self {
332        Self(value.into_iter().map(|id| id as _).collect())
333    }
334}
335
336impl I32Array {
337    pub fn into_u32_array(self) -> Vec<u32> {
338        self.0.into_iter().map(|id| id as _).collect()
339    }
340}
341
342derive_btreemap_from_blob!(SecretRef, String, PbSecretRef, PbSecretRefMap);
343
344derive_from_blob!(StreamNode, PbStreamNode);
345derive_from_blob!(DataType, risingwave_pb::data::PbDataType);
346derive_array_from_blob!(
347    DataTypeArray,
348    risingwave_pb::data::PbDataType,
349    PbDataTypeArray
350);
351derive_array_from_blob!(
352    FieldArray,
353    risingwave_pb::plan_common::PbField,
354    PbFieldArray
355);
356derive_from_json_struct!(Property, BTreeMap<String, String>);
357derive_from_blob!(ColumnCatalog, risingwave_pb::plan_common::PbColumnCatalog);
358derive_array_from_blob!(
359    ColumnCatalogArray,
360    risingwave_pb::plan_common::PbColumnCatalog,
361    PbColumnCatalogArray
362);
363derive_from_blob!(StreamSourceInfo, risingwave_pb::catalog::PbStreamSourceInfo);
364derive_from_blob!(
365    WebhookSourceInfo,
366    risingwave_pb::catalog::PbWebhookSourceInfo
367);
368derive_from_blob!(WatermarkDesc, risingwave_pb::catalog::PbWatermarkDesc);
369derive_array_from_blob!(
370    WatermarkDescArray,
371    risingwave_pb::catalog::PbWatermarkDesc,
372    PbWatermarkDescArray
373);
374derive_array_from_blob!(
375    ExprNodeArray,
376    risingwave_pb::expr::PbExprNode,
377    PbExprNodeArray
378);
379derive_array_from_blob!(
380    ColumnOrderArray,
381    risingwave_pb::common::PbColumnOrder,
382    PbColumnOrderArray
383);
384derive_array_from_blob!(
385    IndexColumnPropertiesArray,
386    risingwave_pb::catalog::PbIndexColumnProperties,
387    PbIndexColumnPropertiesArray
388);
389derive_from_blob!(SinkFormatDesc, risingwave_pb::catalog::PbSinkFormatDesc);
390derive_from_blob!(Cardinality, risingwave_pb::plan_common::PbCardinality);
391derive_from_blob!(TableVersion, risingwave_pb::catalog::table::PbTableVersion);
392derive_from_blob!(
393    PrivateLinkService,
394    risingwave_pb::catalog::connection::PbPrivateLinkService
395);
396derive_from_blob!(ConnectionParams, risingwave_pb::catalog::ConnectionParams);
397derive_from_blob!(AuthInfo, risingwave_pb::user::PbAuthInfo);
398
399derive_from_blob!(ConnectorSplits, risingwave_pb::source::ConnectorSplits);
400derive_from_blob!(VnodeBitmap, risingwave_pb::common::Buffer);
401derive_from_blob!(ActorMapping, risingwave_pb::stream_plan::PbActorMapping);
402derive_from_blob!(ExprContext, risingwave_pb::plan_common::PbExprContext);
403derive_from_blob!(
404    SourceRefreshMode,
405    risingwave_pb::plan_common::PbSourceRefreshMode
406);
407
408derive_from_blob!(
409    SinkSchemachange,
410    risingwave_pb::stream_plan::PbSinkSchemaChange
411);
412
413derive_array_from_blob!(
414    TypePairArray,
415    risingwave_pb::stream_plan::dispatch_output_mapping::TypePair,
416    PbTypePairArray
417);
418
419derive_array_from_blob!(
420    HummockVersionDeltaArray,
421    risingwave_pb::hummock::PbHummockVersionDelta,
422    PbHummockVersionDeltaArray
423);
424
425derive_array_from_blob!(
426    SstableInfoArray,
427    risingwave_pb::hummock::PbSstableInfo,
428    PbSstableInfoArray
429);
430
431#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize)]
432pub enum StreamingParallelism {
433    Adaptive,
434    Fixed(usize),
435    Custom,
436}
437
438impl Eq for StreamingParallelism {}
439
440#[derive(
441    Hash, Copy, Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
442)]
443#[sea_orm(rs_type = "String", db_type = "string(None)")]
444pub enum DispatcherType {
445    #[sea_orm(string_value = "HASH")]
446    Hash,
447    #[sea_orm(string_value = "BROADCAST")]
448    Broadcast,
449    #[sea_orm(string_value = "SIMPLE")]
450    Simple,
451    #[sea_orm(string_value = "NO_SHUFFLE")]
452    NoShuffle,
453}
454
455impl From<PbDispatcherType> for DispatcherType {
456    fn from(val: PbDispatcherType) -> Self {
457        match val {
458            PbDispatcherType::Unspecified => unreachable!(),
459            PbDispatcherType::Hash => DispatcherType::Hash,
460            PbDispatcherType::Broadcast => DispatcherType::Broadcast,
461            PbDispatcherType::Simple => DispatcherType::Simple,
462            PbDispatcherType::NoShuffle => DispatcherType::NoShuffle,
463        }
464    }
465}
466
467impl From<DispatcherType> for PbDispatcherType {
468    fn from(val: DispatcherType) -> Self {
469        match val {
470            DispatcherType::Hash => PbDispatcherType::Hash,
471            DispatcherType::Broadcast => PbDispatcherType::Broadcast,
472            DispatcherType::Simple => PbDispatcherType::Simple,
473            DispatcherType::NoShuffle => PbDispatcherType::NoShuffle,
474        }
475    }
476}