risingwave_meta_model/
lib.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17pub use risingwave_common::id::*;
18use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus};
19use risingwave_pb::meta::table_fragments::PbState as PbStreamJobState;
20use risingwave_pb::secret::PbSecretRef;
21use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamNode};
22use sea_orm::entity::prelude::*;
23use sea_orm::{DeriveActiveEnum, EnumIter, FromJsonQueryResult, Value};
24use serde::{Deserialize, Serialize};
25
26pub mod prelude;
27
28pub mod catalog_version;
29pub mod cdc_table_snapshot_split;
30pub mod cluster;
31pub mod compaction_config;
32pub mod compaction_status;
33pub mod compaction_task;
34pub mod connection;
35pub mod database;
36pub mod exactly_once_iceberg_sink;
37pub mod fragment;
38pub mod fragment_relation;
39pub mod fragment_splits;
40pub mod function;
41pub mod hummock_epoch_to_version;
42pub mod hummock_gc_history;
43pub mod hummock_pinned_snapshot;
44pub mod hummock_pinned_version;
45pub mod hummock_sequence;
46pub mod hummock_sstable_info;
47pub mod hummock_time_travel_delta;
48pub mod hummock_time_travel_version;
49pub mod hummock_version_delta;
50pub mod hummock_version_stats;
51pub mod iceberg_namespace_properties;
52pub mod iceberg_tables;
53pub mod index;
54pub mod object;
55pub mod object_dependency;
56pub mod pending_sink_state;
57pub mod refresh_job;
58pub mod schema;
59pub mod secret;
60pub mod serde_seaql_migration;
61pub mod session_parameter;
62pub mod sink;
63pub mod source;
64pub mod streaming_job;
65pub mod subscription;
66pub mod system_parameter;
67pub mod table;
68pub mod user;
69pub mod user_default_privilege;
70pub mod user_privilege;
71pub mod view;
72pub mod worker;
73pub mod worker_property;
74
75pub type TransactionId = i32;
76
77pub type PrivilegeId = i32;
78pub type DefaultPrivilegeId = i32;
79
80pub use risingwave_pb::id::{CompactionGroupId, HummockSstableObjectId, HummockVersionId};
81pub type Epoch = i64;
82pub type CompactionTaskId = i64;
83
84#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
85#[sea_orm(rs_type = "String", db_type = "string(None)")]
86pub enum JobStatus {
87    #[sea_orm(string_value = "INITIAL")]
88    Initial,
89    #[sea_orm(string_value = "CREATING")]
90    Creating,
91    #[sea_orm(string_value = "CREATED")]
92    Created,
93}
94
95impl From<JobStatus> for PbStreamJobStatus {
96    fn from(job_status: JobStatus) -> Self {
97        match job_status {
98            JobStatus::Initial => Self::Unspecified,
99            JobStatus::Creating => Self::Creating,
100            JobStatus::Created => Self::Created,
101        }
102    }
103}
104
105// todo: deprecate job status in catalog and unify with this one.
106impl From<JobStatus> for PbStreamJobState {
107    fn from(status: JobStatus) -> Self {
108        match status {
109            JobStatus::Initial => PbStreamJobState::Initial,
110            JobStatus::Creating => PbStreamJobState::Creating,
111            JobStatus::Created => PbStreamJobState::Created,
112        }
113    }
114}
115
116#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
117#[sea_orm(rs_type = "String", db_type = "string(None)")]
118pub enum CreateType {
119    #[sea_orm(string_value = "BACKGROUND")]
120    Background,
121    #[sea_orm(string_value = "FOREGROUND")]
122    Foreground,
123}
124
125impl From<CreateType> for PbCreateType {
126    fn from(create_type: CreateType) -> Self {
127        match create_type {
128            CreateType::Background => Self::Background,
129            CreateType::Foreground => Self::Foreground,
130        }
131    }
132}
133
134impl From<PbCreateType> for CreateType {
135    fn from(create_type: PbCreateType) -> Self {
136        match create_type {
137            PbCreateType::Background => Self::Background,
138            PbCreateType::Foreground => Self::Foreground,
139            PbCreateType::Unspecified => unreachable!("Unspecified create type"),
140        }
141    }
142}
143
144impl CreateType {
145    pub fn as_str(&self) -> &'static str {
146        match self {
147            CreateType::Background => "BACKGROUND",
148            CreateType::Foreground => "FOREGROUND",
149        }
150    }
151}
152
/// Defines a tuple struct with a single public field that derives `FromJsonQueryResult`, which
/// helps to map a JSON value stored in the database to the wrapped type.
///
/// Besides the derives, the generated struct gets `Eq`, `From<$field_type>`, and the accessors
/// `into_inner` / `inner_ref`.
#[macro_export]
macro_rules! derive_from_json_struct {
    ($struct_name:ident, $field_type:ty) => {
        #[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)]
        pub struct $struct_name(pub $field_type);

        // Implemented by hand instead of derived, which does not require the field type to be `Eq`.
        impl Eq for $struct_name {}

        impl $struct_name {
            /// Borrows the wrapped value.
            pub fn inner_ref(&self) -> &$field_type {
                &self.0
            }

            /// Consumes the wrapper and returns the wrapped value.
            pub fn into_inner(self) -> $field_type {
                self.0
            }
        }

        impl From<$field_type> for $struct_name {
            fn from(inner: $field_type) -> Self {
                Self(inner)
            }
        }
    };
}
177
/// Defines a struct wrapping a byte array that derives `DeriveValueType`, which helps to map a
/// blob stored in the database to a Pb struct.
///
/// The bytes held by the generated struct are the prost encoding of a `$field_type` message.
macro_rules! derive_from_blob {
    ($struct_name:ident, $field_type:ty) => {
        #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, DeriveValueType)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        impl $struct_name {
            /// Encodes the given message into a new wrapper.
            fn from_protobuf(val: &$field_type) -> Self {
                let encoded = prost::Message::encode_to_vec(val);
                Self(encoded)
            }

            /// Decodes the stored bytes into the message type.
            ///
            /// # Panics
            ///
            /// Panics if the stored bytes fail to decode.
            pub fn to_protobuf(&self) -> $field_type {
                <$field_type as prost::Message>::decode(self.0.as_slice()).unwrap()
            }
        }

        impl From<&$field_type> for $struct_name {
            fn from(value: &$field_type) -> Self {
                Self::from_protobuf(value)
            }
        }

        // The default wrapper holds the encoding of the default message.
        impl Default for $struct_name {
            fn default() -> Self {
                let empty = <$field_type>::default();
                Self::from_protobuf(&empty)
            }
        }

        // Debug output shows the decoded message rather than the raw bytes.
        impl std::fmt::Debug for $struct_name {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                std::fmt::Debug::fmt(&self.to_protobuf(), f)
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }
    };
}
219
/// Defines a struct wrapping a byte array that derives `DeriveValueType`, which helps to map a
/// blob stored in the database to an array of Pb structs.
///
/// A helper prost message `$field_array_name` with a single repeated field (tag 1) is generated
/// alongside; the stored bytes are the encoding of that helper message.
macro_rules! derive_array_from_blob {
    ($struct_name:ident, $field_type:ty, $field_array_name:ident) => {
        #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        #[derive(Clone, PartialEq, ::prost::Message)]
        pub struct $field_array_name {
            #[prost(message, repeated, tag = "1")]
            inner: Vec<$field_type>,
        }

        // Implemented by hand instead of derived, which does not require the element type to be `Eq`.
        impl Eq for $field_array_name {}

        impl $struct_name {
            /// Wraps the elements in the helper message and stores its encoding.
            fn from_protobuf(val: Vec<$field_type>) -> Self {
                let wrapper = $field_array_name { inner: val };
                Self(prost::Message::encode_to_vec(&wrapper))
            }

            /// Decodes the stored bytes and returns the contained elements.
            ///
            /// # Panics
            ///
            /// Panics if the stored bytes fail to decode.
            pub fn to_protobuf(&self) -> Vec<$field_type> {
                <$field_array_name as prost::Message>::decode(self.0.as_slice())
                    .unwrap()
                    .inner
            }
        }

        impl From<Vec<$field_type>> for $struct_name {
            fn from(value: Vec<$field_type>) -> Self {
                Self::from_protobuf(value)
            }
        }

        // The default wrapper holds no bytes at all.
        impl Default for $struct_name {
            fn default() -> Self {
                Self(vec![])
            }
        }

        // Debug output shows the decoded elements rather than the raw bytes.
        impl std::fmt::Debug for $struct_name {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                std::fmt::Debug::fmt(&self.to_protobuf(), f)
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }
    };
}
271
/// Defines a struct wrapping a byte array that derives `DeriveValueType`, mapping a blob stored in
/// the database to a `BTreeMap` of Pb structs.
///
/// A helper prost message `$field_type` with a single map field is generated alongside; the
/// stored bytes are the encoding of that helper message. Note that the prost attribute on the
/// map field is fixed to `"string, message"` regardless of `$key_type`.
macro_rules! derive_btreemap_from_blob {
    ($struct_name:ident, $key_type:ty, $value_type:ty, $field_type:ident) => {
        #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        #[derive(Clone, PartialEq, ::prost::Message)]
        pub struct $field_type {
            #[prost(btree_map = "string, message")]
            inner: BTreeMap<$key_type, $value_type>,
        }

        // Implemented by hand instead of derived, which does not require the value type to be `Eq`.
        impl Eq for $field_type {}

        impl $struct_name {
            /// Wraps the map in the helper message and stores its encoding.
            fn from_protobuf(val: BTreeMap<$key_type, $value_type>) -> Self {
                let wrapper = $field_type { inner: val };
                Self(prost::Message::encode_to_vec(&wrapper))
            }

            /// Decodes the stored bytes and returns the contained map.
            ///
            /// # Panics
            ///
            /// Panics if the stored bytes fail to decode.
            pub fn to_protobuf(&self) -> BTreeMap<$key_type, $value_type> {
                <$field_type as prost::Message>::decode(self.0.as_slice())
                    .unwrap()
                    .inner
            }
        }

        impl From<BTreeMap<$key_type, $value_type>> for $struct_name {
            fn from(value: BTreeMap<$key_type, $value_type>) -> Self {
                Self::from_protobuf(value)
            }
        }

        // The default wrapper holds no bytes at all.
        impl Default for $struct_name {
            fn default() -> Self {
                Self(vec![])
            }
        }

        // Debug output shows the decoded map rather than the raw bytes.
        impl std::fmt::Debug for $struct_name {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                std::fmt::Debug::fmt(&self.to_protobuf(), f)
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }
    };
}
320
321pub(crate) use {derive_array_from_blob, derive_from_blob};
322
323derive_from_json_struct!(TableIdArray, Vec<TableId>);
324
325derive_from_json_struct!(I32Array, Vec<i32>);
326
327impl From<Vec<u32>> for I32Array {
328    fn from(value: Vec<u32>) -> Self {
329        Self(value.into_iter().map(|id| id as _).collect())
330    }
331}
332
333impl I32Array {
334    pub fn into_u32_array(self) -> Vec<u32> {
335        self.0.into_iter().map(|id| id as _).collect()
336    }
337}
338
339derive_btreemap_from_blob!(SecretRef, String, PbSecretRef, PbSecretRefMap);
340
341derive_from_blob!(StreamNode, PbStreamNode);
342derive_from_blob!(DataType, risingwave_pb::data::PbDataType);
343derive_array_from_blob!(
344    DataTypeArray,
345    risingwave_pb::data::PbDataType,
346    PbDataTypeArray
347);
348derive_array_from_blob!(
349    FieldArray,
350    risingwave_pb::plan_common::PbField,
351    PbFieldArray
352);
353derive_from_json_struct!(Property, BTreeMap<String, String>);
354derive_from_blob!(ColumnCatalog, risingwave_pb::plan_common::PbColumnCatalog);
355derive_array_from_blob!(
356    ColumnCatalogArray,
357    risingwave_pb::plan_common::PbColumnCatalog,
358    PbColumnCatalogArray
359);
360derive_from_blob!(StreamSourceInfo, risingwave_pb::catalog::PbStreamSourceInfo);
361derive_from_blob!(
362    WebhookSourceInfo,
363    risingwave_pb::catalog::PbWebhookSourceInfo
364);
365derive_from_blob!(WatermarkDesc, risingwave_pb::catalog::PbWatermarkDesc);
366derive_array_from_blob!(
367    WatermarkDescArray,
368    risingwave_pb::catalog::PbWatermarkDesc,
369    PbWatermarkDescArray
370);
371derive_array_from_blob!(
372    ExprNodeArray,
373    risingwave_pb::expr::PbExprNode,
374    PbExprNodeArray
375);
376derive_array_from_blob!(
377    ColumnOrderArray,
378    risingwave_pb::common::PbColumnOrder,
379    PbColumnOrderArray
380);
381derive_array_from_blob!(
382    IndexColumnPropertiesArray,
383    risingwave_pb::catalog::PbIndexColumnProperties,
384    PbIndexColumnPropertiesArray
385);
386derive_from_blob!(SinkFormatDesc, risingwave_pb::catalog::PbSinkFormatDesc);
387derive_from_blob!(Cardinality, risingwave_pb::plan_common::PbCardinality);
388derive_from_blob!(TableVersion, risingwave_pb::catalog::table::PbTableVersion);
389derive_from_blob!(
390    PrivateLinkService,
391    risingwave_pb::catalog::connection::PbPrivateLinkService
392);
393derive_from_blob!(ConnectionParams, risingwave_pb::catalog::ConnectionParams);
394derive_from_blob!(AuthInfo, risingwave_pb::user::PbAuthInfo);
395
396derive_from_blob!(ConnectorSplits, risingwave_pb::source::ConnectorSplits);
397derive_from_blob!(VnodeBitmap, risingwave_pb::common::Buffer);
398derive_from_blob!(ActorMapping, risingwave_pb::stream_plan::PbActorMapping);
399derive_from_blob!(ExprContext, risingwave_pb::plan_common::PbExprContext);
400derive_from_blob!(
401    SourceRefreshMode,
402    risingwave_pb::plan_common::PbSourceRefreshMode
403);
404
405derive_from_blob!(
406    SinkSchemachange,
407    risingwave_pb::stream_plan::PbSinkSchemaChange
408);
409
410derive_array_from_blob!(
411    TypePairArray,
412    risingwave_pb::stream_plan::dispatch_output_mapping::TypePair,
413    PbTypePairArray
414);
415
416derive_array_from_blob!(
417    HummockVersionDeltaArray,
418    risingwave_pb::hummock::PbHummockVersionDelta,
419    PbHummockVersionDeltaArray
420);
421
422#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize)]
423pub enum StreamingParallelism {
424    Adaptive,
425    Fixed(usize),
426    Custom,
427}
428
429impl Eq for StreamingParallelism {}
430
431#[derive(
432    Hash, Copy, Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
433)]
434#[sea_orm(rs_type = "String", db_type = "string(None)")]
435pub enum DispatcherType {
436    #[sea_orm(string_value = "HASH")]
437    Hash,
438    #[sea_orm(string_value = "BROADCAST")]
439    Broadcast,
440    #[sea_orm(string_value = "SIMPLE")]
441    Simple,
442    #[sea_orm(string_value = "NO_SHUFFLE")]
443    NoShuffle,
444}
445
446impl From<PbDispatcherType> for DispatcherType {
447    fn from(val: PbDispatcherType) -> Self {
448        match val {
449            PbDispatcherType::Unspecified => unreachable!(),
450            PbDispatcherType::Hash => DispatcherType::Hash,
451            PbDispatcherType::Broadcast => DispatcherType::Broadcast,
452            PbDispatcherType::Simple => DispatcherType::Simple,
453            PbDispatcherType::NoShuffle => DispatcherType::NoShuffle,
454        }
455    }
456}
457
458impl From<DispatcherType> for PbDispatcherType {
459    fn from(val: DispatcherType) -> Self {
460        match val {
461            DispatcherType::Hash => PbDispatcherType::Hash,
462            DispatcherType::Broadcast => PbDispatcherType::Broadcast,
463            DispatcherType::Simple => PbDispatcherType::Simple,
464            DispatcherType::NoShuffle => PbDispatcherType::NoShuffle,
465        }
466    }
467}