1use std::collections::BTreeMap;
16
17use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus};
18use risingwave_pb::meta::table_fragments::PbState as PbStreamJobState;
19use risingwave_pb::secret::PbSecretRef;
20use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamNode};
21use sea_orm::entity::prelude::*;
22use sea_orm::{DeriveActiveEnum, EnumIter, FromJsonQueryResult};
23use serde::{Deserialize, Serialize};
24
25pub mod prelude;
26
27pub mod actor;
28pub mod catalog_version;
29pub mod cdc_table_snapshot_split;
30pub mod cluster;
31pub mod compaction_config;
32pub mod compaction_status;
33pub mod compaction_task;
34pub mod connection;
35pub mod database;
36pub mod exactly_once_iceberg_sink;
37pub mod fragment;
38pub mod fragment_relation;
39pub mod fragment_splits;
40pub mod function;
41pub mod hummock_epoch_to_version;
42pub mod hummock_gc_history;
43pub mod hummock_pinned_snapshot;
44pub mod hummock_pinned_version;
45pub mod hummock_sequence;
46pub mod hummock_sstable_info;
47pub mod hummock_time_travel_delta;
48pub mod hummock_time_travel_version;
49pub mod hummock_version_delta;
50pub mod hummock_version_stats;
51pub mod iceberg_namespace_properties;
52pub mod iceberg_tables;
53pub mod index;
54pub mod object;
55pub mod object_dependency;
56pub mod schema;
57pub mod secret;
58pub mod serde_seaql_migration;
59pub mod session_parameter;
60pub mod sink;
61pub mod source;
62pub mod source_splits;
63pub mod streaming_job;
64pub mod subscription;
65pub mod system_parameter;
66pub mod table;
67pub mod user;
68pub mod user_default_privilege;
69pub mod user_privilege;
70pub mod view;
71pub mod worker;
72pub mod worker_property;
73
74pub type WorkerId = i32;
75
76pub type TransactionId = i32;
77
78pub type ObjectId = i32;
79pub type DatabaseId = ObjectId;
80pub type SchemaId = ObjectId;
81pub type TableId = ObjectId;
82pub type SourceId = ObjectId;
83pub type SinkId = ObjectId;
84pub type SubscriptionId = ObjectId;
85pub type IndexId = ObjectId;
86pub type ViewId = ObjectId;
87pub type FunctionId = ObjectId;
88pub type ConnectionId = ObjectId;
89pub type SecretId = ObjectId;
90pub type UserId = i32;
91pub type PrivilegeId = i32;
92pub type DefaultPrivilegeId = i32;
93
94pub type HummockVersionId = i64;
95pub type Epoch = i64;
96pub type CompactionGroupId = i64;
97pub type CompactionTaskId = i64;
98pub type HummockSstableObjectId = i64;
99
100pub type FragmentId = i32;
101pub type ActorId = i32;
102
103#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
104#[sea_orm(rs_type = "String", db_type = "string(None)")]
105pub enum JobStatus {
106 #[sea_orm(string_value = "INITIAL")]
107 Initial,
108 #[sea_orm(string_value = "CREATING")]
109 Creating,
110 #[sea_orm(string_value = "CREATED")]
111 Created,
112}
113
114impl From<JobStatus> for PbStreamJobStatus {
115 fn from(job_status: JobStatus) -> Self {
116 match job_status {
117 JobStatus::Initial => Self::Unspecified,
118 JobStatus::Creating => Self::Creating,
119 JobStatus::Created => Self::Created,
120 }
121 }
122}
123
124impl From<JobStatus> for PbStreamJobState {
126 fn from(status: JobStatus) -> Self {
127 match status {
128 JobStatus::Initial => PbStreamJobState::Initial,
129 JobStatus::Creating => PbStreamJobState::Creating,
130 JobStatus::Created => PbStreamJobState::Created,
131 }
132 }
133}
134
135#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
136#[sea_orm(rs_type = "String", db_type = "string(None)")]
137pub enum CreateType {
138 #[sea_orm(string_value = "BACKGROUND")]
139 Background,
140 #[sea_orm(string_value = "FOREGROUND")]
141 Foreground,
142}
143
144impl From<CreateType> for PbCreateType {
145 fn from(create_type: CreateType) -> Self {
146 match create_type {
147 CreateType::Background => Self::Background,
148 CreateType::Foreground => Self::Foreground,
149 }
150 }
151}
152
153impl From<PbCreateType> for CreateType {
154 fn from(create_type: PbCreateType) -> Self {
155 match create_type {
156 PbCreateType::Background => Self::Background,
157 PbCreateType::Foreground => Self::Foreground,
158 PbCreateType::Unspecified => unreachable!("Unspecified create type"),
159 }
160 }
161}
162
163impl CreateType {
164 pub fn as_str(&self) -> &'static str {
165 match self {
166 CreateType::Background => "BACKGROUND",
167 CreateType::Foreground => "FOREGROUND",
168 }
169 }
170}
171
172macro_rules! derive_from_json_struct {
174 ($struct_name:ident, $field_type:ty) => {
175 #[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)]
176 pub struct $struct_name(pub $field_type);
177 impl Eq for $struct_name {}
178 impl From<$field_type> for $struct_name {
179 fn from(value: $field_type) -> Self {
180 Self(value)
181 }
182 }
183
184 impl $struct_name {
185 pub fn into_inner(self) -> $field_type {
186 self.0
187 }
188
189 pub fn inner_ref(&self) -> &$field_type {
190 &self.0
191 }
192 }
193 };
194}
195
196macro_rules! derive_from_blob {
198 ($struct_name:ident, $field_type:ty) => {
199 #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, DeriveValueType)]
200 pub struct $struct_name(#[sea_orm] Vec<u8>);
201
202 impl $struct_name {
203 pub fn to_protobuf(&self) -> $field_type {
204 prost::Message::decode(self.0.as_slice()).unwrap()
205 }
206
207 fn from_protobuf(val: &$field_type) -> Self {
208 Self(prost::Message::encode_to_vec(val))
209 }
210 }
211
212 impl sea_orm::sea_query::Nullable for $struct_name {
213 fn null() -> Value {
214 Value::Bytes(None)
215 }
216 }
217
218 impl From<&$field_type> for $struct_name {
219 fn from(value: &$field_type) -> Self {
220 Self::from_protobuf(value)
221 }
222 }
223
224 impl std::fmt::Debug for $struct_name {
225 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
226 self.to_protobuf().fmt(f)
227 }
228 }
229
230 impl Default for $struct_name {
231 fn default() -> Self {
232 Self::from_protobuf(&<$field_type>::default())
233 }
234 }
235 };
236}
237
238macro_rules! derive_array_from_blob {
240 ($struct_name:ident, $field_type:ty, $field_array_name:ident) => {
241 #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
242 pub struct $struct_name(#[sea_orm] Vec<u8>);
243
244 #[derive(Clone, PartialEq, ::prost::Message)]
245 pub struct $field_array_name {
246 #[prost(message, repeated, tag = "1")]
247 inner: Vec<$field_type>,
248 }
249 impl Eq for $field_array_name {}
250
251 impl $struct_name {
252 pub fn to_protobuf(&self) -> Vec<$field_type> {
253 let data: $field_array_name = prost::Message::decode(self.0.as_slice()).unwrap();
254 data.inner
255 }
256
257 fn from_protobuf(val: Vec<$field_type>) -> Self {
258 Self(prost::Message::encode_to_vec(&$field_array_name {
259 inner: val,
260 }))
261 }
262 }
263
264 impl From<Vec<$field_type>> for $struct_name {
265 fn from(value: Vec<$field_type>) -> Self {
266 Self::from_protobuf(value)
267 }
268 }
269
270 impl std::fmt::Debug for $struct_name {
271 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
272 self.to_protobuf().fmt(f)
273 }
274 }
275
276 impl Default for $struct_name {
277 fn default() -> Self {
278 Self(vec![])
279 }
280 }
281
282 impl sea_orm::sea_query::Nullable for $struct_name {
283 fn null() -> Value {
284 Value::Bytes(None)
285 }
286 }
287 };
288}
289
290macro_rules! derive_btreemap_from_blob {
291 ($struct_name:ident, $key_type:ty, $value_type:ty, $field_type:ident) => {
292 #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
293 pub struct $struct_name(#[sea_orm] Vec<u8>);
294
295 #[derive(Clone, PartialEq, ::prost::Message)]
296 pub struct $field_type {
297 #[prost(btree_map = "string, message")]
298 inner: BTreeMap<$key_type, $value_type>,
299 }
300 impl Eq for $field_type {}
301
302 impl $struct_name {
303 pub fn to_protobuf(&self) -> BTreeMap<$key_type, $value_type> {
304 let data: $field_type = prost::Message::decode(self.0.as_slice()).unwrap();
305 data.inner
306 }
307
308 fn from_protobuf(val: BTreeMap<$key_type, $value_type>) -> Self {
309 Self(prost::Message::encode_to_vec(&$field_type { inner: val }))
310 }
311 }
312
313 impl From<BTreeMap<$key_type, $value_type>> for $struct_name {
314 fn from(value: BTreeMap<$key_type, $value_type>) -> Self {
315 Self::from_protobuf(value)
316 }
317 }
318
319 impl std::fmt::Debug for $struct_name {
320 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
321 self.to_protobuf().fmt(f)
322 }
323 }
324
325 impl Default for $struct_name {
326 fn default() -> Self {
327 Self(vec![])
328 }
329 }
330
331 impl sea_orm::sea_query::Nullable for $struct_name {
332 fn null() -> Value {
333 Value::Bytes(None)
334 }
335 }
336 };
337}
338
339pub(crate) use {derive_array_from_blob, derive_from_blob};
340
341derive_from_json_struct!(I32Array, Vec<i32>);
342
343impl From<Vec<u32>> for I32Array {
344 fn from(value: Vec<u32>) -> Self {
345 Self(value.into_iter().map(|id| id as _).collect())
346 }
347}
348
349impl I32Array {
350 pub fn into_u32_array(self) -> Vec<u32> {
351 self.0.into_iter().map(|id| id as _).collect()
352 }
353}
354
355derive_from_json_struct!(ActorUpstreamActors, BTreeMap<FragmentId, Vec<ActorId>>);
356
357impl From<BTreeMap<u32, Vec<u32>>> for ActorUpstreamActors {
358 fn from(val: BTreeMap<u32, Vec<u32>>) -> Self {
359 let mut map = BTreeMap::new();
360 for (k, v) in val {
361 map.insert(k as _, v.into_iter().map(|a| a as _).collect());
362 }
363 Self(map)
364 }
365}
366
367derive_btreemap_from_blob!(SecretRef, String, PbSecretRef, PbSecretRefMap);
368
369derive_from_blob!(StreamNode, PbStreamNode);
370derive_from_blob!(DataType, risingwave_pb::data::PbDataType);
371derive_array_from_blob!(
372 DataTypeArray,
373 risingwave_pb::data::PbDataType,
374 PbDataTypeArray
375);
376derive_array_from_blob!(
377 FieldArray,
378 risingwave_pb::plan_common::PbField,
379 PbFieldArray
380);
381derive_from_json_struct!(Property, BTreeMap<String, String>);
382derive_from_blob!(ColumnCatalog, risingwave_pb::plan_common::PbColumnCatalog);
383derive_array_from_blob!(
384 ColumnCatalogArray,
385 risingwave_pb::plan_common::PbColumnCatalog,
386 PbColumnCatalogArray
387);
388derive_from_blob!(StreamSourceInfo, risingwave_pb::catalog::PbStreamSourceInfo);
389derive_from_blob!(
390 WebhookSourceInfo,
391 risingwave_pb::catalog::PbWebhookSourceInfo
392);
393derive_from_blob!(WatermarkDesc, risingwave_pb::catalog::PbWatermarkDesc);
394derive_array_from_blob!(
395 WatermarkDescArray,
396 risingwave_pb::catalog::PbWatermarkDesc,
397 PbWatermarkDescArray
398);
399derive_array_from_blob!(
400 ExprNodeArray,
401 risingwave_pb::expr::PbExprNode,
402 PbExprNodeArray
403);
404derive_array_from_blob!(
405 ColumnOrderArray,
406 risingwave_pb::common::PbColumnOrder,
407 PbColumnOrderArray
408);
409derive_array_from_blob!(
410 IndexColumnPropertiesArray,
411 risingwave_pb::catalog::PbIndexColumnProperties,
412 PbIndexColumnPropertiesArray
413);
414derive_from_blob!(SinkFormatDesc, risingwave_pb::catalog::PbSinkFormatDesc);
415derive_from_blob!(Cardinality, risingwave_pb::plan_common::PbCardinality);
416derive_from_blob!(TableVersion, risingwave_pb::catalog::table::PbTableVersion);
417derive_from_blob!(
418 PrivateLinkService,
419 risingwave_pb::catalog::connection::PbPrivateLinkService
420);
421derive_from_blob!(ConnectionParams, risingwave_pb::catalog::ConnectionParams);
422derive_from_blob!(AuthInfo, risingwave_pb::user::PbAuthInfo);
423
424derive_from_blob!(ConnectorSplits, risingwave_pb::source::ConnectorSplits);
425derive_from_blob!(VnodeBitmap, risingwave_pb::common::Buffer);
426derive_from_blob!(ActorMapping, risingwave_pb::stream_plan::PbActorMapping);
427derive_from_blob!(ExprContext, risingwave_pb::plan_common::PbExprContext);
428
429derive_array_from_blob!(
430 TypePairArray,
431 risingwave_pb::stream_plan::dispatch_output_mapping::TypePair,
432 PbTypePairArray
433);
434
435derive_array_from_blob!(
436 HummockVersionDeltaArray,
437 risingwave_pb::hummock::PbHummockVersionDelta,
438 PbHummockVersionDeltaArray
439);
440
441#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize)]
442pub enum StreamingParallelism {
443 Adaptive,
444 Fixed(usize),
445 Custom,
446}
447
448impl Eq for StreamingParallelism {}
449
450#[derive(
451 Hash, Copy, Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
452)]
453#[sea_orm(rs_type = "String", db_type = "string(None)")]
454pub enum DispatcherType {
455 #[sea_orm(string_value = "HASH")]
456 Hash,
457 #[sea_orm(string_value = "BROADCAST")]
458 Broadcast,
459 #[sea_orm(string_value = "SIMPLE")]
460 Simple,
461 #[sea_orm(string_value = "NO_SHUFFLE")]
462 NoShuffle,
463}
464
465impl From<PbDispatcherType> for DispatcherType {
466 fn from(val: PbDispatcherType) -> Self {
467 match val {
468 PbDispatcherType::Unspecified => unreachable!(),
469 PbDispatcherType::Hash => DispatcherType::Hash,
470 PbDispatcherType::Broadcast => DispatcherType::Broadcast,
471 PbDispatcherType::Simple => DispatcherType::Simple,
472 PbDispatcherType::NoShuffle => DispatcherType::NoShuffle,
473 }
474 }
475}
476
477impl From<DispatcherType> for PbDispatcherType {
478 fn from(val: DispatcherType) -> Self {
479 match val {
480 DispatcherType::Hash => PbDispatcherType::Hash,
481 DispatcherType::Broadcast => PbDispatcherType::Broadcast,
482 DispatcherType::Simple => PbDispatcherType::Simple,
483 DispatcherType::NoShuffle => PbDispatcherType::NoShuffle,
484 }
485 }
486}