1use std::collections::BTreeMap;
16
17use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus};
18use risingwave_pb::meta::table_fragments::PbState as PbStreamJobState;
19use risingwave_pb::secret::PbSecretRef;
20use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamNode};
21use sea_orm::entity::prelude::*;
22use sea_orm::{DeriveActiveEnum, EnumIter, FromJsonQueryResult};
23use serde::{Deserialize, Serialize};
24
// Submodules of this crate. Each one is declared here and defined out of line
// in its own source file. `prelude` is listed first; the remaining modules are
// kept in alphabetical order.
pub mod prelude;

pub mod actor;
pub mod catalog_version;
pub mod cluster;
pub mod compaction_config;
pub mod compaction_status;
pub mod compaction_task;
pub mod connection;
pub mod database;
pub mod exactly_once_iceberg_sink;
pub mod fragment;
pub mod fragment_relation;
pub mod function;
pub mod hummock_epoch_to_version;
pub mod hummock_gc_history;
pub mod hummock_pinned_snapshot;
pub mod hummock_pinned_version;
pub mod hummock_sequence;
pub mod hummock_sstable_info;
pub mod hummock_time_travel_delta;
pub mod hummock_time_travel_version;
pub mod hummock_version_delta;
pub mod hummock_version_stats;
pub mod iceberg_namespace_properties;
pub mod iceberg_tables;
pub mod index;
pub mod object;
pub mod object_dependency;
pub mod schema;
pub mod secret;
pub mod serde_seaql_migration;
pub mod session_parameter;
pub mod sink;
pub mod source;
pub mod streaming_job;
pub mod subscription;
pub mod system_parameter;
pub mod table;
pub mod user;
pub mod user_default_privilege;
pub mod user_privilege;
pub mod view;
pub mod worker;
pub mod worker_property;
70
// Integer aliases for the identifiers used by this crate.
//
// These are plain `type` aliases, not newtypes: aliases sharing the same
// underlying integer type are interchangeable, and the compiler will not
// reject passing one where another is expected.
pub type WorkerId = i32;

pub type TransactionId = i32;

// `ObjectId` is the base alias; every per-kind id from `DatabaseId` through
// `SecretId` is an alias of it.
pub type ObjectId = i32;
pub type DatabaseId = ObjectId;
pub type SchemaId = ObjectId;
pub type TableId = ObjectId;
pub type SourceId = ObjectId;
pub type SinkId = ObjectId;
pub type SubscriptionId = ObjectId;
pub type IndexId = ObjectId;
pub type ViewId = ObjectId;
pub type FunctionId = ObjectId;
pub type ConnectionId = ObjectId;
pub type SecretId = ObjectId;
// The following ids alias `i32` directly rather than going through `ObjectId`.
pub type UserId = i32;
pub type PrivilegeId = i32;
pub type DefaultPrivilegeId = i32;

// 64-bit aliases.
// NOTE(review): by name these look like Hummock/compaction identifiers — confirm against their users.
pub type HummockVersionId = i64;
pub type Epoch = i64;
pub type CompactionGroupId = i64;
pub type CompactionTaskId = i64;
pub type HummockSstableObjectId = i64;

pub type FragmentId = i32;
pub type ActorId = i32;
99
/// Job status, mapped by sea-orm to a string database value.
///
/// Each variant is stored as the upper-case string given in its
/// `string_value` attribute.
#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum JobStatus {
    /// Stored as `"INITIAL"`.
    #[sea_orm(string_value = "INITIAL")]
    Initial,
    /// Stored as `"CREATING"`.
    #[sea_orm(string_value = "CREATING")]
    Creating,
    /// Stored as `"CREATED"`.
    #[sea_orm(string_value = "CREATED")]
    Created,
}
110
111impl From<JobStatus> for PbStreamJobStatus {
112 fn from(job_status: JobStatus) -> Self {
113 match job_status {
114 JobStatus::Initial => Self::Unspecified,
115 JobStatus::Creating => Self::Creating,
116 JobStatus::Created => Self::Created,
117 }
118 }
119}
120
121impl From<JobStatus> for PbStreamJobState {
123 fn from(status: JobStatus) -> Self {
124 match status {
125 JobStatus::Initial => PbStreamJobState::Initial,
126 JobStatus::Creating => PbStreamJobState::Creating,
127 JobStatus::Created => PbStreamJobState::Created,
128 }
129 }
130}
131
/// Create type, mapped by sea-orm to a string database value.
///
/// Each variant is stored as the upper-case string given in its
/// `string_value` attribute; `CreateType::as_str` returns the same strings.
#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum CreateType {
    /// Stored as `"BACKGROUND"`.
    #[sea_orm(string_value = "BACKGROUND")]
    Background,
    /// Stored as `"FOREGROUND"`.
    #[sea_orm(string_value = "FOREGROUND")]
    Foreground,
}
140
141impl From<CreateType> for PbCreateType {
142 fn from(create_type: CreateType) -> Self {
143 match create_type {
144 CreateType::Background => Self::Background,
145 CreateType::Foreground => Self::Foreground,
146 }
147 }
148}
149
150impl From<PbCreateType> for CreateType {
151 fn from(create_type: PbCreateType) -> Self {
152 match create_type {
153 PbCreateType::Background => Self::Background,
154 PbCreateType::Foreground => Self::Foreground,
155 PbCreateType::Unspecified => unreachable!("Unspecified create type"),
156 }
157 }
158}
159
160impl CreateType {
161 pub fn as_str(&self) -> &'static str {
162 match self {
163 CreateType::Background => "BACKGROUND",
164 CreateType::Foreground => "FOREGROUND",
165 }
166 }
167}
168
/// Generates a public tuple struct `$struct_name` wrapping a single public
/// `$field_type` value.
///
/// The struct derives `FromJsonQueryResult` together with serde's `Serialize`
/// and `Deserialize`, plus `Clone`, `Debug`, `PartialEq` and `Default`. The
/// macro also emits an empty `Eq` impl, a `From<$field_type>` conversion, and
/// the `into_inner` / `inner_ref` accessors.
macro_rules! derive_from_json_struct {
    ($struct_name:ident, $field_type:ty) => {
        #[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)]
        pub struct $struct_name(pub $field_type);
        // `Eq` is implemented by hand here instead of being part of the derive list.
        impl Eq for $struct_name {}
        impl From<$field_type> for $struct_name {
            fn from(value: $field_type) -> Self {
                Self(value)
            }
        }

        impl $struct_name {
            // Consumes the wrapper and returns the wrapped value.
            pub fn into_inner(self) -> $field_type {
                self.0
            }

            // Borrows the wrapped value.
            pub fn inner_ref(&self) -> &$field_type {
                &self.0
            }
        }
    };
}
192
/// Generates a tuple struct `$struct_name` wrapping a private `Vec<u8>` that
/// holds the prost encoding of a single `$field_type` message.
///
/// The generated type derives `DeriveValueType` and the serde traits, and
/// implements `Nullable` (null is `Value::Bytes(None)`), `From<&$field_type>`,
/// `Debug` (formats the decoded message) and `Default` (the encoding of
/// `<$field_type>::default()`).
///
/// # Panics
///
/// The generated `to_protobuf` unwraps the decode result, so it panics when the
/// stored bytes do not decode as `$field_type`. `Debug::fmt` calls
/// `to_protobuf` and therefore panics in the same situation.
macro_rules! derive_from_blob {
    ($struct_name:ident, $field_type:ty) => {
        #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, DeriveValueType)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        impl $struct_name {
            // Decodes the stored bytes back into the message; panics on decode failure.
            pub fn to_protobuf(&self) -> $field_type {
                prost::Message::decode(self.0.as_slice()).unwrap()
            }

            // Private constructor: encodes the message into a fresh byte vector.
            // External callers go through the `From<&$field_type>` impl below.
            fn from_protobuf(val: &$field_type) -> Self {
                Self(prost::Message::encode_to_vec(val))
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }

        impl From<&$field_type> for $struct_name {
            fn from(value: &$field_type) -> Self {
                Self::from_protobuf(value)
            }
        }

        // `Debug` prints the decoded message rather than the raw bytes.
        impl std::fmt::Debug for $struct_name {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                self.to_protobuf().fmt(f)
            }
        }

        // The default value is the encoding of the message's own default.
        impl Default for $struct_name {
            fn default() -> Self {
                Self::from_protobuf(&<$field_type>::default())
            }
        }
    };
}
234
/// Generates a tuple struct `$struct_name` wrapping a private `Vec<u8>` that
/// holds the prost encoding of a list of `$field_type` messages.
///
/// Because the list is encoded as one message, the macro also generates a
/// public helper message `$field_array_name` with a single repeated field
/// (`inner`, tag 1) that carries the elements.
///
/// The generated type derives `DeriveValueType` and the serde traits, and
/// implements `From<Vec<$field_type>>`, `Debug` (formats the decoded list),
/// `Default` (an empty byte vector) and `Nullable` (null is
/// `Value::Bytes(None)`).
///
/// # Panics
///
/// The generated `to_protobuf` unwraps the decode result, so it panics when the
/// stored bytes do not decode as `$field_array_name`. `Debug::fmt` calls
/// `to_protobuf` and therefore panics in the same situation.
macro_rules! derive_array_from_blob {
    ($struct_name:ident, $field_type:ty, $field_array_name:ident) => {
        #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        // Helper message used only as the on-the-wire container for the list.
        #[derive(Clone, PartialEq, ::prost::Message)]
        pub struct $field_array_name {
            #[prost(message, repeated, tag = "1")]
            inner: Vec<$field_type>,
        }
        impl Eq for $field_array_name {}

        impl $struct_name {
            // Decodes the stored bytes and returns the contained list; panics on decode failure.
            pub fn to_protobuf(&self) -> Vec<$field_type> {
                let data: $field_array_name = prost::Message::decode(self.0.as_slice()).unwrap();
                data.inner
            }

            // Private constructor: wraps the list in the helper message and encodes it.
            fn from_protobuf(val: Vec<$field_type>) -> Self {
                Self(prost::Message::encode_to_vec(&$field_array_name {
                    inner: val,
                }))
            }
        }

        impl From<Vec<$field_type>> for $struct_name {
            fn from(value: Vec<$field_type>) -> Self {
                Self::from_protobuf(value)
            }
        }

        // `Debug` prints the decoded list rather than the raw bytes.
        impl std::fmt::Debug for $struct_name {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                self.to_protobuf().fmt(f)
            }
        }

        // The default is an empty byte vector, built directly instead of via `from_protobuf`.
        // NOTE(review): presumably this decodes to an empty list — confirm against prost semantics.
        impl Default for $struct_name {
            fn default() -> Self {
                Self(vec![])
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }
    };
}
286
/// Generates a tuple struct `$struct_name` wrapping a private `Vec<u8>` that
/// holds the prost encoding of a `BTreeMap<$key_type, $value_type>`.
///
/// The map is encoded through a generated public helper message `$field_type`
/// whose single field `inner` is the map itself.
///
/// The generated type derives `DeriveValueType` and the serde traits, and
/// implements `From<BTreeMap<$key_type, $value_type>>`, `Debug` (formats the
/// decoded map), `Default` (an empty byte vector) and `Nullable` (null is
/// `Value::Bytes(None)`).
///
/// # Panics
///
/// The generated `to_protobuf` unwraps the decode result, so it panics when the
/// stored bytes do not decode as `$field_type`. `Debug::fmt` calls
/// `to_protobuf` and therefore panics in the same situation.
macro_rules! derive_btreemap_from_blob {
    ($struct_name:ident, $key_type:ty, $value_type:ty, $field_type:ident) => {
        #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        // Helper message used only as the on-the-wire container for the map.
        #[derive(Clone, PartialEq, ::prost::Message)]
        pub struct $field_type {
            // The prost attribute hard-codes a string key and a message value, independent of
            // `$key_type` / `$value_type`; callers must pass types that match this encoding.
            #[prost(btree_map = "string, message")]
            inner: BTreeMap<$key_type, $value_type>,
        }
        impl Eq for $field_type {}

        impl $struct_name {
            // Decodes the stored bytes and returns the contained map; panics on decode failure.
            pub fn to_protobuf(&self) -> BTreeMap<$key_type, $value_type> {
                let data: $field_type = prost::Message::decode(self.0.as_slice()).unwrap();
                data.inner
            }

            // Private constructor: wraps the map in the helper message and encodes it.
            fn from_protobuf(val: BTreeMap<$key_type, $value_type>) -> Self {
                Self(prost::Message::encode_to_vec(&$field_type { inner: val }))
            }
        }

        impl From<BTreeMap<$key_type, $value_type>> for $struct_name {
            fn from(value: BTreeMap<$key_type, $value_type>) -> Self {
                Self::from_protobuf(value)
            }
        }

        // `Debug` prints the decoded map rather than the raw bytes.
        impl std::fmt::Debug for $struct_name {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                self.to_protobuf().fmt(f)
            }
        }

        // The default is an empty byte vector, built directly instead of via `from_protobuf`.
        // NOTE(review): presumably this decodes to an empty map — confirm against prost semantics.
        impl Default for $struct_name {
            fn default() -> Self {
                Self(vec![])
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }
    };
}
335
336pub(crate) use {derive_array_from_blob, derive_from_blob};
337
338derive_from_json_struct!(I32Array, Vec<i32>);
339
340impl From<Vec<u32>> for I32Array {
341 fn from(value: Vec<u32>) -> Self {
342 Self(value.into_iter().map(|id| id as _).collect())
343 }
344}
345
346impl I32Array {
347 pub fn into_u32_array(self) -> Vec<u32> {
348 self.0.into_iter().map(|id| id as _).collect()
349 }
350}
351
352derive_from_json_struct!(ActorUpstreamActors, BTreeMap<FragmentId, Vec<ActorId>>);
353
354impl From<BTreeMap<u32, Vec<u32>>> for ActorUpstreamActors {
355 fn from(val: BTreeMap<u32, Vec<u32>>) -> Self {
356 let mut map = BTreeMap::new();
357 for (k, v) in val {
358 map.insert(k as _, v.into_iter().map(|a| a as _).collect());
359 }
360 Self(map)
361 }
362}
363
// `SecretRef` stores an encoded `BTreeMap<String, PbSecretRef>`; `PbSecretRefMap`
// is the helper prost message generated alongside it.
derive_btreemap_from_blob!(SecretRef, String, PbSecretRef, PbSecretRefMap);

// Wrapper types generated by the macros defined in this file:
// - `derive_from_blob!` types store one encoded protobuf message;
// - `derive_array_from_blob!` types store a list of messages, encoded through the
//   helper message named by the third argument;
// - `derive_from_json_struct!` types wrap a plain value and derive `FromJsonQueryResult`.
derive_from_blob!(StreamNode, PbStreamNode);
derive_from_blob!(DataType, risingwave_pb::data::PbDataType);
derive_array_from_blob!(
    DataTypeArray,
    risingwave_pb::data::PbDataType,
    PbDataTypeArray
);
derive_array_from_blob!(
    FieldArray,
    risingwave_pb::plan_common::PbField,
    PbFieldArray
);
// `Property` is the only JSON-backed wrapper in this group.
derive_from_json_struct!(Property, BTreeMap<String, String>);
derive_from_blob!(ColumnCatalog, risingwave_pb::plan_common::PbColumnCatalog);
derive_array_from_blob!(
    ColumnCatalogArray,
    risingwave_pb::plan_common::PbColumnCatalog,
    PbColumnCatalogArray
);
derive_from_blob!(StreamSourceInfo, risingwave_pb::catalog::PbStreamSourceInfo);
derive_from_blob!(
    WebhookSourceInfo,
    risingwave_pb::catalog::PbWebhookSourceInfo
);
derive_from_blob!(WatermarkDesc, risingwave_pb::catalog::PbWatermarkDesc);
derive_array_from_blob!(
    WatermarkDescArray,
    risingwave_pb::catalog::PbWatermarkDesc,
    PbWatermarkDescArray
);
derive_array_from_blob!(
    ExprNodeArray,
    risingwave_pb::expr::PbExprNode,
    PbExprNodeArray
);
derive_array_from_blob!(
    ColumnOrderArray,
    risingwave_pb::common::PbColumnOrder,
    PbColumnOrderArray
);
derive_array_from_blob!(
    IndexColumnPropertiesArray,
    risingwave_pb::catalog::PbIndexColumnProperties,
    PbIndexColumnPropertiesArray
);
derive_from_blob!(SinkFormatDesc, risingwave_pb::catalog::PbSinkFormatDesc);
derive_from_blob!(Cardinality, risingwave_pb::plan_common::PbCardinality);
derive_from_blob!(TableVersion, risingwave_pb::catalog::table::PbTableVersion);
derive_from_blob!(
    PrivateLinkService,
    risingwave_pb::catalog::connection::PbPrivateLinkService
);
derive_from_blob!(ConnectionParams, risingwave_pb::catalog::ConnectionParams);
derive_from_blob!(AuthInfo, risingwave_pb::user::PbAuthInfo);

derive_from_blob!(ConnectorSplits, risingwave_pb::source::ConnectorSplits);
derive_from_blob!(VnodeBitmap, risingwave_pb::common::Buffer);
derive_from_blob!(ActorMapping, risingwave_pb::stream_plan::PbActorMapping);
derive_from_blob!(ExprContext, risingwave_pb::plan_common::PbExprContext);

derive_array_from_blob!(
    TypePairArray,
    risingwave_pb::stream_plan::dispatch_output_mapping::TypePair,
    PbTypePairArray
);

derive_array_from_blob!(
    HummockVersionDeltaArray,
    risingwave_pb::hummock::PbHummockVersionDelta,
    PbHummockVersionDeltaArray
);
437
/// Parallelism setting with three forms: `Adaptive`, `Fixed` (carrying a
/// `usize`) and `Custom`.
///
/// Derives `FromJsonQueryResult` and the serde `Serialize` / `Deserialize`
/// traits.
#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize)]
pub enum StreamingParallelism {
    Adaptive,
    // NOTE(review): the payload is presumably the fixed degree of parallelism — confirm with callers.
    Fixed(usize),
    Custom,
}

// `Eq` is implemented by hand instead of being listed in the derive above.
impl Eq for StreamingParallelism {}
446
/// Dispatcher type, mapped by sea-orm to a string database value.
///
/// Each variant is stored as the upper-case string given in its
/// `string_value` attribute. Conversions to and from `PbDispatcherType` are
/// provided by the `From` impls in this file.
#[derive(
    Hash, Copy, Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum DispatcherType {
    /// Stored as `"HASH"`.
    #[sea_orm(string_value = "HASH")]
    Hash,
    /// Stored as `"BROADCAST"`.
    #[sea_orm(string_value = "BROADCAST")]
    Broadcast,
    /// Stored as `"SIMPLE"`.
    #[sea_orm(string_value = "SIMPLE")]
    Simple,
    /// Stored as `"NO_SHUFFLE"`.
    #[sea_orm(string_value = "NO_SHUFFLE")]
    NoShuffle,
}
461
462impl From<PbDispatcherType> for DispatcherType {
463 fn from(val: PbDispatcherType) -> Self {
464 match val {
465 PbDispatcherType::Unspecified => unreachable!(),
466 PbDispatcherType::Hash => DispatcherType::Hash,
467 PbDispatcherType::Broadcast => DispatcherType::Broadcast,
468 PbDispatcherType::Simple => DispatcherType::Simple,
469 PbDispatcherType::NoShuffle => DispatcherType::NoShuffle,
470 }
471 }
472}
473
474impl From<DispatcherType> for PbDispatcherType {
475 fn from(val: DispatcherType) -> Self {
476 match val {
477 DispatcherType::Hash => PbDispatcherType::Hash,
478 DispatcherType::Broadcast => PbDispatcherType::Broadcast,
479 DispatcherType::Simple => PbDispatcherType::Simple,
480 DispatcherType::NoShuffle => PbDispatcherType::NoShuffle,
481 }
482 }
483}