risingwave_meta_model/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus};
18use risingwave_pb::meta::table_fragments::PbState as PbStreamJobState;
19use risingwave_pb::secret::PbSecretRef;
20use risingwave_pb::stream_plan::PbStreamNode;
21use sea_orm::entity::prelude::*;
22use sea_orm::{DeriveActiveEnum, EnumIter, FromJsonQueryResult};
23use serde::{Deserialize, Serialize};
24
25pub mod prelude;
26
27pub mod actor;
28pub mod actor_dispatcher;
29pub mod catalog_version;
30pub mod cluster;
31pub mod compaction_config;
32pub mod compaction_status;
33pub mod compaction_task;
34pub mod connection;
35pub mod database;
36pub mod exactly_once_iceberg_sink;
37pub mod fragment;
38pub mod fragment_relation;
39pub mod function;
40pub mod hummock_epoch_to_version;
41pub mod hummock_gc_history;
42pub mod hummock_pinned_snapshot;
43pub mod hummock_pinned_version;
44pub mod hummock_sequence;
45pub mod hummock_sstable_info;
46pub mod hummock_time_travel_delta;
47pub mod hummock_time_travel_version;
48pub mod hummock_version_delta;
49pub mod hummock_version_stats;
50pub mod index;
51pub mod object;
52pub mod object_dependency;
53pub mod schema;
54pub mod secret;
55pub mod serde_seaql_migration;
56pub mod session_parameter;
57pub mod sink;
58pub mod source;
59pub mod streaming_job;
60pub mod subscription;
61pub mod system_parameter;
62pub mod table;
63pub mod user;
64pub mod user_privilege;
65pub mod view;
66pub mod worker;
67pub mod worker_property;
68
69pub type WorkerId = i32;
70
71pub type TransactionId = i32;
72
73pub type ObjectId = i32;
74pub type DatabaseId = ObjectId;
75pub type SchemaId = ObjectId;
76pub type TableId = ObjectId;
77pub type SourceId = ObjectId;
78pub type SinkId = ObjectId;
79pub type SubscriptionId = ObjectId;
80pub type IndexId = ObjectId;
81pub type ViewId = ObjectId;
82pub type FunctionId = ObjectId;
83pub type ConnectionId = ObjectId;
84pub type SecretId = ObjectId;
85pub type UserId = i32;
86pub type PrivilegeId = i32;
87
88pub type HummockVersionId = i64;
89pub type Epoch = i64;
90pub type CompactionGroupId = i64;
91pub type CompactionTaskId = i64;
92pub type HummockSstableObjectId = i64;
93
94pub type FragmentId = i32;
95pub type ActorId = i32;
96
97#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
98#[sea_orm(rs_type = "String", db_type = "string(None)")]
99pub enum JobStatus {
100    #[sea_orm(string_value = "INITIAL")]
101    Initial,
102    #[sea_orm(string_value = "CREATING")]
103    Creating,
104    #[sea_orm(string_value = "CREATED")]
105    Created,
106}
107
108impl From<JobStatus> for PbStreamJobStatus {
109    fn from(job_status: JobStatus) -> Self {
110        match job_status {
111            JobStatus::Initial => Self::Unspecified,
112            JobStatus::Creating => Self::Creating,
113            JobStatus::Created => Self::Created,
114        }
115    }
116}
117
118// todo: deprecate job status in catalog and unify with this one.
119impl From<JobStatus> for PbStreamJobState {
120    fn from(status: JobStatus) -> Self {
121        match status {
122            JobStatus::Initial => PbStreamJobState::Initial,
123            JobStatus::Creating => PbStreamJobState::Creating,
124            JobStatus::Created => PbStreamJobState::Created,
125        }
126    }
127}
128
129#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
130#[sea_orm(rs_type = "String", db_type = "string(None)")]
131pub enum CreateType {
132    #[sea_orm(string_value = "BACKGROUND")]
133    Background,
134    #[sea_orm(string_value = "FOREGROUND")]
135    Foreground,
136}
137
138impl From<CreateType> for PbCreateType {
139    fn from(create_type: CreateType) -> Self {
140        match create_type {
141            CreateType::Background => Self::Background,
142            CreateType::Foreground => Self::Foreground,
143        }
144    }
145}
146
147impl From<PbCreateType> for CreateType {
148    fn from(create_type: PbCreateType) -> Self {
149        match create_type {
150            PbCreateType::Background => Self::Background,
151            PbCreateType::Foreground => Self::Foreground,
152            PbCreateType::Unspecified => unreachable!("Unspecified create type"),
153        }
154    }
155}
156
157/// Defines struct with a single pb field that derives `FromJsonQueryResult`, it will helps to map json value stored in database to Pb struct.
158macro_rules! derive_from_json_struct {
159    ($struct_name:ident, $field_type:ty) => {
160        #[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)]
161        pub struct $struct_name(pub $field_type);
162        impl Eq for $struct_name {}
163        impl From<$field_type> for $struct_name {
164            fn from(value: $field_type) -> Self {
165                Self(value)
166            }
167        }
168
169        impl $struct_name {
170            pub fn into_inner(self) -> $field_type {
171                self.0
172            }
173
174            pub fn inner_ref(&self) -> &$field_type {
175                &self.0
176            }
177        }
178    };
179}
180
181/// Defines struct with a byte array that derives `DeriveValueType`, it will helps to map blob stored in database to Pb struct.
182macro_rules! derive_from_blob {
183    ($struct_name:ident, $field_type:ty) => {
184        #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, DeriveValueType)]
185        pub struct $struct_name(#[sea_orm] Vec<u8>);
186
187        impl $struct_name {
188            pub fn to_protobuf(&self) -> $field_type {
189                prost::Message::decode(self.0.as_slice()).unwrap()
190            }
191
192            fn from_protobuf(val: &$field_type) -> Self {
193                Self(prost::Message::encode_to_vec(val))
194            }
195        }
196
197        impl sea_orm::sea_query::Nullable for $struct_name {
198            fn null() -> Value {
199                Value::Bytes(None)
200            }
201        }
202
203        impl From<&$field_type> for $struct_name {
204            fn from(value: &$field_type) -> Self {
205                Self::from_protobuf(value)
206            }
207        }
208
209        impl std::fmt::Debug for $struct_name {
210            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211                self.to_protobuf().fmt(f)
212            }
213        }
214
215        impl Default for $struct_name {
216            fn default() -> Self {
217                Self::from_protobuf(&<$field_type>::default())
218            }
219        }
220    };
221}
222
223/// Defines struct with a byte array that derives `DeriveValueType`, it will helps to map blob stored in database to Pb struct array.
224macro_rules! derive_array_from_blob {
225    ($struct_name:ident, $field_type:ty, $field_array_name:ident) => {
226        #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
227        pub struct $struct_name(#[sea_orm] Vec<u8>);
228
229        #[derive(Clone, PartialEq, ::prost::Message)]
230        pub struct $field_array_name {
231            #[prost(message, repeated, tag = "1")]
232            inner: Vec<$field_type>,
233        }
234        impl Eq for $field_array_name {}
235
236        impl $struct_name {
237            pub fn to_protobuf(&self) -> Vec<$field_type> {
238                let data: $field_array_name = prost::Message::decode(self.0.as_slice()).unwrap();
239                data.inner
240            }
241
242            fn from_protobuf(val: Vec<$field_type>) -> Self {
243                Self(prost::Message::encode_to_vec(&$field_array_name {
244                    inner: val,
245                }))
246            }
247        }
248
249        impl From<Vec<$field_type>> for $struct_name {
250            fn from(value: Vec<$field_type>) -> Self {
251                Self::from_protobuf(value)
252            }
253        }
254
255        impl std::fmt::Debug for $struct_name {
256            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
257                self.to_protobuf().fmt(f)
258            }
259        }
260
261        impl Default for $struct_name {
262            fn default() -> Self {
263                Self(vec![])
264            }
265        }
266
267        impl sea_orm::sea_query::Nullable for $struct_name {
268            fn null() -> Value {
269                Value::Bytes(None)
270            }
271        }
272    };
273}
274
275macro_rules! derive_btreemap_from_blob {
276    ($struct_name:ident, $key_type:ty, $value_type:ty, $field_type:ident) => {
277        #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
278        pub struct $struct_name(#[sea_orm] Vec<u8>);
279
280        #[derive(Clone, PartialEq, ::prost::Message)]
281        pub struct $field_type {
282            #[prost(btree_map = "string, message")]
283            inner: BTreeMap<$key_type, $value_type>,
284        }
285        impl Eq for $field_type {}
286
287        impl $struct_name {
288            pub fn to_protobuf(&self) -> BTreeMap<$key_type, $value_type> {
289                let data: $field_type = prost::Message::decode(self.0.as_slice()).unwrap();
290                data.inner
291            }
292
293            fn from_protobuf(val: BTreeMap<$key_type, $value_type>) -> Self {
294                Self(prost::Message::encode_to_vec(&$field_type { inner: val }))
295            }
296        }
297
298        impl From<BTreeMap<$key_type, $value_type>> for $struct_name {
299            fn from(value: BTreeMap<$key_type, $value_type>) -> Self {
300                Self::from_protobuf(value)
301            }
302        }
303
304        impl std::fmt::Debug for $struct_name {
305            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
306                self.to_protobuf().fmt(f)
307            }
308        }
309
310        impl Default for $struct_name {
311            fn default() -> Self {
312                Self(vec![])
313            }
314        }
315
316        impl sea_orm::sea_query::Nullable for $struct_name {
317            fn null() -> Value {
318                Value::Bytes(None)
319            }
320        }
321    };
322}
323
324pub(crate) use {derive_array_from_blob, derive_from_blob};
325
326derive_from_json_struct!(I32Array, Vec<i32>);
327
328impl From<Vec<u32>> for I32Array {
329    fn from(value: Vec<u32>) -> Self {
330        Self(value.into_iter().map(|id| id as _).collect())
331    }
332}
333
334impl I32Array {
335    pub fn into_u32_array(self) -> Vec<u32> {
336        self.0.into_iter().map(|id| id as _).collect()
337    }
338}
339
340derive_from_json_struct!(ActorUpstreamActors, BTreeMap<FragmentId, Vec<ActorId>>);
341
342impl From<BTreeMap<u32, Vec<u32>>> for ActorUpstreamActors {
343    fn from(val: BTreeMap<u32, Vec<u32>>) -> Self {
344        let mut map = BTreeMap::new();
345        for (k, v) in val {
346            map.insert(k as _, v.into_iter().map(|a| a as _).collect());
347        }
348        Self(map)
349    }
350}
351
352derive_btreemap_from_blob!(SecretRef, String, PbSecretRef, PbSecretRefMap);
353
354derive_from_blob!(StreamNode, PbStreamNode);
355derive_from_blob!(DataType, risingwave_pb::data::PbDataType);
356derive_array_from_blob!(
357    DataTypeArray,
358    risingwave_pb::data::PbDataType,
359    PbDataTypeArray
360);
361derive_array_from_blob!(
362    FieldArray,
363    risingwave_pb::plan_common::PbField,
364    PbFieldArray
365);
366derive_from_json_struct!(Property, BTreeMap<String, String>);
367derive_from_blob!(ColumnCatalog, risingwave_pb::plan_common::PbColumnCatalog);
368derive_array_from_blob!(
369    ColumnCatalogArray,
370    risingwave_pb::plan_common::PbColumnCatalog,
371    PbColumnCatalogArray
372);
373derive_from_blob!(StreamSourceInfo, risingwave_pb::catalog::PbStreamSourceInfo);
374derive_from_blob!(
375    WebhookSourceInfo,
376    risingwave_pb::catalog::PbWebhookSourceInfo
377);
378derive_from_blob!(WatermarkDesc, risingwave_pb::catalog::PbWatermarkDesc);
379derive_array_from_blob!(
380    WatermarkDescArray,
381    risingwave_pb::catalog::PbWatermarkDesc,
382    PbWatermarkDescArray
383);
384derive_array_from_blob!(
385    ExprNodeArray,
386    risingwave_pb::expr::PbExprNode,
387    PbExprNodeArray
388);
389derive_array_from_blob!(
390    ColumnOrderArray,
391    risingwave_pb::common::PbColumnOrder,
392    PbColumnOrderArray
393);
394derive_array_from_blob!(
395    IndexColumnPropertiesArray,
396    risingwave_pb::catalog::PbIndexColumnProperties,
397    PbIndexColumnPropertiesArray
398);
399derive_from_blob!(SinkFormatDesc, risingwave_pb::catalog::PbSinkFormatDesc);
400derive_from_blob!(Cardinality, risingwave_pb::plan_common::PbCardinality);
401derive_from_blob!(TableVersion, risingwave_pb::catalog::table::PbTableVersion);
402derive_from_blob!(
403    PrivateLinkService,
404    risingwave_pb::catalog::connection::PbPrivateLinkService
405);
406derive_from_blob!(ConnectionParams, risingwave_pb::catalog::ConnectionParams);
407derive_from_blob!(AuthInfo, risingwave_pb::user::PbAuthInfo);
408
409derive_from_blob!(ConnectorSplits, risingwave_pb::source::ConnectorSplits);
410derive_from_blob!(VnodeBitmap, risingwave_pb::common::Buffer);
411derive_from_blob!(ActorMapping, risingwave_pb::stream_plan::PbActorMapping);
412derive_from_blob!(ExprContext, risingwave_pb::plan_common::PbExprContext);
413
414derive_array_from_blob!(
415    HummockVersionDeltaArray,
416    risingwave_pb::hummock::PbHummockVersionDelta,
417    PbHummockVersionDeltaArray
418);
419
420#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize)]
421pub enum StreamingParallelism {
422    Adaptive,
423    Fixed(usize),
424    Custom,
425}
426
427impl Eq for StreamingParallelism {}