1use std::collections::BTreeMap;
16
17pub use risingwave_common::id::*;
18use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus};
19use risingwave_pb::meta::table_fragments::PbState as PbStreamJobState;
20use risingwave_pb::secret::PbSecretRef;
21use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamNode};
22use sea_orm::entity::prelude::*;
23use sea_orm::{DeriveActiveEnum, EnumIter, FromJsonQueryResult, Value};
24use serde::{Deserialize, Serialize};
25
26pub mod prelude;
27
28pub mod catalog_version;
29pub mod cdc_table_snapshot_split;
30pub mod cluster;
31pub mod compaction_config;
32pub mod compaction_status;
33pub mod compaction_task;
34pub mod connection;
35pub mod database;
36pub mod exactly_once_iceberg_sink;
37pub mod fragment;
38pub mod fragment_relation;
39pub mod fragment_splits;
40pub mod function;
41pub mod hummock_epoch_to_version;
42pub mod hummock_gc_history;
43pub mod hummock_pinned_snapshot;
44pub mod hummock_pinned_version;
45pub mod hummock_sequence;
46pub mod hummock_sstable_info;
47pub mod hummock_table_change_log;
48pub mod hummock_time_travel_delta;
49pub mod hummock_time_travel_version;
50pub mod hummock_version_delta;
51pub mod hummock_version_stats;
52pub mod iceberg_namespace_properties;
53pub mod iceberg_tables;
54pub mod index;
55pub mod object;
56pub mod object_dependency;
57pub mod pending_sink_state;
58pub mod refresh_job;
59pub mod schema;
60pub mod secret;
61pub mod serde_seaql_migration;
62pub mod session_parameter;
63pub mod sink;
64pub mod source;
65pub mod streaming_job;
66pub mod subscription;
67pub mod system_parameter;
68pub mod table;
69pub mod user;
70pub mod user_default_privilege;
71pub mod user_privilege;
72pub mod view;
73pub mod worker;
74pub mod worker_property;
75
76pub type TransactionId = i32;
77
78pub type PrivilegeId = i32;
79pub type DefaultPrivilegeId = i32;
80
81pub use risingwave_pb::id::{CompactionGroupId, HummockSstableObjectId, HummockVersionId};
82pub type Epoch = i64;
83pub type CompactionTaskId = i64;
84
85#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
86#[sea_orm(rs_type = "String", db_type = "string(None)")]
87pub enum JobStatus {
88 #[sea_orm(string_value = "INITIAL")]
89 Initial,
90 #[sea_orm(string_value = "CREATING")]
91 Creating,
92 #[sea_orm(string_value = "CREATED")]
93 Created,
94}
95
96impl From<JobStatus> for PbStreamJobStatus {
97 fn from(job_status: JobStatus) -> Self {
98 match job_status {
99 JobStatus::Initial => Self::Unspecified,
100 JobStatus::Creating => Self::Creating,
101 JobStatus::Created => Self::Created,
102 }
103 }
104}
105
106impl From<JobStatus> for PbStreamJobState {
108 fn from(status: JobStatus) -> Self {
109 match status {
110 JobStatus::Initial => PbStreamJobState::Initial,
111 JobStatus::Creating => PbStreamJobState::Creating,
112 JobStatus::Created => PbStreamJobState::Created,
113 }
114 }
115}
116
117#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
118#[sea_orm(rs_type = "String", db_type = "string(None)")]
119pub enum CreateType {
120 #[sea_orm(string_value = "BACKGROUND")]
121 Background,
122 #[sea_orm(string_value = "FOREGROUND")]
123 Foreground,
124}
125
126impl From<CreateType> for PbCreateType {
127 fn from(create_type: CreateType) -> Self {
128 match create_type {
129 CreateType::Background => Self::Background,
130 CreateType::Foreground => Self::Foreground,
131 }
132 }
133}
134
135impl From<PbCreateType> for CreateType {
136 fn from(create_type: PbCreateType) -> Self {
137 match create_type {
138 PbCreateType::Background => Self::Background,
139 PbCreateType::Foreground => Self::Foreground,
140 PbCreateType::Unspecified => unreachable!("Unspecified create type"),
141 }
142 }
143}
144
145impl CreateType {
146 pub fn as_str(&self) -> &'static str {
147 match self {
148 CreateType::Background => "BACKGROUND",
149 CreateType::Foreground => "FOREGROUND",
150 }
151 }
152}
153
/// Defines a public tuple struct `$struct_name` wrapping a single public `$field_type` value.
///
/// The struct derives `FromJsonQueryResult` together with `Serialize`/`Deserialize`, and also
/// `Clone`, `Debug`, `PartialEq` and `Default`. Besides the struct itself the macro emits:
///
/// * `From<$field_type>` to wrap a value,
/// * `into_inner` to unwrap it by value,
/// * `inner_ref` to borrow it.
#[macro_export]
macro_rules! derive_from_json_struct {
    ($struct_name:ident, $field_type:ty) => {
        #[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize, FromJsonQueryResult)]
        pub struct $struct_name(pub $field_type);

        // A hand-written `Eq` impl only needs `PartialEq` on the struct, so it places no `Eq`
        // bound on the wrapped field type.
        impl Eq for $struct_name {}

        impl $struct_name {
            /// Borrows the wrapped value.
            pub fn inner_ref(&self) -> &$field_type {
                &self.0
            }

            /// Consumes the wrapper and returns the wrapped value.
            pub fn into_inner(self) -> $field_type {
                self.0
            }
        }

        impl From<$field_type> for $struct_name {
            fn from(inner: $field_type) -> Self {
                Self(inner)
            }
        }
    };
}
178
/// Defines `$struct_name`, a newtype over the encoded bytes of a single protobuf message of type
/// `$field_type`, usable as a SeaORM value type (via `DeriveValueType`).
///
/// The generated type provides:
///
/// * `to_protobuf` — decodes the stored bytes (panics on undecodable bytes),
/// * `From<&$field_type>` — encodes a message into the wrapper,
/// * `Debug` — prints the decoded message, or a placeholder if the bytes cannot be decoded,
/// * `Default` — the encoding of the default message,
/// * `Nullable` — maps SQL `NULL` to `Value::Bytes(None)`.
macro_rules! derive_from_blob {
    ($struct_name:ident, $field_type:ty) => {
        #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, DeriveValueType)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        impl $struct_name {
            /// Decodes the stored bytes into the protobuf message.
            ///
            /// # Panics
            ///
            /// Panics if the stored bytes are not a valid encoding of the message; the panic
            /// message names the wrapper type and carries the decode error.
            pub fn to_protobuf(&self) -> $field_type {
                match <$field_type as prost::Message>::decode(self.0.as_slice()) {
                    Ok(message) => message,
                    Err(err) => panic!(
                        "failed to decode {} from stored bytes: {}",
                        stringify!($struct_name),
                        err
                    ),
                }
            }

            /// Encodes the message into a new wrapper.
            fn from_protobuf(val: &$field_type) -> Self {
                Self(prost::Message::encode_to_vec(val))
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }

        impl From<&$field_type> for $struct_name {
            fn from(value: &$field_type) -> Self {
                Self::from_protobuf(value)
            }
        }

        impl std::fmt::Debug for $struct_name {
            // Formatting must not panic: undecodable bytes are reported in the output rather
            // than unwrapped. The fully-qualified `Debug::fmt` call does not depend on which
            // formatting traits are in scope where the macro is invoked.
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                match <$field_type as prost::Message>::decode(self.0.as_slice()) {
                    Ok(message) => std::fmt::Debug::fmt(&message, f),
                    Err(err) => write!(
                        f,
                        "{}(<invalid protobuf: {}>)",
                        stringify!($struct_name),
                        err
                    ),
                }
            }
        }

        impl Default for $struct_name {
            fn default() -> Self {
                Self::from_protobuf(&<$field_type>::default())
            }
        }
    };
}
220
/// Defines `$struct_name`, a newtype over the encoded bytes of a list of protobuf messages of
/// type `$field_type`, usable as a SeaORM value type (via `DeriveValueType`).
///
/// The list is encoded through the helper message `$field_array_name`, also defined here, whose
/// only field is a repeated `$field_type` with tag 1.
///
/// The generated type provides:
///
/// * `to_protobuf` — decodes the stored bytes into a `Vec` (panics on undecodable bytes),
/// * `From<Vec<$field_type>>` — encodes a list into the wrapper,
/// * `Debug` — prints the decoded list, or a placeholder if the bytes cannot be decoded,
/// * `Default` — an empty byte buffer,
/// * `Nullable` — maps SQL `NULL` to `Value::Bytes(None)`.
macro_rules! derive_array_from_blob {
    ($struct_name:ident, $field_type:ty, $field_array_name:ident) => {
        #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        #[derive(Clone, PartialEq, ::prost::Message)]
        pub struct $field_array_name {
            #[prost(message, repeated, tag = "1")]
            inner: Vec<$field_type>,
        }
        impl Eq for $field_array_name {}

        impl $struct_name {
            /// Decodes the stored bytes into the list of messages.
            ///
            /// # Panics
            ///
            /// Panics if the stored bytes are not a valid encoding of the helper message; the
            /// panic message names the wrapper type and carries the decode error.
            pub fn to_protobuf(&self) -> Vec<$field_type> {
                match <$field_array_name as prost::Message>::decode(self.0.as_slice()) {
                    Ok(data) => data.inner,
                    Err(err) => panic!(
                        "failed to decode {} from stored bytes: {}",
                        stringify!($struct_name),
                        err
                    ),
                }
            }

            /// Encodes the list into a new wrapper.
            fn from_protobuf(val: Vec<$field_type>) -> Self {
                Self(prost::Message::encode_to_vec(&$field_array_name {
                    inner: val,
                }))
            }
        }

        impl From<Vec<$field_type>> for $struct_name {
            fn from(value: Vec<$field_type>) -> Self {
                Self::from_protobuf(value)
            }
        }

        impl std::fmt::Debug for $struct_name {
            // Formatting must not panic: undecodable bytes are reported in the output rather
            // than unwrapped. The fully-qualified `Debug::fmt` call does not depend on which
            // formatting traits are in scope where the macro is invoked.
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                match <$field_array_name as prost::Message>::decode(self.0.as_slice()) {
                    Ok(data) => std::fmt::Debug::fmt(&data.inner, f),
                    Err(err) => write!(
                        f,
                        "{}(<invalid protobuf: {}>)",
                        stringify!($struct_name),
                        err
                    ),
                }
            }
        }

        impl Default for $struct_name {
            // An empty buffer; protobuf decodes it as a message with no repeated elements.
            fn default() -> Self {
                Self(vec![])
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }
    };
}
272
/// Defines `$struct_name`, a newtype over the encoded bytes of a `BTreeMap<$key_type,
/// $value_type>`, usable as a SeaORM value type (via `DeriveValueType`).
///
/// The map is encoded through the helper message `$field_type`, also defined here, whose only
/// field is declared as a prost `btree_map` of `string` keys to `message` values.
///
/// The generated type provides:
///
/// * `to_protobuf` — decodes the stored bytes into a `BTreeMap` (panics on undecodable bytes),
/// * `From<BTreeMap<..>>` — encodes a map into the wrapper,
/// * `Debug` — prints the decoded map, or a placeholder if the bytes cannot be decoded,
/// * `Default` — an empty byte buffer,
/// * `Nullable` — maps SQL `NULL` to `Value::Bytes(None)`.
macro_rules! derive_btreemap_from_blob {
    ($struct_name:ident, $key_type:ty, $value_type:ty, $field_type:ident) => {
        #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        #[derive(Clone, PartialEq, ::prost::Message)]
        pub struct $field_type {
            #[prost(btree_map = "string, message")]
            inner: BTreeMap<$key_type, $value_type>,
        }
        impl Eq for $field_type {}

        impl $struct_name {
            /// Decodes the stored bytes into the map.
            ///
            /// # Panics
            ///
            /// Panics if the stored bytes are not a valid encoding of the helper message; the
            /// panic message names the wrapper type and carries the decode error.
            pub fn to_protobuf(&self) -> BTreeMap<$key_type, $value_type> {
                match <$field_type as prost::Message>::decode(self.0.as_slice()) {
                    Ok(data) => data.inner,
                    Err(err) => panic!(
                        "failed to decode {} from stored bytes: {}",
                        stringify!($struct_name),
                        err
                    ),
                }
            }

            /// Encodes the map into a new wrapper.
            fn from_protobuf(val: BTreeMap<$key_type, $value_type>) -> Self {
                Self(prost::Message::encode_to_vec(&$field_type { inner: val }))
            }
        }

        impl From<BTreeMap<$key_type, $value_type>> for $struct_name {
            fn from(value: BTreeMap<$key_type, $value_type>) -> Self {
                Self::from_protobuf(value)
            }
        }

        impl std::fmt::Debug for $struct_name {
            // Formatting must not panic: undecodable bytes are reported in the output rather
            // than unwrapped. The fully-qualified `Debug::fmt` call does not depend on which
            // formatting traits are in scope where the macro is invoked.
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                match <$field_type as prost::Message>::decode(self.0.as_slice()) {
                    Ok(data) => std::fmt::Debug::fmt(&data.inner, f),
                    Err(err) => write!(
                        f,
                        "{}(<invalid protobuf: {}>)",
                        stringify!($struct_name),
                        err
                    ),
                }
            }
        }

        impl Default for $struct_name {
            // An empty buffer; protobuf decodes it as a message with no map entries.
            fn default() -> Self {
                Self(vec![])
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }
    };
}
321
322pub(crate) use {derive_array_from_blob, derive_from_blob};
323
324derive_from_json_struct!(TableIdArray, Vec<TableId>);
325
326derive_from_json_struct!(EpochArray, Vec<Epoch>);
327
328derive_from_json_struct!(I32Array, Vec<i32>);
329
330impl From<Vec<u32>> for I32Array {
331 fn from(value: Vec<u32>) -> Self {
332 Self(value.into_iter().map(|id| id as _).collect())
333 }
334}
335
336impl I32Array {
337 pub fn into_u32_array(self) -> Vec<u32> {
338 self.0.into_iter().map(|id| id as _).collect()
339 }
340}
341
342derive_btreemap_from_blob!(SecretRef, String, PbSecretRef, PbSecretRefMap);
343
344derive_from_blob!(StreamNode, PbStreamNode);
345derive_from_blob!(DataType, risingwave_pb::data::PbDataType);
346derive_array_from_blob!(
347 DataTypeArray,
348 risingwave_pb::data::PbDataType,
349 PbDataTypeArray
350);
351derive_array_from_blob!(
352 FieldArray,
353 risingwave_pb::plan_common::PbField,
354 PbFieldArray
355);
356derive_from_json_struct!(Property, BTreeMap<String, String>);
357derive_from_blob!(ColumnCatalog, risingwave_pb::plan_common::PbColumnCatalog);
358derive_array_from_blob!(
359 ColumnCatalogArray,
360 risingwave_pb::plan_common::PbColumnCatalog,
361 PbColumnCatalogArray
362);
363derive_from_blob!(StreamSourceInfo, risingwave_pb::catalog::PbStreamSourceInfo);
364derive_from_blob!(
365 WebhookSourceInfo,
366 risingwave_pb::catalog::PbWebhookSourceInfo
367);
368derive_from_blob!(WatermarkDesc, risingwave_pb::catalog::PbWatermarkDesc);
369derive_array_from_blob!(
370 WatermarkDescArray,
371 risingwave_pb::catalog::PbWatermarkDesc,
372 PbWatermarkDescArray
373);
374derive_array_from_blob!(
375 ExprNodeArray,
376 risingwave_pb::expr::PbExprNode,
377 PbExprNodeArray
378);
379derive_array_from_blob!(
380 ColumnOrderArray,
381 risingwave_pb::common::PbColumnOrder,
382 PbColumnOrderArray
383);
384derive_array_from_blob!(
385 IndexColumnPropertiesArray,
386 risingwave_pb::catalog::PbIndexColumnProperties,
387 PbIndexColumnPropertiesArray
388);
389derive_from_blob!(SinkFormatDesc, risingwave_pb::catalog::PbSinkFormatDesc);
390derive_from_blob!(Cardinality, risingwave_pb::plan_common::PbCardinality);
391derive_from_blob!(TableVersion, risingwave_pb::catalog::table::PbTableVersion);
392derive_from_blob!(
393 PrivateLinkService,
394 risingwave_pb::catalog::connection::PbPrivateLinkService
395);
396derive_from_blob!(ConnectionParams, risingwave_pb::catalog::ConnectionParams);
397derive_from_blob!(AuthInfo, risingwave_pb::user::PbAuthInfo);
398
399derive_from_blob!(ConnectorSplits, risingwave_pb::source::ConnectorSplits);
400derive_from_blob!(VnodeBitmap, risingwave_pb::common::Buffer);
401derive_from_blob!(ActorMapping, risingwave_pb::stream_plan::PbActorMapping);
402derive_from_blob!(ExprContext, risingwave_pb::plan_common::PbExprContext);
403derive_from_blob!(
404 SourceRefreshMode,
405 risingwave_pb::plan_common::PbSourceRefreshMode
406);
407
408derive_from_blob!(
409 SinkSchemachange,
410 risingwave_pb::stream_plan::PbSinkSchemaChange
411);
412
413derive_array_from_blob!(
414 TypePairArray,
415 risingwave_pb::stream_plan::dispatch_output_mapping::TypePair,
416 PbTypePairArray
417);
418
419derive_array_from_blob!(
420 HummockVersionDeltaArray,
421 risingwave_pb::hummock::PbHummockVersionDelta,
422 PbHummockVersionDeltaArray
423);
424
425derive_array_from_blob!(
426 SstableInfoArray,
427 risingwave_pb::hummock::PbSstableInfo,
428 PbSstableInfoArray
429);
430
431#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize)]
432pub enum StreamingParallelism {
433 Adaptive,
434 Fixed(usize),
435 Custom,
436}
437
438impl Eq for StreamingParallelism {}
439
440#[derive(
441 Hash, Copy, Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
442)]
443#[sea_orm(rs_type = "String", db_type = "string(None)")]
444pub enum DispatcherType {
445 #[sea_orm(string_value = "HASH")]
446 Hash,
447 #[sea_orm(string_value = "BROADCAST")]
448 Broadcast,
449 #[sea_orm(string_value = "SIMPLE")]
450 Simple,
451 #[sea_orm(string_value = "NO_SHUFFLE")]
452 NoShuffle,
453}
454
455impl From<PbDispatcherType> for DispatcherType {
456 fn from(val: PbDispatcherType) -> Self {
457 match val {
458 PbDispatcherType::Unspecified => unreachable!(),
459 PbDispatcherType::Hash => DispatcherType::Hash,
460 PbDispatcherType::Broadcast => DispatcherType::Broadcast,
461 PbDispatcherType::Simple => DispatcherType::Simple,
462 PbDispatcherType::NoShuffle => DispatcherType::NoShuffle,
463 }
464 }
465}
466
467impl From<DispatcherType> for PbDispatcherType {
468 fn from(val: DispatcherType) -> Self {
469 match val {
470 DispatcherType::Hash => PbDispatcherType::Hash,
471 DispatcherType::Broadcast => PbDispatcherType::Broadcast,
472 DispatcherType::Simple => PbDispatcherType::Simple,
473 DispatcherType::NoShuffle => PbDispatcherType::NoShuffle,
474 }
475 }
476}