1use std::collections::BTreeMap;
16
17pub use risingwave_common::id::*;
18use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus};
19use risingwave_pb::meta::table_fragments::PbState as PbStreamJobState;
20use risingwave_pb::secret::PbSecretRef;
21use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamNode};
22use sea_orm::entity::prelude::*;
23use sea_orm::{DeriveActiveEnum, EnumIter, FromJsonQueryResult, Value};
24use serde::{Deserialize, Serialize};
25
26pub mod prelude;
27
28pub mod catalog_version;
29pub mod cdc_table_snapshot_split;
30pub mod cluster;
31pub mod compaction_config;
32pub mod compaction_status;
33pub mod compaction_task;
34pub mod connection;
35pub mod database;
36pub mod exactly_once_iceberg_sink;
37pub mod fragment;
38pub mod fragment_relation;
39pub mod fragment_splits;
40pub mod function;
41pub mod hummock_epoch_to_version;
42pub mod hummock_gc_history;
43pub mod hummock_pinned_snapshot;
44pub mod hummock_pinned_version;
45pub mod hummock_sequence;
46pub mod hummock_sstable_info;
47pub mod hummock_table_change_log;
48pub mod hummock_time_travel_delta;
49pub mod hummock_time_travel_version;
50pub mod hummock_version_delta;
51pub mod hummock_version_stats;
52pub mod iceberg_namespace_properties;
53pub mod iceberg_tables;
54pub mod index;
55pub mod object;
56pub mod object_dependency;
57pub mod pending_sink_state;
58pub mod refresh_job;
59pub mod schema;
60pub mod secret;
61pub mod serde_seaql_migration;
62pub mod session_parameter;
63pub mod sink;
64pub mod source;
65pub mod streaming_job;
66pub mod subscription;
67pub mod system_parameter;
68pub mod table;
69pub mod user;
70pub mod user_default_privilege;
71pub mod user_privilege;
72pub mod view;
73pub mod worker;
74pub mod worker_property;
75
76#[macro_export]
77macro_rules! for_all_meta_model_entities {
78 ($macro:ident) => {
79 $macro! {
80 catalog_version,
81 cdc_table_snapshot_split,
82 cluster,
83 compaction_config,
84 compaction_status,
85 compaction_task,
86 connection,
87 database,
88 exactly_once_iceberg_sink,
89 fragment,
90 fragment_relation,
91 fragment_splits,
92 function,
93 hummock_epoch_to_version,
94 hummock_gc_history,
95 hummock_pinned_snapshot,
96 hummock_pinned_version,
97 hummock_sequence,
98 hummock_sstable_info,
99 hummock_table_change_log,
100 hummock_time_travel_delta,
101 hummock_time_travel_version,
102 hummock_version_delta,
103 hummock_version_stats,
104 iceberg_namespace_properties,
105 iceberg_tables,
106 index,
107 object,
108 object_dependency,
109 pending_sink_state,
110 refresh_job,
111 schema,
112 secret,
113 serde_seaql_migration,
114 session_parameter,
115 sink,
116 source,
117 streaming_job,
118 subscription,
119 system_parameter,
120 table,
121 user,
122 user_default_privilege,
123 user_privilege,
124 view,
125 worker,
126 worker_property
127 }
128 };
129}
130
131pub type TransactionId = i32;
132
133pub type PrivilegeId = i32;
134pub type DefaultPrivilegeId = i32;
135
136pub use risingwave_pb::id::{CompactionGroupId, HummockSstableObjectId, HummockVersionId};
137pub type Epoch = i64;
138pub type CompactionTaskId = i64;
139
140#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
141#[sea_orm(rs_type = "String", db_type = "string(None)")]
142pub enum JobStatus {
143 #[sea_orm(string_value = "INITIAL")]
144 Initial,
145 #[sea_orm(string_value = "CREATING")]
146 Creating,
147 #[sea_orm(string_value = "CREATED")]
148 Created,
149}
150
151impl From<JobStatus> for PbStreamJobStatus {
152 fn from(job_status: JobStatus) -> Self {
153 match job_status {
154 JobStatus::Initial => Self::Unspecified,
155 JobStatus::Creating => Self::Creating,
156 JobStatus::Created => Self::Created,
157 }
158 }
159}
160
161impl From<JobStatus> for PbStreamJobState {
163 fn from(status: JobStatus) -> Self {
164 match status {
165 JobStatus::Initial => PbStreamJobState::Initial,
166 JobStatus::Creating => PbStreamJobState::Creating,
167 JobStatus::Created => PbStreamJobState::Created,
168 }
169 }
170}
171
172#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
173#[sea_orm(rs_type = "String", db_type = "string(None)")]
174pub enum CreateType {
175 #[sea_orm(string_value = "BACKGROUND")]
176 Background,
177 #[sea_orm(string_value = "FOREGROUND")]
178 Foreground,
179}
180
181impl From<CreateType> for PbCreateType {
182 fn from(create_type: CreateType) -> Self {
183 match create_type {
184 CreateType::Background => Self::Background,
185 CreateType::Foreground => Self::Foreground,
186 }
187 }
188}
189
190impl From<PbCreateType> for CreateType {
191 fn from(create_type: PbCreateType) -> Self {
192 match create_type {
193 PbCreateType::Background => Self::Background,
194 PbCreateType::Foreground => Self::Foreground,
195 PbCreateType::Unspecified => unreachable!("Unspecified create type"),
196 }
197 }
198}
199
200impl CreateType {
201 pub fn as_str(&self) -> &'static str {
202 match self {
203 CreateType::Background => "BACKGROUND",
204 CreateType::Foreground => "FOREGROUND",
205 }
206 }
207}
208
209#[macro_export]
211macro_rules! derive_from_json_struct {
212 ($struct_name:ident, $field_type:ty) => {
213 #[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)]
214 pub struct $struct_name(pub $field_type);
215 impl Eq for $struct_name {}
216 impl From<$field_type> for $struct_name {
217 fn from(value: $field_type) -> Self {
218 Self(value)
219 }
220 }
221
222 impl $struct_name {
223 pub fn into_inner(self) -> $field_type {
224 self.0
225 }
226
227 pub fn inner_ref(&self) -> &$field_type {
228 &self.0
229 }
230 }
231 };
232}
233
234macro_rules! derive_from_blob {
236 ($struct_name:ident, $field_type:ty) => {
237 #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, DeriveValueType)]
238 pub struct $struct_name(#[sea_orm] Vec<u8>);
239
240 impl $struct_name {
241 pub fn to_protobuf(&self) -> $field_type {
242 prost::Message::decode(self.0.as_slice()).unwrap()
243 }
244
245 fn from_protobuf(val: &$field_type) -> Self {
246 Self(prost::Message::encode_to_vec(val))
247 }
248 }
249
250 impl sea_orm::sea_query::Nullable for $struct_name {
251 fn null() -> Value {
252 Value::Bytes(None)
253 }
254 }
255
256 impl From<&$field_type> for $struct_name {
257 fn from(value: &$field_type) -> Self {
258 Self::from_protobuf(value)
259 }
260 }
261
262 impl std::fmt::Debug for $struct_name {
263 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
264 self.to_protobuf().fmt(f)
265 }
266 }
267
268 impl Default for $struct_name {
269 fn default() -> Self {
270 Self::from_protobuf(&<$field_type>::default())
271 }
272 }
273 };
274}
275
276macro_rules! derive_array_from_blob {
278 ($struct_name:ident, $field_type:ty, $field_array_name:ident) => {
279 #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
280 pub struct $struct_name(#[sea_orm] Vec<u8>);
281
282 #[derive(Clone, PartialEq, ::prost::Message)]
283 pub struct $field_array_name {
284 #[prost(message, repeated, tag = "1")]
285 inner: Vec<$field_type>,
286 }
287 impl Eq for $field_array_name {}
288
289 impl $struct_name {
290 pub fn to_protobuf(&self) -> Vec<$field_type> {
291 let data: $field_array_name = prost::Message::decode(self.0.as_slice()).unwrap();
292 data.inner
293 }
294
295 fn from_protobuf(val: Vec<$field_type>) -> Self {
296 Self(prost::Message::encode_to_vec(&$field_array_name {
297 inner: val,
298 }))
299 }
300 }
301
302 impl From<Vec<$field_type>> for $struct_name {
303 fn from(value: Vec<$field_type>) -> Self {
304 Self::from_protobuf(value)
305 }
306 }
307
308 impl std::fmt::Debug for $struct_name {
309 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
310 self.to_protobuf().fmt(f)
311 }
312 }
313
314 impl Default for $struct_name {
315 fn default() -> Self {
316 Self(vec![])
317 }
318 }
319
320 impl sea_orm::sea_query::Nullable for $struct_name {
321 fn null() -> Value {
322 Value::Bytes(None)
323 }
324 }
325 };
326}
327
328macro_rules! derive_btreemap_from_blob {
329 ($struct_name:ident, $key_type:ty, $value_type:ty, $field_type:ident) => {
330 #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
331 pub struct $struct_name(#[sea_orm] Vec<u8>);
332
333 #[derive(Clone, PartialEq, ::prost::Message)]
334 pub struct $field_type {
335 #[prost(btree_map = "string, message")]
336 inner: BTreeMap<$key_type, $value_type>,
337 }
338 impl Eq for $field_type {}
339
340 impl $struct_name {
341 pub fn to_protobuf(&self) -> BTreeMap<$key_type, $value_type> {
342 let data: $field_type = prost::Message::decode(self.0.as_slice()).unwrap();
343 data.inner
344 }
345
346 fn from_protobuf(val: BTreeMap<$key_type, $value_type>) -> Self {
347 Self(prost::Message::encode_to_vec(&$field_type { inner: val }))
348 }
349 }
350
351 impl From<BTreeMap<$key_type, $value_type>> for $struct_name {
352 fn from(value: BTreeMap<$key_type, $value_type>) -> Self {
353 Self::from_protobuf(value)
354 }
355 }
356
357 impl std::fmt::Debug for $struct_name {
358 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
359 self.to_protobuf().fmt(f)
360 }
361 }
362
363 impl Default for $struct_name {
364 fn default() -> Self {
365 Self(vec![])
366 }
367 }
368
369 impl sea_orm::sea_query::Nullable for $struct_name {
370 fn null() -> Value {
371 Value::Bytes(None)
372 }
373 }
374 };
375}
376
377pub(crate) use derive_array_from_blob;
378pub(crate) use derive_from_blob;
379
380derive_from_json_struct!(TableIdArray, Vec<TableId>);
381
382derive_from_json_struct!(EpochArray, Vec<Epoch>);
383
384derive_from_json_struct!(I32Array, Vec<i32>);
385
386impl From<Vec<u32>> for I32Array {
387 fn from(value: Vec<u32>) -> Self {
388 Self(value.into_iter().map(|id| id as _).collect())
389 }
390}
391
392impl I32Array {
393 pub fn into_u32_array(self) -> Vec<u32> {
394 self.0.into_iter().map(|id| id as _).collect()
395 }
396}
397
398derive_btreemap_from_blob!(SecretRef, String, PbSecretRef, PbSecretRefMap);
399
400derive_from_blob!(StreamNode, PbStreamNode);
401derive_from_blob!(DataType, risingwave_pb::data::PbDataType);
402derive_array_from_blob!(
403 DataTypeArray,
404 risingwave_pb::data::PbDataType,
405 PbDataTypeArray
406);
407derive_array_from_blob!(
408 FieldArray,
409 risingwave_pb::plan_common::PbField,
410 PbFieldArray
411);
412derive_from_json_struct!(Property, BTreeMap<String, String>);
413derive_from_blob!(ColumnCatalog, risingwave_pb::plan_common::PbColumnCatalog);
414derive_array_from_blob!(
415 ColumnCatalogArray,
416 risingwave_pb::plan_common::PbColumnCatalog,
417 PbColumnCatalogArray
418);
419derive_from_blob!(StreamSourceInfo, risingwave_pb::catalog::PbStreamSourceInfo);
420derive_from_blob!(
421 WebhookSourceInfo,
422 risingwave_pb::catalog::PbWebhookSourceInfo
423);
424derive_from_blob!(WatermarkDesc, risingwave_pb::catalog::PbWatermarkDesc);
425derive_array_from_blob!(
426 WatermarkDescArray,
427 risingwave_pb::catalog::PbWatermarkDesc,
428 PbWatermarkDescArray
429);
430derive_array_from_blob!(
431 ExprNodeArray,
432 risingwave_pb::expr::PbExprNode,
433 PbExprNodeArray
434);
435derive_array_from_blob!(
436 ColumnOrderArray,
437 risingwave_pb::common::PbColumnOrder,
438 PbColumnOrderArray
439);
440derive_array_from_blob!(
441 IndexColumnPropertiesArray,
442 risingwave_pb::catalog::PbIndexColumnProperties,
443 PbIndexColumnPropertiesArray
444);
445derive_from_blob!(SinkFormatDesc, risingwave_pb::catalog::PbSinkFormatDesc);
446derive_from_blob!(Cardinality, risingwave_pb::plan_common::PbCardinality);
447derive_from_blob!(TableVersion, risingwave_pb::catalog::table::PbTableVersion);
448derive_from_blob!(
449 PrivateLinkService,
450 risingwave_pb::catalog::connection::PbPrivateLinkService
451);
452derive_from_blob!(ConnectionParams, risingwave_pb::catalog::ConnectionParams);
453derive_from_blob!(AuthInfo, risingwave_pb::user::PbAuthInfo);
454
455derive_from_blob!(ConnectorSplits, risingwave_pb::source::ConnectorSplits);
456derive_from_blob!(VnodeBitmap, risingwave_pb::common::Buffer);
457derive_from_blob!(ActorMapping, risingwave_pb::stream_plan::PbActorMapping);
458derive_from_blob!(ExprContext, risingwave_pb::plan_common::PbExprContext);
459derive_from_blob!(
460 SourceRefreshMode,
461 risingwave_pb::plan_common::PbSourceRefreshMode
462);
463
464derive_from_blob!(
465 SinkSchemachange,
466 risingwave_pb::stream_plan::PbSinkSchemaChange
467);
468
469derive_array_from_blob!(
470 TypePairArray,
471 risingwave_pb::stream_plan::dispatch_output_mapping::TypePair,
472 PbTypePairArray
473);
474
475derive_array_from_blob!(
476 HummockVersionDeltaArray,
477 risingwave_pb::hummock::PbHummockVersionDelta,
478 PbHummockVersionDeltaArray
479);
480
481derive_array_from_blob!(
482 SstableInfoArray,
483 risingwave_pb::hummock::PbSstableInfo,
484 PbSstableInfoArray
485);
486
487#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize)]
488pub enum StreamingParallelism {
489 Adaptive,
490 Fixed(usize),
491 Custom,
492}
493
494impl Eq for StreamingParallelism {}
495
496#[derive(
497 Hash, Copy, Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
498)]
499#[sea_orm(rs_type = "String", db_type = "string(None)")]
500pub enum DispatcherType {
501 #[sea_orm(string_value = "HASH")]
502 Hash,
503 #[sea_orm(string_value = "BROADCAST")]
504 Broadcast,
505 #[sea_orm(string_value = "SIMPLE")]
506 Simple,
507 #[sea_orm(string_value = "NO_SHUFFLE")]
508 NoShuffle,
509}
510
511impl From<PbDispatcherType> for DispatcherType {
512 fn from(val: PbDispatcherType) -> Self {
513 match val {
514 PbDispatcherType::Unspecified => unreachable!(),
515 PbDispatcherType::Hash => DispatcherType::Hash,
516 PbDispatcherType::Broadcast => DispatcherType::Broadcast,
517 PbDispatcherType::Simple => DispatcherType::Simple,
518 PbDispatcherType::NoShuffle => DispatcherType::NoShuffle,
519 }
520 }
521}
522
523impl From<DispatcherType> for PbDispatcherType {
524 fn from(val: DispatcherType) -> Self {
525 match val {
526 DispatcherType::Hash => PbDispatcherType::Hash,
527 DispatcherType::Broadcast => PbDispatcherType::Broadcast,
528 DispatcherType::Simple => PbDispatcherType::Simple,
529 DispatcherType::NoShuffle => PbDispatcherType::NoShuffle,
530 }
531 }
532}