risingwave_meta_model/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus};
18use risingwave_pb::meta::table_fragments::PbState as PbStreamJobState;
19use risingwave_pb::secret::PbSecretRef;
20use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamNode};
21use sea_orm::entity::prelude::*;
22use sea_orm::{DeriveActiveEnum, EnumIter, FromJsonQueryResult};
23use serde::{Deserialize, Serialize};
24
25pub mod prelude;
26
27pub mod actor;
28pub mod catalog_version;
29pub mod cluster;
30pub mod compaction_config;
31pub mod compaction_status;
32pub mod compaction_task;
33pub mod connection;
34pub mod database;
35pub mod exactly_once_iceberg_sink;
36pub mod fragment;
37pub mod fragment_relation;
38pub mod function;
39pub mod hummock_epoch_to_version;
40pub mod hummock_gc_history;
41pub mod hummock_pinned_snapshot;
42pub mod hummock_pinned_version;
43pub mod hummock_sequence;
44pub mod hummock_sstable_info;
45pub mod hummock_time_travel_delta;
46pub mod hummock_time_travel_version;
47pub mod hummock_version_delta;
48pub mod hummock_version_stats;
49pub mod iceberg_namespace_properties;
50pub mod iceberg_tables;
51pub mod index;
52pub mod object;
53pub mod object_dependency;
54pub mod schema;
55pub mod secret;
56pub mod serde_seaql_migration;
57pub mod session_parameter;
58pub mod sink;
59pub mod source;
60pub mod streaming_job;
61pub mod subscription;
62pub mod system_parameter;
63pub mod table;
64pub mod user;
65pub mod user_privilege;
66pub mod view;
67pub mod worker;
68pub mod worker_property;
69
70pub type WorkerId = i32;
71
72pub type TransactionId = i32;
73
74pub type ObjectId = i32;
75pub type DatabaseId = ObjectId;
76pub type SchemaId = ObjectId;
77pub type TableId = ObjectId;
78pub type SourceId = ObjectId;
79pub type SinkId = ObjectId;
80pub type SubscriptionId = ObjectId;
81pub type IndexId = ObjectId;
82pub type ViewId = ObjectId;
83pub type FunctionId = ObjectId;
84pub type ConnectionId = ObjectId;
85pub type SecretId = ObjectId;
86pub type UserId = i32;
87pub type PrivilegeId = i32;
88
89pub type HummockVersionId = i64;
90pub type Epoch = i64;
91pub type CompactionGroupId = i64;
92pub type CompactionTaskId = i64;
93pub type HummockSstableObjectId = i64;
94
95pub type FragmentId = i32;
96pub type ActorId = i32;
97
98#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
99#[sea_orm(rs_type = "String", db_type = "string(None)")]
100pub enum JobStatus {
101    #[sea_orm(string_value = "INITIAL")]
102    Initial,
103    #[sea_orm(string_value = "CREATING")]
104    Creating,
105    #[sea_orm(string_value = "CREATED")]
106    Created,
107}
108
109impl From<JobStatus> for PbStreamJobStatus {
110    fn from(job_status: JobStatus) -> Self {
111        match job_status {
112            JobStatus::Initial => Self::Unspecified,
113            JobStatus::Creating => Self::Creating,
114            JobStatus::Created => Self::Created,
115        }
116    }
117}
118
119// todo: deprecate job status in catalog and unify with this one.
120impl From<JobStatus> for PbStreamJobState {
121    fn from(status: JobStatus) -> Self {
122        match status {
123            JobStatus::Initial => PbStreamJobState::Initial,
124            JobStatus::Creating => PbStreamJobState::Creating,
125            JobStatus::Created => PbStreamJobState::Created,
126        }
127    }
128}
129
130#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
131#[sea_orm(rs_type = "String", db_type = "string(None)")]
132pub enum CreateType {
133    #[sea_orm(string_value = "BACKGROUND")]
134    Background,
135    #[sea_orm(string_value = "FOREGROUND")]
136    Foreground,
137}
138
139impl From<CreateType> for PbCreateType {
140    fn from(create_type: CreateType) -> Self {
141        match create_type {
142            CreateType::Background => Self::Background,
143            CreateType::Foreground => Self::Foreground,
144        }
145    }
146}
147
148impl From<PbCreateType> for CreateType {
149    fn from(create_type: PbCreateType) -> Self {
150        match create_type {
151            PbCreateType::Background => Self::Background,
152            PbCreateType::Foreground => Self::Foreground,
153            PbCreateType::Unspecified => unreachable!("Unspecified create type"),
154        }
155    }
156}
157
/// Defines a tuple struct with a single public field that derives `FromJsonQueryResult`, which
/// helps to map a JSON value stored in the database to the wrapped type.
///
/// Besides the derives, the generated type gets `Eq`, `From<$field_type>` and the accessors
/// `into_inner` / `inner_ref`.
macro_rules! derive_from_json_struct {
    ($struct_name:ident, $field_type:ty) => {
        #[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)]
        pub struct $struct_name(pub $field_type);

        impl Eq for $struct_name {}

        impl $struct_name {
            /// Borrows the wrapped value.
            pub fn inner_ref(&self) -> &$field_type {
                let Self(inner) = self;
                inner
            }

            /// Consumes the wrapper and returns the wrapped value.
            pub fn into_inner(self) -> $field_type {
                let Self(inner) = self;
                inner
            }
        }

        impl From<$field_type> for $struct_name {
            fn from(inner: $field_type) -> Self {
                $struct_name(inner)
            }
        }
    };
}
181
/// Defines a struct wrapping a byte array that derives `DeriveValueType`, which helps to map a
/// blob stored in the database to a Pb struct.
///
/// The generated type stores the prost encoding of `$field_type`. It can be built from
/// `&$field_type`, decoded again with `to_protobuf`, and its `Default` is the encoding of
/// `$field_type::default()`.
macro_rules! derive_from_blob {
    ($struct_name:ident, $field_type:ty) => {
        #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, DeriveValueType)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        impl $struct_name {
            /// Decodes the stored bytes into the protobuf message.
            ///
            /// # Panics
            ///
            /// Panics if the stored bytes are not a valid encoding of the message.
            pub fn to_protobuf(&self) -> $field_type {
                prost::Message::decode(self.0.as_slice()).expect(concat!(
                    "failed to decode `",
                    stringify!($struct_name),
                    "` from its stored protobuf bytes"
                ))
            }

            /// Encodes the protobuf message into the stored byte representation.
            fn from_protobuf(val: &$field_type) -> Self {
                Self(prost::Message::encode_to_vec(val))
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }

        impl From<&$field_type> for $struct_name {
            fn from(value: &$field_type) -> Self {
                Self::from_protobuf(value)
            }
        }

        impl std::fmt::Debug for $struct_name {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                // Formatting must not panic: a blob that fails to decode is reported in the
                // output instead of being unwrapped.
                match <$field_type as prost::Message>::decode(self.0.as_slice()) {
                    Ok(message) => std::fmt::Debug::fmt(&message, f),
                    Err(err) => write!(
                        f,
                        "{}(<undecodable protobuf: {}>)",
                        stringify!($struct_name),
                        err
                    ),
                }
            }
        }

        impl Default for $struct_name {
            fn default() -> Self {
                Self::from_protobuf(&<$field_type>::default())
            }
        }
    };
}
223
/// Defines a struct wrapping a byte array that derives `DeriveValueType`, which helps to map a
/// blob stored in the database to an array of Pb structs.
///
/// The elements are wrapped in the helper message `$field_array_name` (one repeated field with
/// tag 1) and the prost encoding of that message is what gets stored. The `Default` value holds
/// no bytes.
macro_rules! derive_array_from_blob {
    ($struct_name:ident, $field_type:ty, $field_array_name:ident) => {
        #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        #[derive(Clone, PartialEq, ::prost::Message)]
        pub struct $field_array_name {
            #[prost(message, repeated, tag = "1")]
            inner: Vec<$field_type>,
        }
        impl Eq for $field_array_name {}

        impl $struct_name {
            /// Decodes the stored bytes into the list of protobuf messages.
            ///
            /// # Panics
            ///
            /// Panics if the stored bytes are not a valid encoding of the helper message.
            pub fn to_protobuf(&self) -> Vec<$field_type> {
                let data: $field_array_name =
                    prost::Message::decode(self.0.as_slice()).expect(concat!(
                        "failed to decode `",
                        stringify!($struct_name),
                        "` from its stored protobuf bytes"
                    ));
                data.inner
            }

            /// Encodes the list of protobuf messages into the stored byte representation.
            fn from_protobuf(val: Vec<$field_type>) -> Self {
                Self(prost::Message::encode_to_vec(&$field_array_name {
                    inner: val,
                }))
            }
        }

        impl From<Vec<$field_type>> for $struct_name {
            fn from(value: Vec<$field_type>) -> Self {
                Self::from_protobuf(value)
            }
        }

        impl std::fmt::Debug for $struct_name {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                // Formatting must not panic: a blob that fails to decode is reported in the
                // output instead of being unwrapped.
                match <$field_array_name as prost::Message>::decode(self.0.as_slice()) {
                    Ok(data) => std::fmt::Debug::fmt(&data.inner, f),
                    Err(err) => write!(
                        f,
                        "{}(<undecodable protobuf: {}>)",
                        stringify!($struct_name),
                        err
                    ),
                }
            }
        }

        impl Default for $struct_name {
            fn default() -> Self {
                Self(vec![])
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }
    };
}
275
/// Defines a struct wrapping a byte array that derives `DeriveValueType`, which helps to map a
/// blob stored in the database to a `BTreeMap` of Pb structs.
///
/// The map is wrapped in the helper message `$field_type` and the prost encoding of that message
/// is what gets stored. The helper's prost attribute is fixed to `string` keys and `message`
/// values, so `$key_type` and `$value_type` have to match that. The `Default` value holds no
/// bytes.
macro_rules! derive_btreemap_from_blob {
    ($struct_name:ident, $key_type:ty, $value_type:ty, $field_type:ident) => {
        #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        #[derive(Clone, PartialEq, ::prost::Message)]
        pub struct $field_type {
            #[prost(btree_map = "string, message")]
            inner: BTreeMap<$key_type, $value_type>,
        }
        impl Eq for $field_type {}

        impl $struct_name {
            /// Decodes the stored bytes into the map of protobuf messages.
            ///
            /// # Panics
            ///
            /// Panics if the stored bytes are not a valid encoding of the helper message.
            pub fn to_protobuf(&self) -> BTreeMap<$key_type, $value_type> {
                let data: $field_type =
                    prost::Message::decode(self.0.as_slice()).expect(concat!(
                        "failed to decode `",
                        stringify!($struct_name),
                        "` from its stored protobuf bytes"
                    ));
                data.inner
            }

            /// Encodes the map of protobuf messages into the stored byte representation.
            fn from_protobuf(val: BTreeMap<$key_type, $value_type>) -> Self {
                Self(prost::Message::encode_to_vec(&$field_type { inner: val }))
            }
        }

        impl From<BTreeMap<$key_type, $value_type>> for $struct_name {
            fn from(value: BTreeMap<$key_type, $value_type>) -> Self {
                Self::from_protobuf(value)
            }
        }

        impl std::fmt::Debug for $struct_name {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                // Formatting must not panic: a blob that fails to decode is reported in the
                // output instead of being unwrapped.
                match <$field_type as prost::Message>::decode(self.0.as_slice()) {
                    Ok(data) => std::fmt::Debug::fmt(&data.inner, f),
                    Err(err) => write!(
                        f,
                        "{}(<undecodable protobuf: {}>)",
                        stringify!($struct_name),
                        err
                    ),
                }
            }
        }

        impl Default for $struct_name {
            fn default() -> Self {
                Self(vec![])
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }
    };
}
324
325pub(crate) use {derive_array_from_blob, derive_from_blob};
326
327derive_from_json_struct!(I32Array, Vec<i32>);
328
329impl From<Vec<u32>> for I32Array {
330    fn from(value: Vec<u32>) -> Self {
331        Self(value.into_iter().map(|id| id as _).collect())
332    }
333}
334
335impl I32Array {
336    pub fn into_u32_array(self) -> Vec<u32> {
337        self.0.into_iter().map(|id| id as _).collect()
338    }
339}
340
341derive_from_json_struct!(ActorUpstreamActors, BTreeMap<FragmentId, Vec<ActorId>>);
342
343impl From<BTreeMap<u32, Vec<u32>>> for ActorUpstreamActors {
344    fn from(val: BTreeMap<u32, Vec<u32>>) -> Self {
345        let mut map = BTreeMap::new();
346        for (k, v) in val {
347            map.insert(k as _, v.into_iter().map(|a| a as _).collect());
348        }
349        Self(map)
350    }
351}
352
353derive_btreemap_from_blob!(SecretRef, String, PbSecretRef, PbSecretRefMap);
354
355derive_from_blob!(StreamNode, PbStreamNode);
356derive_from_blob!(DataType, risingwave_pb::data::PbDataType);
357derive_array_from_blob!(
358    DataTypeArray,
359    risingwave_pb::data::PbDataType,
360    PbDataTypeArray
361);
362derive_array_from_blob!(
363    FieldArray,
364    risingwave_pb::plan_common::PbField,
365    PbFieldArray
366);
367derive_from_json_struct!(Property, BTreeMap<String, String>);
368derive_from_blob!(ColumnCatalog, risingwave_pb::plan_common::PbColumnCatalog);
369derive_array_from_blob!(
370    ColumnCatalogArray,
371    risingwave_pb::plan_common::PbColumnCatalog,
372    PbColumnCatalogArray
373);
374derive_from_blob!(StreamSourceInfo, risingwave_pb::catalog::PbStreamSourceInfo);
375derive_from_blob!(
376    WebhookSourceInfo,
377    risingwave_pb::catalog::PbWebhookSourceInfo
378);
379derive_from_blob!(WatermarkDesc, risingwave_pb::catalog::PbWatermarkDesc);
380derive_array_from_blob!(
381    WatermarkDescArray,
382    risingwave_pb::catalog::PbWatermarkDesc,
383    PbWatermarkDescArray
384);
385derive_array_from_blob!(
386    ExprNodeArray,
387    risingwave_pb::expr::PbExprNode,
388    PbExprNodeArray
389);
390derive_array_from_blob!(
391    ColumnOrderArray,
392    risingwave_pb::common::PbColumnOrder,
393    PbColumnOrderArray
394);
395derive_array_from_blob!(
396    IndexColumnPropertiesArray,
397    risingwave_pb::catalog::PbIndexColumnProperties,
398    PbIndexColumnPropertiesArray
399);
400derive_from_blob!(SinkFormatDesc, risingwave_pb::catalog::PbSinkFormatDesc);
401derive_from_blob!(Cardinality, risingwave_pb::plan_common::PbCardinality);
402derive_from_blob!(TableVersion, risingwave_pb::catalog::table::PbTableVersion);
403derive_from_blob!(
404    PrivateLinkService,
405    risingwave_pb::catalog::connection::PbPrivateLinkService
406);
407derive_from_blob!(ConnectionParams, risingwave_pb::catalog::ConnectionParams);
408derive_from_blob!(AuthInfo, risingwave_pb::user::PbAuthInfo);
409
410derive_from_blob!(ConnectorSplits, risingwave_pb::source::ConnectorSplits);
411derive_from_blob!(VnodeBitmap, risingwave_pb::common::Buffer);
412derive_from_blob!(ActorMapping, risingwave_pb::stream_plan::PbActorMapping);
413derive_from_blob!(ExprContext, risingwave_pb::plan_common::PbExprContext);
414
415derive_array_from_blob!(
416    HummockVersionDeltaArray,
417    risingwave_pb::hummock::PbHummockVersionDelta,
418    PbHummockVersionDeltaArray
419);
420
421#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize)]
422pub enum StreamingParallelism {
423    Adaptive,
424    Fixed(usize),
425    Custom,
426}
427
428impl Eq for StreamingParallelism {}
429
430#[derive(
431    Hash, Copy, Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
432)]
433#[sea_orm(rs_type = "String", db_type = "string(None)")]
434pub enum DispatcherType {
435    #[sea_orm(string_value = "HASH")]
436    Hash,
437    #[sea_orm(string_value = "BROADCAST")]
438    Broadcast,
439    #[sea_orm(string_value = "SIMPLE")]
440    Simple,
441    #[sea_orm(string_value = "NO_SHUFFLE")]
442    NoShuffle,
443}
444
445impl From<PbDispatcherType> for DispatcherType {
446    fn from(val: PbDispatcherType) -> Self {
447        match val {
448            PbDispatcherType::Unspecified => unreachable!(),
449            PbDispatcherType::Hash => DispatcherType::Hash,
450            PbDispatcherType::Broadcast => DispatcherType::Broadcast,
451            PbDispatcherType::Simple => DispatcherType::Simple,
452            PbDispatcherType::NoShuffle => DispatcherType::NoShuffle,
453        }
454    }
455}
456
457impl From<DispatcherType> for PbDispatcherType {
458    fn from(val: DispatcherType) -> Self {
459        match val {
460            DispatcherType::Hash => PbDispatcherType::Hash,
461            DispatcherType::Broadcast => PbDispatcherType::Broadcast,
462            DispatcherType::Simple => PbDispatcherType::Simple,
463            DispatcherType::NoShuffle => PbDispatcherType::NoShuffle,
464        }
465    }
466}