1use std::collections::BTreeMap;
16
17use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus};
18use risingwave_pb::meta::table_fragments::PbState as PbStreamJobState;
19use risingwave_pb::secret::PbSecretRef;
20use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamNode};
21use sea_orm::entity::prelude::*;
22use sea_orm::{DeriveActiveEnum, EnumIter, FromJsonQueryResult};
23use serde::{Deserialize, Serialize};
24
25pub mod prelude;
26
27pub mod actor;
28pub mod catalog_version;
29pub mod cdc_table_snapshot_split;
30pub mod cluster;
31pub mod compaction_config;
32pub mod compaction_status;
33pub mod compaction_task;
34pub mod connection;
35pub mod database;
36pub mod exactly_once_iceberg_sink;
37pub mod fragment;
38pub mod fragment_relation;
39pub mod function;
40pub mod hummock_epoch_to_version;
41pub mod hummock_gc_history;
42pub mod hummock_pinned_snapshot;
43pub mod hummock_pinned_version;
44pub mod hummock_sequence;
45pub mod hummock_sstable_info;
46pub mod hummock_time_travel_delta;
47pub mod hummock_time_travel_version;
48pub mod hummock_version_delta;
49pub mod hummock_version_stats;
50pub mod iceberg_namespace_properties;
51pub mod iceberg_tables;
52pub mod index;
53pub mod object;
54pub mod object_dependency;
55pub mod schema;
56pub mod secret;
57pub mod serde_seaql_migration;
58pub mod session_parameter;
59pub mod sink;
60pub mod source;
61pub mod streaming_job;
62pub mod subscription;
63pub mod system_parameter;
64pub mod table;
65pub mod user;
66pub mod user_default_privilege;
67pub mod user_privilege;
68pub mod view;
69pub mod worker;
70pub mod worker_property;
71
72pub type WorkerId = i32;
73
74pub type TransactionId = i32;
75
76pub type ObjectId = i32;
77pub type DatabaseId = ObjectId;
78pub type SchemaId = ObjectId;
79pub type TableId = ObjectId;
80pub type SourceId = ObjectId;
81pub type SinkId = ObjectId;
82pub type SubscriptionId = ObjectId;
83pub type IndexId = ObjectId;
84pub type ViewId = ObjectId;
85pub type FunctionId = ObjectId;
86pub type ConnectionId = ObjectId;
87pub type SecretId = ObjectId;
88pub type UserId = i32;
89pub type PrivilegeId = i32;
90pub type DefaultPrivilegeId = i32;
91
92pub type HummockVersionId = i64;
93pub type Epoch = i64;
94pub type CompactionGroupId = i64;
95pub type CompactionTaskId = i64;
96pub type HummockSstableObjectId = i64;
97
98pub type FragmentId = i32;
99pub type ActorId = i32;
100
101#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
102#[sea_orm(rs_type = "String", db_type = "string(None)")]
103pub enum JobStatus {
104 #[sea_orm(string_value = "INITIAL")]
105 Initial,
106 #[sea_orm(string_value = "CREATING")]
107 Creating,
108 #[sea_orm(string_value = "CREATED")]
109 Created,
110}
111
112impl From<JobStatus> for PbStreamJobStatus {
113 fn from(job_status: JobStatus) -> Self {
114 match job_status {
115 JobStatus::Initial => Self::Unspecified,
116 JobStatus::Creating => Self::Creating,
117 JobStatus::Created => Self::Created,
118 }
119 }
120}
121
122impl From<JobStatus> for PbStreamJobState {
124 fn from(status: JobStatus) -> Self {
125 match status {
126 JobStatus::Initial => PbStreamJobState::Initial,
127 JobStatus::Creating => PbStreamJobState::Creating,
128 JobStatus::Created => PbStreamJobState::Created,
129 }
130 }
131}
132
133#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
134#[sea_orm(rs_type = "String", db_type = "string(None)")]
135pub enum CreateType {
136 #[sea_orm(string_value = "BACKGROUND")]
137 Background,
138 #[sea_orm(string_value = "FOREGROUND")]
139 Foreground,
140}
141
142impl From<CreateType> for PbCreateType {
143 fn from(create_type: CreateType) -> Self {
144 match create_type {
145 CreateType::Background => Self::Background,
146 CreateType::Foreground => Self::Foreground,
147 }
148 }
149}
150
151impl From<PbCreateType> for CreateType {
152 fn from(create_type: PbCreateType) -> Self {
153 match create_type {
154 PbCreateType::Background => Self::Background,
155 PbCreateType::Foreground => Self::Foreground,
156 PbCreateType::Unspecified => unreachable!("Unspecified create type"),
157 }
158 }
159}
160
161impl CreateType {
162 pub fn as_str(&self) -> &'static str {
163 match self {
164 CreateType::Background => "BACKGROUND",
165 CreateType::Foreground => "FOREGROUND",
166 }
167 }
168}
169
170macro_rules! derive_from_json_struct {
172 ($struct_name:ident, $field_type:ty) => {
173 #[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)]
174 pub struct $struct_name(pub $field_type);
175 impl Eq for $struct_name {}
176 impl From<$field_type> for $struct_name {
177 fn from(value: $field_type) -> Self {
178 Self(value)
179 }
180 }
181
182 impl $struct_name {
183 pub fn into_inner(self) -> $field_type {
184 self.0
185 }
186
187 pub fn inner_ref(&self) -> &$field_type {
188 &self.0
189 }
190 }
191 };
192}
193
194macro_rules! derive_from_blob {
196 ($struct_name:ident, $field_type:ty) => {
197 #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, DeriveValueType)]
198 pub struct $struct_name(#[sea_orm] Vec<u8>);
199
200 impl $struct_name {
201 pub fn to_protobuf(&self) -> $field_type {
202 prost::Message::decode(self.0.as_slice()).unwrap()
203 }
204
205 fn from_protobuf(val: &$field_type) -> Self {
206 Self(prost::Message::encode_to_vec(val))
207 }
208 }
209
210 impl sea_orm::sea_query::Nullable for $struct_name {
211 fn null() -> Value {
212 Value::Bytes(None)
213 }
214 }
215
216 impl From<&$field_type> for $struct_name {
217 fn from(value: &$field_type) -> Self {
218 Self::from_protobuf(value)
219 }
220 }
221
222 impl std::fmt::Debug for $struct_name {
223 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
224 self.to_protobuf().fmt(f)
225 }
226 }
227
228 impl Default for $struct_name {
229 fn default() -> Self {
230 Self::from_protobuf(&<$field_type>::default())
231 }
232 }
233 };
234}
235
236macro_rules! derive_array_from_blob {
238 ($struct_name:ident, $field_type:ty, $field_array_name:ident) => {
239 #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
240 pub struct $struct_name(#[sea_orm] Vec<u8>);
241
242 #[derive(Clone, PartialEq, ::prost::Message)]
243 pub struct $field_array_name {
244 #[prost(message, repeated, tag = "1")]
245 inner: Vec<$field_type>,
246 }
247 impl Eq for $field_array_name {}
248
249 impl $struct_name {
250 pub fn to_protobuf(&self) -> Vec<$field_type> {
251 let data: $field_array_name = prost::Message::decode(self.0.as_slice()).unwrap();
252 data.inner
253 }
254
255 fn from_protobuf(val: Vec<$field_type>) -> Self {
256 Self(prost::Message::encode_to_vec(&$field_array_name {
257 inner: val,
258 }))
259 }
260 }
261
262 impl From<Vec<$field_type>> for $struct_name {
263 fn from(value: Vec<$field_type>) -> Self {
264 Self::from_protobuf(value)
265 }
266 }
267
268 impl std::fmt::Debug for $struct_name {
269 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
270 self.to_protobuf().fmt(f)
271 }
272 }
273
274 impl Default for $struct_name {
275 fn default() -> Self {
276 Self(vec![])
277 }
278 }
279
280 impl sea_orm::sea_query::Nullable for $struct_name {
281 fn null() -> Value {
282 Value::Bytes(None)
283 }
284 }
285 };
286}
287
288macro_rules! derive_btreemap_from_blob {
289 ($struct_name:ident, $key_type:ty, $value_type:ty, $field_type:ident) => {
290 #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
291 pub struct $struct_name(#[sea_orm] Vec<u8>);
292
293 #[derive(Clone, PartialEq, ::prost::Message)]
294 pub struct $field_type {
295 #[prost(btree_map = "string, message")]
296 inner: BTreeMap<$key_type, $value_type>,
297 }
298 impl Eq for $field_type {}
299
300 impl $struct_name {
301 pub fn to_protobuf(&self) -> BTreeMap<$key_type, $value_type> {
302 let data: $field_type = prost::Message::decode(self.0.as_slice()).unwrap();
303 data.inner
304 }
305
306 fn from_protobuf(val: BTreeMap<$key_type, $value_type>) -> Self {
307 Self(prost::Message::encode_to_vec(&$field_type { inner: val }))
308 }
309 }
310
311 impl From<BTreeMap<$key_type, $value_type>> for $struct_name {
312 fn from(value: BTreeMap<$key_type, $value_type>) -> Self {
313 Self::from_protobuf(value)
314 }
315 }
316
317 impl std::fmt::Debug for $struct_name {
318 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
319 self.to_protobuf().fmt(f)
320 }
321 }
322
323 impl Default for $struct_name {
324 fn default() -> Self {
325 Self(vec![])
326 }
327 }
328
329 impl sea_orm::sea_query::Nullable for $struct_name {
330 fn null() -> Value {
331 Value::Bytes(None)
332 }
333 }
334 };
335}
336
337pub(crate) use {derive_array_from_blob, derive_from_blob};
338
339derive_from_json_struct!(I32Array, Vec<i32>);
340
341impl From<Vec<u32>> for I32Array {
342 fn from(value: Vec<u32>) -> Self {
343 Self(value.into_iter().map(|id| id as _).collect())
344 }
345}
346
347impl I32Array {
348 pub fn into_u32_array(self) -> Vec<u32> {
349 self.0.into_iter().map(|id| id as _).collect()
350 }
351}
352
353derive_from_json_struct!(ActorUpstreamActors, BTreeMap<FragmentId, Vec<ActorId>>);
354
355impl From<BTreeMap<u32, Vec<u32>>> for ActorUpstreamActors {
356 fn from(val: BTreeMap<u32, Vec<u32>>) -> Self {
357 let mut map = BTreeMap::new();
358 for (k, v) in val {
359 map.insert(k as _, v.into_iter().map(|a| a as _).collect());
360 }
361 Self(map)
362 }
363}
364
365derive_btreemap_from_blob!(SecretRef, String, PbSecretRef, PbSecretRefMap);
366
367derive_from_blob!(StreamNode, PbStreamNode);
368derive_from_blob!(DataType, risingwave_pb::data::PbDataType);
369derive_array_from_blob!(
370 DataTypeArray,
371 risingwave_pb::data::PbDataType,
372 PbDataTypeArray
373);
374derive_array_from_blob!(
375 FieldArray,
376 risingwave_pb::plan_common::PbField,
377 PbFieldArray
378);
379derive_from_json_struct!(Property, BTreeMap<String, String>);
380derive_from_blob!(ColumnCatalog, risingwave_pb::plan_common::PbColumnCatalog);
381derive_array_from_blob!(
382 ColumnCatalogArray,
383 risingwave_pb::plan_common::PbColumnCatalog,
384 PbColumnCatalogArray
385);
386derive_from_blob!(StreamSourceInfo, risingwave_pb::catalog::PbStreamSourceInfo);
387derive_from_blob!(
388 WebhookSourceInfo,
389 risingwave_pb::catalog::PbWebhookSourceInfo
390);
391derive_from_blob!(WatermarkDesc, risingwave_pb::catalog::PbWatermarkDesc);
392derive_array_from_blob!(
393 WatermarkDescArray,
394 risingwave_pb::catalog::PbWatermarkDesc,
395 PbWatermarkDescArray
396);
397derive_array_from_blob!(
398 ExprNodeArray,
399 risingwave_pb::expr::PbExprNode,
400 PbExprNodeArray
401);
402derive_array_from_blob!(
403 ColumnOrderArray,
404 risingwave_pb::common::PbColumnOrder,
405 PbColumnOrderArray
406);
407derive_array_from_blob!(
408 IndexColumnPropertiesArray,
409 risingwave_pb::catalog::PbIndexColumnProperties,
410 PbIndexColumnPropertiesArray
411);
412derive_from_blob!(SinkFormatDesc, risingwave_pb::catalog::PbSinkFormatDesc);
413derive_from_blob!(Cardinality, risingwave_pb::plan_common::PbCardinality);
414derive_from_blob!(TableVersion, risingwave_pb::catalog::table::PbTableVersion);
415derive_from_blob!(
416 PrivateLinkService,
417 risingwave_pb::catalog::connection::PbPrivateLinkService
418);
419derive_from_blob!(ConnectionParams, risingwave_pb::catalog::ConnectionParams);
420derive_from_blob!(AuthInfo, risingwave_pb::user::PbAuthInfo);
421
422derive_from_blob!(ConnectorSplits, risingwave_pb::source::ConnectorSplits);
423derive_from_blob!(VnodeBitmap, risingwave_pb::common::Buffer);
424derive_from_blob!(ActorMapping, risingwave_pb::stream_plan::PbActorMapping);
425derive_from_blob!(ExprContext, risingwave_pb::plan_common::PbExprContext);
426
427derive_array_from_blob!(
428 TypePairArray,
429 risingwave_pb::stream_plan::dispatch_output_mapping::TypePair,
430 PbTypePairArray
431);
432
433derive_array_from_blob!(
434 HummockVersionDeltaArray,
435 risingwave_pb::hummock::PbHummockVersionDelta,
436 PbHummockVersionDeltaArray
437);
438
439#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize)]
440pub enum StreamingParallelism {
441 Adaptive,
442 Fixed(usize),
443 Custom,
444}
445
446impl Eq for StreamingParallelism {}
447
448#[derive(
449 Hash, Copy, Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
450)]
451#[sea_orm(rs_type = "String", db_type = "string(None)")]
452pub enum DispatcherType {
453 #[sea_orm(string_value = "HASH")]
454 Hash,
455 #[sea_orm(string_value = "BROADCAST")]
456 Broadcast,
457 #[sea_orm(string_value = "SIMPLE")]
458 Simple,
459 #[sea_orm(string_value = "NO_SHUFFLE")]
460 NoShuffle,
461}
462
463impl From<PbDispatcherType> for DispatcherType {
464 fn from(val: PbDispatcherType) -> Self {
465 match val {
466 PbDispatcherType::Unspecified => unreachable!(),
467 PbDispatcherType::Hash => DispatcherType::Hash,
468 PbDispatcherType::Broadcast => DispatcherType::Broadcast,
469 PbDispatcherType::Simple => DispatcherType::Simple,
470 PbDispatcherType::NoShuffle => DispatcherType::NoShuffle,
471 }
472 }
473}
474
475impl From<DispatcherType> for PbDispatcherType {
476 fn from(val: DispatcherType) -> Self {
477 match val {
478 DispatcherType::Hash => PbDispatcherType::Hash,
479 DispatcherType::Broadcast => PbDispatcherType::Broadcast,
480 DispatcherType::Simple => PbDispatcherType::Simple,
481 DispatcherType::NoShuffle => PbDispatcherType::NoShuffle,
482 }
483 }
484}