1use std::collections::BTreeMap;
16
17use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus};
18use risingwave_pb::meta::table_fragments::PbState as PbStreamJobState;
19use risingwave_pb::secret::PbSecretRef;
20use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamNode};
21use sea_orm::entity::prelude::*;
22use sea_orm::{DeriveActiveEnum, EnumIter, FromJsonQueryResult};
23use serde::{Deserialize, Serialize};
24
25pub mod prelude;
26
27pub mod actor;
28pub mod catalog_version;
29pub mod cdc_table_snapshot_split;
30pub mod cluster;
31pub mod compaction_config;
32pub mod compaction_status;
33pub mod compaction_task;
34pub mod connection;
35pub mod database;
36pub mod exactly_once_iceberg_sink;
37pub mod fragment;
38pub mod fragment_relation;
39pub mod function;
40pub mod hummock_epoch_to_version;
41pub mod hummock_gc_history;
42pub mod hummock_pinned_snapshot;
43pub mod hummock_pinned_version;
44pub mod hummock_sequence;
45pub mod hummock_sstable_info;
46pub mod hummock_time_travel_delta;
47pub mod hummock_time_travel_version;
48pub mod hummock_version_delta;
49pub mod hummock_version_stats;
50pub mod iceberg_namespace_properties;
51pub mod iceberg_tables;
52pub mod index;
53pub mod object;
54pub mod object_dependency;
55pub mod schema;
56pub mod secret;
57pub mod serde_seaql_migration;
58pub mod session_parameter;
59pub mod sink;
60pub mod source;
61pub mod source_splits;
62pub mod streaming_job;
63pub mod subscription;
64pub mod system_parameter;
65pub mod table;
66pub mod user;
67pub mod user_default_privilege;
68pub mod user_privilege;
69pub mod view;
70pub mod worker;
71pub mod worker_property;
72
// Identifier aliases.
//
// These are plain `type` aliases, not newtypes: two aliases that share the same underlying
// integer type are fully interchangeable as far as the compiler is concerned.

/// 32-bit worker identifier.
pub type WorkerId = i32;

/// 32-bit transaction identifier.
pub type TransactionId = i32;

/// Base 32-bit object identifier; the aliases directly below all resolve to this type.
pub type ObjectId = i32;
pub type DatabaseId = ObjectId;
pub type SchemaId = ObjectId;
pub type TableId = ObjectId;
pub type SourceId = ObjectId;
pub type SinkId = ObjectId;
pub type SubscriptionId = ObjectId;
pub type IndexId = ObjectId;
pub type ViewId = ObjectId;
pub type FunctionId = ObjectId;
pub type ConnectionId = ObjectId;
pub type SecretId = ObjectId;
// User and privilege identifiers are declared as `i32` directly rather than via `ObjectId`.
pub type UserId = i32;
pub type PrivilegeId = i32;
pub type DefaultPrivilegeId = i32;

// 64-bit identifiers and counters.
pub type HummockVersionId = i64;
pub type Epoch = i64;
pub type CompactionGroupId = i64;
pub type CompactionTaskId = i64;
pub type HummockSstableObjectId = i64;

// 32-bit fragment and actor identifiers.
pub type FragmentId = i32;
pub type ActorId = i32;
101
102#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
103#[sea_orm(rs_type = "String", db_type = "string(None)")]
104pub enum JobStatus {
105 #[sea_orm(string_value = "INITIAL")]
106 Initial,
107 #[sea_orm(string_value = "CREATING")]
108 Creating,
109 #[sea_orm(string_value = "CREATED")]
110 Created,
111}
112
113impl From<JobStatus> for PbStreamJobStatus {
114 fn from(job_status: JobStatus) -> Self {
115 match job_status {
116 JobStatus::Initial => Self::Unspecified,
117 JobStatus::Creating => Self::Creating,
118 JobStatus::Created => Self::Created,
119 }
120 }
121}
122
123impl From<JobStatus> for PbStreamJobState {
125 fn from(status: JobStatus) -> Self {
126 match status {
127 JobStatus::Initial => PbStreamJobState::Initial,
128 JobStatus::Creating => PbStreamJobState::Creating,
129 JobStatus::Created => PbStreamJobState::Created,
130 }
131 }
132}
133
134#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
135#[sea_orm(rs_type = "String", db_type = "string(None)")]
136pub enum CreateType {
137 #[sea_orm(string_value = "BACKGROUND")]
138 Background,
139 #[sea_orm(string_value = "FOREGROUND")]
140 Foreground,
141}
142
143impl From<CreateType> for PbCreateType {
144 fn from(create_type: CreateType) -> Self {
145 match create_type {
146 CreateType::Background => Self::Background,
147 CreateType::Foreground => Self::Foreground,
148 }
149 }
150}
151
152impl From<PbCreateType> for CreateType {
153 fn from(create_type: PbCreateType) -> Self {
154 match create_type {
155 PbCreateType::Background => Self::Background,
156 PbCreateType::Foreground => Self::Foreground,
157 PbCreateType::Unspecified => unreachable!("Unspecified create type"),
158 }
159 }
160}
161
162impl CreateType {
163 pub fn as_str(&self) -> &'static str {
164 match self {
165 CreateType::Background => "BACKGROUND",
166 CreateType::Foreground => "FOREGROUND",
167 }
168 }
169}
170
/// Declares a public tuple struct `$struct_name` that wraps a single public `$field_type`.
///
/// The generated type derives `FromJsonQueryResult`, serde's `Serialize`/`Deserialize`,
/// `Clone`, `Debug`, `PartialEq` and `Default`, and additionally gets:
/// - a marker `Eq` impl,
/// - `From<$field_type>` to wrap a value,
/// - `into_inner` / `inner_ref` to get the wrapped value back.
macro_rules! derive_from_json_struct {
    ($struct_name:ident, $field_type:ty) => {
        #[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)]
        pub struct $struct_name(pub $field_type);

        // Written by hand instead of derived, so it places no `Eq` bound on the field type.
        impl Eq for $struct_name {}

        impl From<$field_type> for $struct_name {
            fn from(value: $field_type) -> Self {
                $struct_name(value)
            }
        }

        impl $struct_name {
            /// Consumes the wrapper and returns the wrapped value.
            pub fn into_inner(self) -> $field_type {
                let $struct_name(inner) = self;
                inner
            }

            /// Borrows the wrapped value.
            pub fn inner_ref(&self) -> &$field_type {
                let $struct_name(inner) = self;
                inner
            }
        }
    };
}
194
/// Declares a public tuple struct `$struct_name` holding the prost-encoded bytes of one
/// `$field_type` message.
///
/// `$field_type` must implement `prost::Message` and `Default`. The generated type derives
/// `DeriveValueType` and serde's `Serialize`/`Deserialize`, and additionally gets:
/// - `to_protobuf` to decode the stored bytes (panics if they do not decode),
/// - `From<&$field_type>` to encode a message,
/// - `Debug`, which decodes the bytes and formats the resulting message,
/// - `Default`, which encodes `<$field_type>::default()`,
/// - `Nullable`, whose null value is `Value::Bytes(None)`.
macro_rules! derive_from_blob {
    ($struct_name:ident, $field_type:ty) => {
        #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, DeriveValueType)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        impl $struct_name {
            /// Decodes the stored bytes into a `$field_type` message.
            ///
            /// # Panics
            ///
            /// Panics if the stored bytes are not a valid encoding of the message.
            pub fn to_protobuf(&self) -> $field_type {
                <$field_type as prost::Message>::decode(&self.0[..]).unwrap()
            }

            /// Encodes `val` and wraps the resulting bytes.
            fn from_protobuf(val: &$field_type) -> Self {
                let encoded = prost::Message::encode_to_vec(val);
                Self(encoded)
            }
        }

        impl From<&$field_type> for $struct_name {
            fn from(value: &$field_type) -> Self {
                Self::from_protobuf(value)
            }
        }

        impl Default for $struct_name {
            fn default() -> Self {
                let empty = <$field_type as Default>::default();
                Self::from_protobuf(&empty)
            }
        }

        impl std::fmt::Debug for $struct_name {
            // Formats the decoded message rather than the raw bytes, so this inherits the
            // panic of `to_protobuf` on undecodable data.
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                std::fmt::Debug::fmt(&self.to_protobuf(), f)
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }
    };
}
236
/// Declares a public tuple struct `$struct_name` holding the prost-encoded bytes of a list
/// of `$field_type` messages.
///
/// A helper prost message `$field_array_name` is generated alongside it; it carries the list
/// as a single repeated field with tag 1 and is what actually gets encoded and decoded.
/// The generated `$struct_name` gets:
/// - `to_protobuf` to decode the stored bytes into a `Vec` (panics if they do not decode),
/// - `From<Vec<$field_type>>` to encode a list,
/// - `Debug`, which decodes the bytes and formats the resulting `Vec`,
/// - `Default`, which stores an empty byte vector,
/// - `Nullable`, whose null value is `Value::Bytes(None)`.
macro_rules! derive_array_from_blob {
    ($struct_name:ident, $field_type:ty, $field_array_name:ident) => {
        #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        #[derive(Clone, PartialEq, ::prost::Message)]
        pub struct $field_array_name {
            #[prost(message, repeated, tag = "1")]
            inner: Vec<$field_type>,
        }

        // Written by hand instead of derived, so it places no `Eq` bound on the element type.
        impl Eq for $field_array_name {}

        impl $struct_name {
            /// Decodes the stored bytes and returns the contained list.
            ///
            /// # Panics
            ///
            /// Panics if the stored bytes are not a valid encoding of the helper message.
            pub fn to_protobuf(&self) -> Vec<$field_type> {
                let $field_array_name { inner } =
                    <$field_array_name as prost::Message>::decode(&self.0[..]).unwrap();
                inner
            }

            /// Wraps `val` in the helper message, encodes it and stores the bytes.
            fn from_protobuf(val: Vec<$field_type>) -> Self {
                let wrapper = $field_array_name { inner: val };
                Self(prost::Message::encode_to_vec(&wrapper))
            }
        }

        impl From<Vec<$field_type>> for $struct_name {
            fn from(value: Vec<$field_type>) -> Self {
                Self::from_protobuf(value)
            }
        }

        impl Default for $struct_name {
            fn default() -> Self {
                Self(Vec::new())
            }
        }

        impl std::fmt::Debug for $struct_name {
            // Formats the decoded list rather than the raw bytes, so this inherits the
            // panic of `to_protobuf` on undecodable data.
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                std::fmt::Debug::fmt(&self.to_protobuf(), f)
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }
    };
}
288
/// Declares a public tuple struct `$struct_name` holding the prost-encoded bytes of a
/// `BTreeMap<$key_type, $value_type>`.
///
/// A helper prost message `$field_type` is generated alongside it; it carries the map as a
/// single `btree_map = "string, message"` field and is what actually gets encoded and
/// decoded. The generated `$struct_name` gets:
/// - `to_protobuf` to decode the stored bytes into a map (panics if they do not decode),
/// - `From<BTreeMap<$key_type, $value_type>>` to encode a map,
/// - `Debug`, which decodes the bytes and formats the resulting map,
/// - `Default`, which stores an empty byte vector,
/// - `Nullable`, whose null value is `Value::Bytes(None)`.
macro_rules! derive_btreemap_from_blob {
    ($struct_name:ident, $key_type:ty, $value_type:ty, $field_type:ident) => {
        #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        #[derive(Clone, PartialEq, ::prost::Message)]
        pub struct $field_type {
            #[prost(btree_map = "string, message")]
            inner: BTreeMap<$key_type, $value_type>,
        }

        // Written by hand instead of derived, so it places no `Eq` bound on the value type.
        impl Eq for $field_type {}

        impl $struct_name {
            /// Decodes the stored bytes and returns the contained map.
            ///
            /// # Panics
            ///
            /// Panics if the stored bytes are not a valid encoding of the helper message.
            pub fn to_protobuf(&self) -> BTreeMap<$key_type, $value_type> {
                let $field_type { inner } =
                    <$field_type as prost::Message>::decode(&self.0[..]).unwrap();
                inner
            }

            /// Wraps `val` in the helper message, encodes it and stores the bytes.
            fn from_protobuf(val: BTreeMap<$key_type, $value_type>) -> Self {
                let wrapper = $field_type { inner: val };
                Self(prost::Message::encode_to_vec(&wrapper))
            }
        }

        impl From<BTreeMap<$key_type, $value_type>> for $struct_name {
            fn from(value: BTreeMap<$key_type, $value_type>) -> Self {
                Self::from_protobuf(value)
            }
        }

        impl Default for $struct_name {
            fn default() -> Self {
                Self(Vec::new())
            }
        }

        impl std::fmt::Debug for $struct_name {
            // Formats the decoded map rather than the raw bytes, so this inherits the
            // panic of `to_protobuf` on undecodable data.
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                std::fmt::Debug::fmt(&self.to_protobuf(), f)
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }
    };
}
337
338pub(crate) use {derive_array_from_blob, derive_from_blob};
339
340derive_from_json_struct!(I32Array, Vec<i32>);
341
342impl From<Vec<u32>> for I32Array {
343 fn from(value: Vec<u32>) -> Self {
344 Self(value.into_iter().map(|id| id as _).collect())
345 }
346}
347
348impl I32Array {
349 pub fn into_u32_array(self) -> Vec<u32> {
350 self.0.into_iter().map(|id| id as _).collect()
351 }
352}
353
354derive_from_json_struct!(ActorUpstreamActors, BTreeMap<FragmentId, Vec<ActorId>>);
355
356impl From<BTreeMap<u32, Vec<u32>>> for ActorUpstreamActors {
357 fn from(val: BTreeMap<u32, Vec<u32>>) -> Self {
358 let mut map = BTreeMap::new();
359 for (k, v) in val {
360 map.insert(k as _, v.into_iter().map(|a| a as _).collect());
361 }
362 Self(map)
363 }
364}
365
366derive_btreemap_from_blob!(SecretRef, String, PbSecretRef, PbSecretRefMap);
367
368derive_from_blob!(StreamNode, PbStreamNode);
369derive_from_blob!(DataType, risingwave_pb::data::PbDataType);
370derive_array_from_blob!(
371 DataTypeArray,
372 risingwave_pb::data::PbDataType,
373 PbDataTypeArray
374);
375derive_array_from_blob!(
376 FieldArray,
377 risingwave_pb::plan_common::PbField,
378 PbFieldArray
379);
380derive_from_json_struct!(Property, BTreeMap<String, String>);
381derive_from_blob!(ColumnCatalog, risingwave_pb::plan_common::PbColumnCatalog);
382derive_array_from_blob!(
383 ColumnCatalogArray,
384 risingwave_pb::plan_common::PbColumnCatalog,
385 PbColumnCatalogArray
386);
387derive_from_blob!(StreamSourceInfo, risingwave_pb::catalog::PbStreamSourceInfo);
388derive_from_blob!(
389 WebhookSourceInfo,
390 risingwave_pb::catalog::PbWebhookSourceInfo
391);
392derive_from_blob!(WatermarkDesc, risingwave_pb::catalog::PbWatermarkDesc);
393derive_array_from_blob!(
394 WatermarkDescArray,
395 risingwave_pb::catalog::PbWatermarkDesc,
396 PbWatermarkDescArray
397);
398derive_array_from_blob!(
399 ExprNodeArray,
400 risingwave_pb::expr::PbExprNode,
401 PbExprNodeArray
402);
403derive_array_from_blob!(
404 ColumnOrderArray,
405 risingwave_pb::common::PbColumnOrder,
406 PbColumnOrderArray
407);
408derive_array_from_blob!(
409 IndexColumnPropertiesArray,
410 risingwave_pb::catalog::PbIndexColumnProperties,
411 PbIndexColumnPropertiesArray
412);
413derive_from_blob!(SinkFormatDesc, risingwave_pb::catalog::PbSinkFormatDesc);
414derive_from_blob!(Cardinality, risingwave_pb::plan_common::PbCardinality);
415derive_from_blob!(TableVersion, risingwave_pb::catalog::table::PbTableVersion);
416derive_from_blob!(
417 PrivateLinkService,
418 risingwave_pb::catalog::connection::PbPrivateLinkService
419);
420derive_from_blob!(ConnectionParams, risingwave_pb::catalog::ConnectionParams);
421derive_from_blob!(AuthInfo, risingwave_pb::user::PbAuthInfo);
422
423derive_from_blob!(ConnectorSplits, risingwave_pb::source::ConnectorSplits);
424derive_from_blob!(VnodeBitmap, risingwave_pb::common::Buffer);
425derive_from_blob!(ActorMapping, risingwave_pb::stream_plan::PbActorMapping);
426derive_from_blob!(ExprContext, risingwave_pb::plan_common::PbExprContext);
427
428derive_array_from_blob!(
429 TypePairArray,
430 risingwave_pb::stream_plan::dispatch_output_mapping::TypePair,
431 PbTypePairArray
432);
433
434derive_array_from_blob!(
435 HummockVersionDeltaArray,
436 risingwave_pb::hummock::PbHummockVersionDelta,
437 PbHummockVersionDeltaArray
438);
439
440#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize)]
441pub enum StreamingParallelism {
442 Adaptive,
443 Fixed(usize),
444 Custom,
445}
446
447impl Eq for StreamingParallelism {}
448
449#[derive(
450 Hash, Copy, Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
451)]
452#[sea_orm(rs_type = "String", db_type = "string(None)")]
453pub enum DispatcherType {
454 #[sea_orm(string_value = "HASH")]
455 Hash,
456 #[sea_orm(string_value = "BROADCAST")]
457 Broadcast,
458 #[sea_orm(string_value = "SIMPLE")]
459 Simple,
460 #[sea_orm(string_value = "NO_SHUFFLE")]
461 NoShuffle,
462}
463
464impl From<PbDispatcherType> for DispatcherType {
465 fn from(val: PbDispatcherType) -> Self {
466 match val {
467 PbDispatcherType::Unspecified => unreachable!(),
468 PbDispatcherType::Hash => DispatcherType::Hash,
469 PbDispatcherType::Broadcast => DispatcherType::Broadcast,
470 PbDispatcherType::Simple => DispatcherType::Simple,
471 PbDispatcherType::NoShuffle => DispatcherType::NoShuffle,
472 }
473 }
474}
475
476impl From<DispatcherType> for PbDispatcherType {
477 fn from(val: DispatcherType) -> Self {
478 match val {
479 DispatcherType::Hash => PbDispatcherType::Hash,
480 DispatcherType::Broadcast => PbDispatcherType::Broadcast,
481 DispatcherType::Simple => PbDispatcherType::Simple,
482 DispatcherType::NoShuffle => PbDispatcherType::NoShuffle,
483 }
484 }
485}