1use std::collections::BTreeMap;
16
17pub use risingwave_common::id::*;
18use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus};
19use risingwave_pb::meta::table_fragments::PbState as PbStreamJobState;
20use risingwave_pb::secret::PbSecretRef;
21use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamNode};
22use sea_orm::entity::prelude::*;
23use sea_orm::{DeriveActiveEnum, EnumIter, FromJsonQueryResult, Value};
24use serde::{Deserialize, Serialize};
25
pub mod prelude;

// Submodules of this crate, declared in alphabetical order.
// NOTE(review): this list currently matches the names passed by
// `for_all_meta_model_entities!` one-to-one (`prelude` excluded) — presumably
// a new module has to be added in both places; confirm.
pub mod catalog_version;
pub mod cdc_table_snapshot_split;
pub mod cluster;
pub mod compaction_config;
pub mod compaction_status;
pub mod compaction_task;
pub mod connection;
pub mod database;
pub mod exactly_once_iceberg_sink;
pub mod fragment;
pub mod fragment_relation;
pub mod fragment_splits;
pub mod function;
pub mod hummock_epoch_to_version;
pub mod hummock_gc_history;
pub mod hummock_pinned_snapshot;
pub mod hummock_pinned_version;
pub mod hummock_sequence;
pub mod hummock_sstable_info;
pub mod hummock_table_change_log;
pub mod hummock_time_travel_delta;
pub mod hummock_time_travel_version;
pub mod hummock_version_delta;
pub mod hummock_version_stats;
pub mod iceberg_namespace_properties;
pub mod iceberg_tables;
pub mod index;
pub mod object;
pub mod object_dependency;
pub mod pending_sink_state;
pub mod refresh_job;
pub mod schema;
pub mod secret;
pub mod serde_seaql_migration;
pub mod session_parameter;
pub mod sink;
pub mod source;
pub mod streaming_job;
pub mod subscription;
pub mod system_parameter;
pub mod table;
pub mod user;
pub mod user_default_privilege;
pub mod user_privilege;
pub mod view;
pub mod worker;
pub mod worker_property;
75
/// Invokes the macro named by `$callback` with the comma-separated list of
/// entity module names of this crate.
///
/// The names are passed as bare identifiers, without a trailing comma.
#[macro_export]
macro_rules! for_all_meta_model_entities {
    ($callback:ident) => {
        $callback! {
            catalog_version,
            cdc_table_snapshot_split,
            cluster,
            compaction_config,
            compaction_status,
            compaction_task,
            connection,
            database,
            exactly_once_iceberg_sink,
            fragment,
            fragment_relation,
            fragment_splits,
            function,
            hummock_epoch_to_version,
            hummock_gc_history,
            hummock_pinned_snapshot,
            hummock_pinned_version,
            hummock_sequence,
            hummock_sstable_info,
            hummock_table_change_log,
            hummock_time_travel_delta,
            hummock_time_travel_version,
            hummock_version_delta,
            hummock_version_stats,
            iceberg_namespace_properties,
            iceberg_tables,
            index,
            object,
            object_dependency,
            pending_sink_state,
            refresh_job,
            schema,
            secret,
            serde_seaql_migration,
            session_parameter,
            sink,
            source,
            streaming_job,
            subscription,
            system_parameter,
            table,
            user,
            user_default_privilege,
            user_privilege,
            view,
            worker,
            worker_property
        }
    };
}
130
/// Transaction identifier, represented as an `i32`.
pub type TransactionId = i32;

/// Privilege identifier, represented as an `i32`.
pub type PrivilegeId = i32;
/// Default-privilege identifier, represented as an `i32`.
pub type DefaultPrivilegeId = i32;

// Identifier types re-exported from `risingwave_pb::id`.
pub use risingwave_pb::id::{CompactionGroupId, HummockSstableObjectId, HummockVersionId};
/// Epoch value, represented as an `i64`.
pub type Epoch = i64;
/// Compaction task identifier, represented as an `i64`.
pub type CompactionTaskId = i64;
139
140#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
141#[sea_orm(rs_type = "String", db_type = "string(None)")]
142pub enum JobStatus {
143 #[sea_orm(string_value = "INITIAL")]
144 Initial,
145 #[sea_orm(string_value = "CREATING")]
146 Creating,
147 #[sea_orm(string_value = "CREATED")]
148 Created,
149}
150
151impl From<JobStatus> for PbStreamJobStatus {
152 fn from(job_status: JobStatus) -> Self {
153 match job_status {
154 JobStatus::Initial => Self::Unspecified,
155 JobStatus::Creating => Self::Creating,
156 JobStatus::Created => Self::Created,
157 }
158 }
159}
160
161impl From<JobStatus> for PbStreamJobState {
163 fn from(status: JobStatus) -> Self {
164 match status {
165 JobStatus::Initial => PbStreamJobState::Initial,
166 JobStatus::Creating => PbStreamJobState::Creating,
167 JobStatus::Created => PbStreamJobState::Created,
168 }
169 }
170}
171
172#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
173#[sea_orm(rs_type = "String", db_type = "string(None)")]
174pub enum CreateType {
175 #[sea_orm(string_value = "BACKGROUND")]
176 Background,
177 #[sea_orm(string_value = "FOREGROUND")]
178 Foreground,
179}
180
181impl From<CreateType> for PbCreateType {
182 fn from(create_type: CreateType) -> Self {
183 match create_type {
184 CreateType::Background => Self::Background,
185 CreateType::Foreground => Self::Foreground,
186 }
187 }
188}
189
190impl From<PbCreateType> for CreateType {
191 fn from(create_type: PbCreateType) -> Self {
192 match create_type {
193 PbCreateType::Background => Self::Background,
194 PbCreateType::Foreground => Self::Foreground,
195 PbCreateType::Unspecified => unreachable!("Unspecified create type"),
196 }
197 }
198}
199
200impl CreateType {
201 pub fn as_str(&self) -> &'static str {
202 match self {
203 CreateType::Background => "BACKGROUND",
204 CreateType::Foreground => "FOREGROUND",
205 }
206 }
207}
208
/// Defines a public tuple struct `$name(pub $inner)` that derives
/// `FromJsonQueryResult` together with the serde traits.
///
/// The generated type also gets `Clone`, `Debug`, `PartialEq`, `Default`, an
/// unconditional `Eq` impl, a `From<$inner>` conversion and accessors for the
/// wrapped value.
///
/// The derive names `FromJsonQueryResult`, `Serialize` and `Deserialize` are
/// written unqualified, so they must be in scope where the macro is invoked.
#[macro_export]
macro_rules! derive_from_json_struct {
    ($name:ident, $inner:ty) => {
        #[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize, FromJsonQueryResult)]
        pub struct $name(pub $inner);

        // `Eq` is asserted manually instead of derived, so it does not
        // require `$inner: Eq`.
        impl Eq for $name {}

        impl $name {
            /// Borrows the wrapped value.
            pub fn inner_ref(&self) -> &$inner {
                &self.0
            }

            /// Consumes the wrapper and returns the wrapped value.
            pub fn into_inner(self) -> $inner {
                self.0
            }
        }

        impl From<$inner> for $name {
            fn from(inner: $inner) -> Self {
                $name(inner)
            }
        }
    };
}
233
/// Defines `$struct_name`, a newtype over `Vec<u8>` holding the encoded bytes
/// of the protobuf message `$field_type`.
///
/// The generated type derives `DeriveValueType` and the serde traits, is
/// nullable as `Value::Bytes(None)`, can be built from `&$field_type`, and
/// defaults to the encoding of `$field_type::default()`.
macro_rules! derive_from_blob {
    ($struct_name:ident, $field_type:ty) => {
        #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, DeriveValueType)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        impl $struct_name {
            /// Decodes the stored bytes into the protobuf message.
            ///
            /// # Panics
            ///
            /// Panics if the stored bytes are not a valid encoding of the
            /// message.
            pub fn to_protobuf(&self) -> $field_type {
                // Name the wrapper in the panic message so a corrupted blob
                // can be traced back to its column type.
                <$field_type as prost::Message>::decode(self.0.as_slice()).expect(concat!(
                    "bytes stored in `",
                    stringify!($struct_name),
                    "` must be a valid protobuf encoding"
                ))
            }

            /// Encodes the protobuf message into a new wrapper.
            fn from_protobuf(val: &$field_type) -> Self {
                Self(prost::Message::encode_to_vec(val))
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }

        impl From<&$field_type> for $struct_name {
            fn from(value: &$field_type) -> Self {
                Self::from_protobuf(value)
            }
        }

        impl std::fmt::Debug for $struct_name {
            /// Formats the decoded message. Undecodable bytes are reported in
            /// the output instead of panicking, so logging a corrupted value
            /// is always safe.
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                match <$field_type as prost::Message>::decode(self.0.as_slice()) {
                    Ok(decoded) => std::fmt::Debug::fmt(&decoded, f),
                    Err(err) => write!(
                        f,
                        "{}(<undecodable protobuf: {}>)",
                        stringify!($struct_name),
                        err
                    ),
                }
            }
        }

        impl Default for $struct_name {
            fn default() -> Self {
                Self::from_protobuf(&<$field_type>::default())
            }
        }
    };
}
275
/// Defines `$struct_name`, a newtype over `Vec<u8>` holding an encoded list of
/// `$field_type` protobuf messages.
///
/// The list is encoded through the helper message `$field_array_name`, which
/// carries the elements as a repeated field with tag 1. The generated type
/// derives `DeriveValueType` and the serde traits, is nullable as
/// `Value::Bytes(None)`, can be built from `Vec<$field_type>`, and defaults to
/// empty bytes.
macro_rules! derive_array_from_blob {
    ($struct_name:ident, $field_type:ty, $field_array_name:ident) => {
        #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        #[derive(Clone, PartialEq, ::prost::Message)]
        pub struct $field_array_name {
            #[prost(message, repeated, tag = "1")]
            inner: Vec<$field_type>,
        }
        impl Eq for $field_array_name {}

        impl $struct_name {
            /// Decodes the stored bytes into the list of messages.
            ///
            /// # Panics
            ///
            /// Panics if the stored bytes are not a valid encoding of
            /// the helper message.
            pub fn to_protobuf(&self) -> Vec<$field_type> {
                // Name the wrapper in the panic message so a corrupted blob
                // can be traced back to its column type.
                let data = <$field_array_name as prost::Message>::decode(self.0.as_slice())
                    .expect(concat!(
                        "bytes stored in `",
                        stringify!($struct_name),
                        "` must be a valid protobuf encoding"
                    ));
                data.inner
            }

            /// Encodes the list of messages into a new wrapper.
            fn from_protobuf(val: Vec<$field_type>) -> Self {
                Self(prost::Message::encode_to_vec(&$field_array_name {
                    inner: val,
                }))
            }
        }

        impl From<Vec<$field_type>> for $struct_name {
            fn from(value: Vec<$field_type>) -> Self {
                Self::from_protobuf(value)
            }
        }

        impl std::fmt::Debug for $struct_name {
            /// Formats the decoded list. Undecodable bytes are reported in
            /// the output instead of panicking, so logging a corrupted value
            /// is always safe.
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                match <$field_array_name as prost::Message>::decode(self.0.as_slice()) {
                    Ok(decoded) => std::fmt::Debug::fmt(&decoded.inner, f),
                    Err(err) => write!(
                        f,
                        "{}(<undecodable protobuf: {}>)",
                        stringify!($struct_name),
                        err
                    ),
                }
            }
        }

        impl Default for $struct_name {
            fn default() -> Self {
                Self(vec![])
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }
    };
}
327
/// Defines `$struct_name`, a newtype over `Vec<u8>` holding an encoded
/// `BTreeMap<$key_type, $value_type>`.
///
/// The map is encoded through the helper message `$field_type`. Its prost
/// attribute is fixed to `"string, message"`, so `$key_type` is expected to be
/// `String` and `$value_type` a protobuf message. The generated type derives
/// `DeriveValueType` and the serde traits, is nullable as `Value::Bytes(None)`,
/// can be built from the map, and defaults to empty bytes.
macro_rules! derive_btreemap_from_blob {
    ($struct_name:ident, $key_type:ty, $value_type:ty, $field_type:ident) => {
        #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        #[derive(Clone, PartialEq, ::prost::Message)]
        pub struct $field_type {
            // NOTE(review): no explicit `tag` is given; prost presumably
            // infers tag 1 for this single field — confirm before adding
            // further fields.
            #[prost(btree_map = "string, message")]
            inner: BTreeMap<$key_type, $value_type>,
        }
        impl Eq for $field_type {}

        impl $struct_name {
            /// Decodes the stored bytes into the map.
            ///
            /// # Panics
            ///
            /// Panics if the stored bytes are not a valid encoding of
            /// the helper message.
            pub fn to_protobuf(&self) -> BTreeMap<$key_type, $value_type> {
                // Name the wrapper in the panic message so a corrupted blob
                // can be traced back to its column type.
                let data = <$field_type as prost::Message>::decode(self.0.as_slice()).expect(
                    concat!(
                        "bytes stored in `",
                        stringify!($struct_name),
                        "` must be a valid protobuf encoding"
                    ),
                );
                data.inner
            }

            /// Encodes the map into a new wrapper.
            fn from_protobuf(val: BTreeMap<$key_type, $value_type>) -> Self {
                Self(prost::Message::encode_to_vec(&$field_type { inner: val }))
            }
        }

        impl From<BTreeMap<$key_type, $value_type>> for $struct_name {
            fn from(value: BTreeMap<$key_type, $value_type>) -> Self {
                Self::from_protobuf(value)
            }
        }

        impl std::fmt::Debug for $struct_name {
            /// Formats the decoded map. Undecodable bytes are reported in the
            /// output instead of panicking, so logging a corrupted value is
            /// always safe.
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                match <$field_type as prost::Message>::decode(self.0.as_slice()) {
                    Ok(decoded) => std::fmt::Debug::fmt(&decoded.inner, f),
                    Err(err) => write!(
                        f,
                        "{}(<undecodable protobuf: {}>)",
                        stringify!($struct_name),
                        err
                    ),
                }
            }
        }

        impl Default for $struct_name {
            fn default() -> Self {
                Self(vec![])
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }
    };
}
376
377pub(crate) use {derive_array_from_blob, derive_from_blob};
378
// JSON-mapped newtype around `Vec<TableId>`.
derive_from_json_struct!(TableIdArray, Vec<TableId>);

// JSON-mapped newtype around `Vec<Epoch>`.
derive_from_json_struct!(EpochArray, Vec<Epoch>);

// JSON-mapped newtype around `Vec<i32>`.
derive_from_json_struct!(I32Array, Vec<i32>);
384
385impl From<Vec<u32>> for I32Array {
386 fn from(value: Vec<u32>) -> Self {
387 Self(value.into_iter().map(|id| id as _).collect())
388 }
389}
390
391impl I32Array {
392 pub fn into_u32_array(self) -> Vec<u32> {
393 self.0.into_iter().map(|id| id as _).collect()
394 }
395}
396
// Map from `String` keys to `PbSecretRef` values, stored as encoded protobuf
// bytes via the helper message `PbSecretRefMap`.
derive_btreemap_from_blob!(SecretRef, String, PbSecretRef, PbSecretRefMap);

// The wrappers below store a protobuf message (`derive_from_blob!`) or a list
// of protobuf messages (`derive_array_from_blob!`) as encoded bytes. For the
// array form, the third argument names the generated helper message that
// holds the repeated field.
derive_from_blob!(StreamNode, PbStreamNode);
derive_from_blob!(DataType, risingwave_pb::data::PbDataType);
derive_array_from_blob!(
    DataTypeArray,
    risingwave_pb::data::PbDataType,
    PbDataTypeArray
);
derive_array_from_blob!(
    FieldArray,
    risingwave_pb::plan_common::PbField,
    PbFieldArray
);
// JSON-mapped newtype around a string-to-string `BTreeMap`.
derive_from_json_struct!(Property, BTreeMap<String, String>);
derive_from_blob!(ColumnCatalog, risingwave_pb::plan_common::PbColumnCatalog);
derive_array_from_blob!(
    ColumnCatalogArray,
    risingwave_pb::plan_common::PbColumnCatalog,
    PbColumnCatalogArray
);
derive_from_blob!(StreamSourceInfo, risingwave_pb::catalog::PbStreamSourceInfo);
derive_from_blob!(
    WebhookSourceInfo,
    risingwave_pb::catalog::PbWebhookSourceInfo
);
derive_from_blob!(WatermarkDesc, risingwave_pb::catalog::PbWatermarkDesc);
derive_array_from_blob!(
    WatermarkDescArray,
    risingwave_pb::catalog::PbWatermarkDesc,
    PbWatermarkDescArray
);
derive_array_from_blob!(
    ExprNodeArray,
    risingwave_pb::expr::PbExprNode,
    PbExprNodeArray
);
derive_array_from_blob!(
    ColumnOrderArray,
    risingwave_pb::common::PbColumnOrder,
    PbColumnOrderArray
);
derive_array_from_blob!(
    IndexColumnPropertiesArray,
    risingwave_pb::catalog::PbIndexColumnProperties,
    PbIndexColumnPropertiesArray
);
derive_from_blob!(SinkFormatDesc, risingwave_pb::catalog::PbSinkFormatDesc);
derive_from_blob!(Cardinality, risingwave_pb::plan_common::PbCardinality);
derive_from_blob!(TableVersion, risingwave_pb::catalog::table::PbTableVersion);
derive_from_blob!(
    PrivateLinkService,
    risingwave_pb::catalog::connection::PbPrivateLinkService
);
derive_from_blob!(ConnectionParams, risingwave_pb::catalog::ConnectionParams);
derive_from_blob!(AuthInfo, risingwave_pb::user::PbAuthInfo);
453
// Byte-blob wrappers around single protobuf messages from the `source`,
// `common`, `stream_plan` and `plan_common` protobuf modules.
derive_from_blob!(ConnectorSplits, risingwave_pb::source::ConnectorSplits);
derive_from_blob!(VnodeBitmap, risingwave_pb::common::Buffer);
derive_from_blob!(ActorMapping, risingwave_pb::stream_plan::PbActorMapping);
derive_from_blob!(ExprContext, risingwave_pb::plan_common::PbExprContext);
derive_from_blob!(
    SourceRefreshMode,
    risingwave_pb::plan_common::PbSourceRefreshMode
);

// NOTE(review): the wrapper is spelled `SinkSchemachange` while the wrapped
// type is `PbSinkSchemaChange`; the name is public, so it is left as is.
derive_from_blob!(
    SinkSchemachange,
    risingwave_pb::stream_plan::PbSinkSchemaChange
);

// Byte-blob wrappers around lists of protobuf messages; the third argument
// names the generated helper message that holds the repeated field.
derive_array_from_blob!(
    TypePairArray,
    risingwave_pb::stream_plan::dispatch_output_mapping::TypePair,
    PbTypePairArray
);

derive_array_from_blob!(
    HummockVersionDeltaArray,
    risingwave_pb::hummock::PbHummockVersionDelta,
    PbHummockVersionDeltaArray
);

derive_array_from_blob!(
    SstableInfoArray,
    risingwave_pb::hummock::PbSstableInfo,
    PbSstableInfoArray
);
485
486#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize)]
487pub enum StreamingParallelism {
488 Adaptive,
489 Fixed(usize),
490 Custom,
491}
492
493impl Eq for StreamingParallelism {}
494
495#[derive(
496 Hash, Copy, Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
497)]
498#[sea_orm(rs_type = "String", db_type = "string(None)")]
499pub enum DispatcherType {
500 #[sea_orm(string_value = "HASH")]
501 Hash,
502 #[sea_orm(string_value = "BROADCAST")]
503 Broadcast,
504 #[sea_orm(string_value = "SIMPLE")]
505 Simple,
506 #[sea_orm(string_value = "NO_SHUFFLE")]
507 NoShuffle,
508}
509
510impl From<PbDispatcherType> for DispatcherType {
511 fn from(val: PbDispatcherType) -> Self {
512 match val {
513 PbDispatcherType::Unspecified => unreachable!(),
514 PbDispatcherType::Hash => DispatcherType::Hash,
515 PbDispatcherType::Broadcast => DispatcherType::Broadcast,
516 PbDispatcherType::Simple => DispatcherType::Simple,
517 PbDispatcherType::NoShuffle => DispatcherType::NoShuffle,
518 }
519 }
520}
521
522impl From<DispatcherType> for PbDispatcherType {
523 fn from(val: DispatcherType) -> Self {
524 match val {
525 DispatcherType::Hash => PbDispatcherType::Hash,
526 DispatcherType::Broadcast => PbDispatcherType::Broadcast,
527 DispatcherType::Simple => PbDispatcherType::Simple,
528 DispatcherType::NoShuffle => PbDispatcherType::NoShuffle,
529 }
530 }
531}