risingwave_meta_model/
lib.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17pub use risingwave_common::id::*;
18use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus};
19use risingwave_pb::meta::table_fragments::PbState as PbStreamJobState;
20use risingwave_pb::secret::PbSecretRef;
21use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamNode};
22use sea_orm::entity::prelude::*;
23use sea_orm::{DeriveActiveEnum, EnumIter, FromJsonQueryResult, Value};
24use serde::{Deserialize, Serialize};
25
26pub mod prelude;
27
28pub mod catalog_version;
29pub mod cdc_table_snapshot_split;
30pub mod cluster;
31pub mod compaction_config;
32pub mod compaction_status;
33pub mod compaction_task;
34pub mod connection;
35pub mod database;
36pub mod exactly_once_iceberg_sink;
37pub mod fragment;
38pub mod fragment_relation;
39pub mod fragment_splits;
40pub mod function;
41pub mod hummock_epoch_to_version;
42pub mod hummock_gc_history;
43pub mod hummock_pinned_snapshot;
44pub mod hummock_pinned_version;
45pub mod hummock_sequence;
46pub mod hummock_sstable_info;
47pub mod hummock_table_change_log;
48pub mod hummock_time_travel_delta;
49pub mod hummock_time_travel_version;
50pub mod hummock_version_delta;
51pub mod hummock_version_stats;
52pub mod iceberg_namespace_properties;
53pub mod iceberg_tables;
54pub mod index;
55pub mod object;
56pub mod object_dependency;
57pub mod pending_sink_state;
58pub mod refresh_job;
59pub mod schema;
60pub mod secret;
61pub mod serde_seaql_migration;
62pub mod session_parameter;
63pub mod sink;
64pub mod source;
65pub mod streaming_job;
66pub mod subscription;
67pub mod system_parameter;
68pub mod table;
69pub mod user;
70pub mod user_default_privilege;
71pub mod user_privilege;
72pub mod view;
73pub mod worker;
74pub mod worker_property;
75
/// Invokes the macro named by the argument exactly once, handing it the comma-separated list of
/// entity module names of this crate.
///
/// The list mirrors the `pub mod` declarations of this file (except `prelude`), so a module added
/// or removed there has to be added or removed here as well.
#[macro_export]
macro_rules! for_all_meta_model_entities {
    ($callback:ident) => {
        $callback! {
            catalog_version,
            cdc_table_snapshot_split,
            cluster,
            compaction_config,
            compaction_status,
            compaction_task,
            connection,
            database,
            exactly_once_iceberg_sink,
            fragment,
            fragment_relation,
            fragment_splits,
            function,
            hummock_epoch_to_version,
            hummock_gc_history,
            hummock_pinned_snapshot,
            hummock_pinned_version,
            hummock_sequence,
            hummock_sstable_info,
            hummock_table_change_log,
            hummock_time_travel_delta,
            hummock_time_travel_version,
            hummock_version_delta,
            hummock_version_stats,
            iceberg_namespace_properties,
            iceberg_tables,
            index,
            object,
            object_dependency,
            pending_sink_state,
            refresh_job,
            schema,
            secret,
            serde_seaql_migration,
            session_parameter,
            sink,
            source,
            streaming_job,
            subscription,
            system_parameter,
            table,
            user,
            user_default_privilege,
            user_privilege,
            view,
            worker,
            worker_property
        }
    };
}
130
131pub type TransactionId = i32;
132
133pub type PrivilegeId = i32;
134pub type DefaultPrivilegeId = i32;
135
136pub use risingwave_pb::id::{CompactionGroupId, HummockSstableObjectId, HummockVersionId};
137pub type Epoch = i64;
138pub type CompactionTaskId = i64;
139
140#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
141#[sea_orm(rs_type = "String", db_type = "string(None)")]
142pub enum JobStatus {
143    #[sea_orm(string_value = "INITIAL")]
144    Initial,
145    #[sea_orm(string_value = "CREATING")]
146    Creating,
147    #[sea_orm(string_value = "CREATED")]
148    Created,
149}
150
151impl From<JobStatus> for PbStreamJobStatus {
152    fn from(job_status: JobStatus) -> Self {
153        match job_status {
154            JobStatus::Initial => Self::Unspecified,
155            JobStatus::Creating => Self::Creating,
156            JobStatus::Created => Self::Created,
157        }
158    }
159}
160
161// todo: deprecate job status in catalog and unify with this one.
162impl From<JobStatus> for PbStreamJobState {
163    fn from(status: JobStatus) -> Self {
164        match status {
165            JobStatus::Initial => PbStreamJobState::Initial,
166            JobStatus::Creating => PbStreamJobState::Creating,
167            JobStatus::Created => PbStreamJobState::Created,
168        }
169    }
170}
171
172#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
173#[sea_orm(rs_type = "String", db_type = "string(None)")]
174pub enum CreateType {
175    #[sea_orm(string_value = "BACKGROUND")]
176    Background,
177    #[sea_orm(string_value = "FOREGROUND")]
178    Foreground,
179}
180
181impl From<CreateType> for PbCreateType {
182    fn from(create_type: CreateType) -> Self {
183        match create_type {
184            CreateType::Background => Self::Background,
185            CreateType::Foreground => Self::Foreground,
186        }
187    }
188}
189
190impl From<PbCreateType> for CreateType {
191    fn from(create_type: PbCreateType) -> Self {
192        match create_type {
193            PbCreateType::Background => Self::Background,
194            PbCreateType::Foreground => Self::Foreground,
195            PbCreateType::Unspecified => unreachable!("Unspecified create type"),
196        }
197    }
198}
199
200impl CreateType {
201    pub fn as_str(&self) -> &'static str {
202        match self {
203            CreateType::Background => "BACKGROUND",
204            CreateType::Foreground => "FOREGROUND",
205        }
206    }
207}
208
/// Defines a tuple struct with a single public pb field that derives `FromJsonQueryResult`, which
/// helps to map a JSON value stored in the database to the wrapped type.
///
/// Besides the derives, the generated type gets a marker `Eq` impl, a `From<$field_type>`
/// conversion and the `into_inner` / `inner_ref` accessors.
#[macro_export]
macro_rules! derive_from_json_struct {
    ($struct_name:ident, $field_type:ty) => {
        #[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize, FromJsonQueryResult)]
        pub struct $struct_name(pub $field_type);

        // Written by hand rather than derived, so no `Eq` bound is put on the field type.
        impl Eq for $struct_name {}

        impl $struct_name {
            /// Borrows the wrapped value.
            pub fn inner_ref(&self) -> &$field_type {
                let Self(inner) = self;
                inner
            }

            /// Consumes the wrapper and returns the wrapped value.
            pub fn into_inner(self) -> $field_type {
                let Self(inner) = self;
                inner
            }
        }

        impl From<$field_type> for $struct_name {
            fn from(inner: $field_type) -> Self {
                $struct_name(inner)
            }
        }
    };
}
233
/// Defines a struct wrapping a byte array that derives `DeriveValueType`, which helps to map a
/// blob stored in the database to a Pb struct.
///
/// The wrapped bytes are the prost encoding of `$field_type`. `to_protobuf` panics when the bytes
/// cannot be decoded, whereas the generated `Debug` impl reports the decode error instead, so
/// that logging a corrupted value never panics.
macro_rules! derive_from_blob {
    ($struct_name:ident, $field_type:ty) => {
        #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, DeriveValueType)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        impl $struct_name {
            /// Decodes the stored bytes into the protobuf message.
            ///
            /// # Panics
            ///
            /// Panics if the stored bytes are not a valid encoding of the message.
            pub fn to_protobuf(&self) -> $field_type {
                <$field_type as prost::Message>::decode(self.0.as_slice()).unwrap_or_else(|err| {
                    panic!(
                        "failed to decode {} blob: {}",
                        stringify!($struct_name),
                        err
                    )
                })
            }

            /// Encodes the protobuf message into a new wrapper.
            fn from_protobuf(val: &$field_type) -> Self {
                Self(prost::Message::encode_to_vec(val))
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }

        impl From<&$field_type> for $struct_name {
            fn from(value: &$field_type) -> Self {
                Self::from_protobuf(value)
            }
        }

        impl std::fmt::Debug for $struct_name {
            /// Formats the decoded message; if decoding fails, formats the error and the byte
            /// length instead of panicking.
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                match <$field_type as prost::Message>::decode(self.0.as_slice()) {
                    Ok(decoded) => std::fmt::Debug::fmt(&decoded, f),
                    Err(err) => f
                        .debug_struct(stringify!($struct_name))
                        .field("decode_error", &err)
                        .field("encoded_len", &self.0.len())
                        .finish(),
                }
            }
        }

        impl Default for $struct_name {
            /// The encoding of the default protobuf message.
            fn default() -> Self {
                Self::from_protobuf(&<$field_type>::default())
            }
        }
    };
}
275
/// Defines a struct wrapping a byte array that derives `DeriveValueType`, which helps to map a
/// blob stored in the database to an array of Pb structs.
///
/// The elements are encoded through the helper message `$field_array_name`, which holds them as
/// a repeated field with tag 1. `to_protobuf` panics when the bytes cannot be decoded, whereas
/// the generated `Debug` impl reports the decode error instead, so that logging a corrupted value
/// never panics.
macro_rules! derive_array_from_blob {
    ($struct_name:ident, $field_type:ty, $field_array_name:ident) => {
        #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        #[derive(Clone, PartialEq, ::prost::Message)]
        pub struct $field_array_name {
            #[prost(message, repeated, tag = "1")]
            inner: Vec<$field_type>,
        }
        impl Eq for $field_array_name {}

        impl $struct_name {
            /// Decodes the stored bytes into the list of protobuf messages.
            ///
            /// # Panics
            ///
            /// Panics if the stored bytes are not a valid encoding of the helper message.
            pub fn to_protobuf(&self) -> Vec<$field_type> {
                <$field_array_name as prost::Message>::decode(self.0.as_slice())
                    .unwrap_or_else(|err| {
                        panic!(
                            "failed to decode {} blob: {}",
                            stringify!($struct_name),
                            err
                        )
                    })
                    .inner
            }

            /// Encodes the list of protobuf messages into a new wrapper.
            fn from_protobuf(val: Vec<$field_type>) -> Self {
                Self(prost::Message::encode_to_vec(&$field_array_name {
                    inner: val,
                }))
            }
        }

        impl From<Vec<$field_type>> for $struct_name {
            fn from(value: Vec<$field_type>) -> Self {
                Self::from_protobuf(value)
            }
        }

        impl std::fmt::Debug for $struct_name {
            /// Formats the decoded list; if decoding fails, formats the error and the byte
            /// length instead of panicking.
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                match <$field_array_name as prost::Message>::decode(self.0.as_slice()) {
                    Ok(decoded) => std::fmt::Debug::fmt(&decoded.inner, f),
                    Err(err) => f
                        .debug_struct(stringify!($struct_name))
                        .field("decode_error", &err)
                        .field("encoded_len", &self.0.len())
                        .finish(),
                }
            }
        }

        impl Default for $struct_name {
            /// An empty byte array.
            fn default() -> Self {
                Self(vec![])
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }
    };
}
327
/// Defines a struct wrapping a byte array that derives `DeriveValueType`, which helps to map a
/// blob stored in the database to a `BTreeMap` of Pb structs.
///
/// The map is encoded through the helper message `$field_type`. Its prost mapping is hard-coded
/// as `"string, message"`, so `$key_type` has to be a string type and `$value_type` a prost
/// message. `to_protobuf` panics when the bytes cannot be decoded, whereas the generated `Debug`
/// impl reports the decode error instead, so that logging a corrupted value never panics.
macro_rules! derive_btreemap_from_blob {
    ($struct_name:ident, $key_type:ty, $value_type:ty, $field_type:ident) => {
        #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        #[derive(Clone, PartialEq, ::prost::Message)]
        pub struct $field_type {
            #[prost(btree_map = "string, message")]
            inner: BTreeMap<$key_type, $value_type>,
        }
        impl Eq for $field_type {}

        impl $struct_name {
            /// Decodes the stored bytes into the map.
            ///
            /// # Panics
            ///
            /// Panics if the stored bytes are not a valid encoding of the helper message.
            pub fn to_protobuf(&self) -> BTreeMap<$key_type, $value_type> {
                <$field_type as prost::Message>::decode(self.0.as_slice())
                    .unwrap_or_else(|err| {
                        panic!(
                            "failed to decode {} blob: {}",
                            stringify!($struct_name),
                            err
                        )
                    })
                    .inner
            }

            /// Encodes the map into a new wrapper.
            fn from_protobuf(val: BTreeMap<$key_type, $value_type>) -> Self {
                Self(prost::Message::encode_to_vec(&$field_type { inner: val }))
            }
        }

        impl From<BTreeMap<$key_type, $value_type>> for $struct_name {
            fn from(value: BTreeMap<$key_type, $value_type>) -> Self {
                Self::from_protobuf(value)
            }
        }

        impl std::fmt::Debug for $struct_name {
            /// Formats the decoded map; if decoding fails, formats the error and the byte
            /// length instead of panicking.
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                match <$field_type as prost::Message>::decode(self.0.as_slice()) {
                    Ok(decoded) => std::fmt::Debug::fmt(&decoded.inner, f),
                    Err(err) => f
                        .debug_struct(stringify!($struct_name))
                        .field("decode_error", &err)
                        .field("encoded_len", &self.0.len())
                        .finish(),
                }
            }
        }

        impl Default for $struct_name {
            /// An empty byte array.
            fn default() -> Self {
                Self(vec![])
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }
    };
}
376
377pub(crate) use {derive_array_from_blob, derive_from_blob};
378
379derive_from_json_struct!(TableIdArray, Vec<TableId>);
380
381derive_from_json_struct!(EpochArray, Vec<Epoch>);
382
383derive_from_json_struct!(I32Array, Vec<i32>);
384
385impl From<Vec<u32>> for I32Array {
386    fn from(value: Vec<u32>) -> Self {
387        Self(value.into_iter().map(|id| id as _).collect())
388    }
389}
390
391impl I32Array {
392    pub fn into_u32_array(self) -> Vec<u32> {
393        self.0.into_iter().map(|id| id as _).collect()
394    }
395}
396
397derive_btreemap_from_blob!(SecretRef, String, PbSecretRef, PbSecretRefMap);
398
399derive_from_blob!(StreamNode, PbStreamNode);
400derive_from_blob!(DataType, risingwave_pb::data::PbDataType);
401derive_array_from_blob!(
402    DataTypeArray,
403    risingwave_pb::data::PbDataType,
404    PbDataTypeArray
405);
406derive_array_from_blob!(
407    FieldArray,
408    risingwave_pb::plan_common::PbField,
409    PbFieldArray
410);
411derive_from_json_struct!(Property, BTreeMap<String, String>);
412derive_from_blob!(ColumnCatalog, risingwave_pb::plan_common::PbColumnCatalog);
413derive_array_from_blob!(
414    ColumnCatalogArray,
415    risingwave_pb::plan_common::PbColumnCatalog,
416    PbColumnCatalogArray
417);
418derive_from_blob!(StreamSourceInfo, risingwave_pb::catalog::PbStreamSourceInfo);
419derive_from_blob!(
420    WebhookSourceInfo,
421    risingwave_pb::catalog::PbWebhookSourceInfo
422);
423derive_from_blob!(WatermarkDesc, risingwave_pb::catalog::PbWatermarkDesc);
424derive_array_from_blob!(
425    WatermarkDescArray,
426    risingwave_pb::catalog::PbWatermarkDesc,
427    PbWatermarkDescArray
428);
429derive_array_from_blob!(
430    ExprNodeArray,
431    risingwave_pb::expr::PbExprNode,
432    PbExprNodeArray
433);
434derive_array_from_blob!(
435    ColumnOrderArray,
436    risingwave_pb::common::PbColumnOrder,
437    PbColumnOrderArray
438);
439derive_array_from_blob!(
440    IndexColumnPropertiesArray,
441    risingwave_pb::catalog::PbIndexColumnProperties,
442    PbIndexColumnPropertiesArray
443);
444derive_from_blob!(SinkFormatDesc, risingwave_pb::catalog::PbSinkFormatDesc);
445derive_from_blob!(Cardinality, risingwave_pb::plan_common::PbCardinality);
446derive_from_blob!(TableVersion, risingwave_pb::catalog::table::PbTableVersion);
447derive_from_blob!(
448    PrivateLinkService,
449    risingwave_pb::catalog::connection::PbPrivateLinkService
450);
451derive_from_blob!(ConnectionParams, risingwave_pb::catalog::ConnectionParams);
452derive_from_blob!(AuthInfo, risingwave_pb::user::PbAuthInfo);
453
454derive_from_blob!(ConnectorSplits, risingwave_pb::source::ConnectorSplits);
455derive_from_blob!(VnodeBitmap, risingwave_pb::common::Buffer);
456derive_from_blob!(ActorMapping, risingwave_pb::stream_plan::PbActorMapping);
457derive_from_blob!(ExprContext, risingwave_pb::plan_common::PbExprContext);
458derive_from_blob!(
459    SourceRefreshMode,
460    risingwave_pb::plan_common::PbSourceRefreshMode
461);
462
463derive_from_blob!(
464    SinkSchemachange,
465    risingwave_pb::stream_plan::PbSinkSchemaChange
466);
467
468derive_array_from_blob!(
469    TypePairArray,
470    risingwave_pb::stream_plan::dispatch_output_mapping::TypePair,
471    PbTypePairArray
472);
473
474derive_array_from_blob!(
475    HummockVersionDeltaArray,
476    risingwave_pb::hummock::PbHummockVersionDelta,
477    PbHummockVersionDeltaArray
478);
479
480derive_array_from_blob!(
481    SstableInfoArray,
482    risingwave_pb::hummock::PbSstableInfo,
483    PbSstableInfoArray
484);
485
486#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize)]
487pub enum StreamingParallelism {
488    Adaptive,
489    Fixed(usize),
490    Custom,
491}
492
493impl Eq for StreamingParallelism {}
494
495#[derive(
496    Hash, Copy, Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
497)]
498#[sea_orm(rs_type = "String", db_type = "string(None)")]
499pub enum DispatcherType {
500    #[sea_orm(string_value = "HASH")]
501    Hash,
502    #[sea_orm(string_value = "BROADCAST")]
503    Broadcast,
504    #[sea_orm(string_value = "SIMPLE")]
505    Simple,
506    #[sea_orm(string_value = "NO_SHUFFLE")]
507    NoShuffle,
508}
509
510impl From<PbDispatcherType> for DispatcherType {
511    fn from(val: PbDispatcherType) -> Self {
512        match val {
513            PbDispatcherType::Unspecified => unreachable!(),
514            PbDispatcherType::Hash => DispatcherType::Hash,
515            PbDispatcherType::Broadcast => DispatcherType::Broadcast,
516            PbDispatcherType::Simple => DispatcherType::Simple,
517            PbDispatcherType::NoShuffle => DispatcherType::NoShuffle,
518        }
519    }
520}
521
522impl From<DispatcherType> for PbDispatcherType {
523    fn from(val: DispatcherType) -> Self {
524        match val {
525            DispatcherType::Hash => PbDispatcherType::Hash,
526            DispatcherType::Broadcast => PbDispatcherType::Broadcast,
527            DispatcherType::Simple => PbDispatcherType::Simple,
528            DispatcherType::NoShuffle => PbDispatcherType::NoShuffle,
529        }
530    }
531}