1use std::collections::BTreeMap;
16
17use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus};
18use risingwave_pb::meta::table_fragments::PbState as PbStreamJobState;
19use risingwave_pb::secret::PbSecretRef;
20use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamNode};
21use sea_orm::entity::prelude::*;
22use sea_orm::{DeriveActiveEnum, EnumIter, FromJsonQueryResult};
23use serde::{Deserialize, Serialize};
24
25pub mod prelude;
26
27pub mod actor;
28pub mod catalog_version;
29pub mod cluster;
30pub mod compaction_config;
31pub mod compaction_status;
32pub mod compaction_task;
33pub mod connection;
34pub mod database;
35pub mod exactly_once_iceberg_sink;
36pub mod fragment;
37pub mod fragment_relation;
38pub mod function;
39pub mod hummock_epoch_to_version;
40pub mod hummock_gc_history;
41pub mod hummock_pinned_snapshot;
42pub mod hummock_pinned_version;
43pub mod hummock_sequence;
44pub mod hummock_sstable_info;
45pub mod hummock_time_travel_delta;
46pub mod hummock_time_travel_version;
47pub mod hummock_version_delta;
48pub mod hummock_version_stats;
49pub mod iceberg_namespace_properties;
50pub mod iceberg_tables;
51pub mod index;
52pub mod object;
53pub mod object_dependency;
54pub mod schema;
55pub mod secret;
56pub mod serde_seaql_migration;
57pub mod session_parameter;
58pub mod sink;
59pub mod source;
60pub mod streaming_job;
61pub mod subscription;
62pub mod system_parameter;
63pub mod table;
64pub mod user;
65pub mod user_default_privilege;
66pub mod user_privilege;
67pub mod view;
68pub mod worker;
69pub mod worker_property;
70
// Identifier aliases backed by `i32`.
pub type WorkerId = i32;

pub type TransactionId = i32;

/// Base alias that every per-kind object id alias below resolves to.
pub type ObjectId = i32;
pub type DatabaseId = ObjectId;
pub type SchemaId = ObjectId;
pub type TableId = ObjectId;
pub type SourceId = ObjectId;
pub type SinkId = ObjectId;
pub type SubscriptionId = ObjectId;
pub type IndexId = ObjectId;
pub type ViewId = ObjectId;
pub type FunctionId = ObjectId;
pub type ConnectionId = ObjectId;
pub type SecretId = ObjectId;
pub type UserId = i32;
pub type PrivilegeId = i32;
pub type DefaultPrivilegeId = i32;

// Aliases backed by `i64`.
pub type HummockVersionId = i64;
pub type Epoch = i64;
pub type CompactionGroupId = i64;
pub type CompactionTaskId = i64;
pub type HummockSstableObjectId = i64;

// Fragment and actor identifiers, backed by `i32`.
pub type FragmentId = i32;
pub type ActorId = i32;
99
100#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
101#[sea_orm(rs_type = "String", db_type = "string(None)")]
102pub enum JobStatus {
103 #[sea_orm(string_value = "INITIAL")]
104 Initial,
105 #[sea_orm(string_value = "CREATING")]
106 Creating,
107 #[sea_orm(string_value = "CREATED")]
108 Created,
109}
110
111impl From<JobStatus> for PbStreamJobStatus {
112 fn from(job_status: JobStatus) -> Self {
113 match job_status {
114 JobStatus::Initial => Self::Unspecified,
115 JobStatus::Creating => Self::Creating,
116 JobStatus::Created => Self::Created,
117 }
118 }
119}
120
121impl From<JobStatus> for PbStreamJobState {
123 fn from(status: JobStatus) -> Self {
124 match status {
125 JobStatus::Initial => PbStreamJobState::Initial,
126 JobStatus::Creating => PbStreamJobState::Creating,
127 JobStatus::Created => PbStreamJobState::Created,
128 }
129 }
130}
131
132#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
133#[sea_orm(rs_type = "String", db_type = "string(None)")]
134pub enum CreateType {
135 #[sea_orm(string_value = "BACKGROUND")]
136 Background,
137 #[sea_orm(string_value = "FOREGROUND")]
138 Foreground,
139}
140
141impl From<CreateType> for PbCreateType {
142 fn from(create_type: CreateType) -> Self {
143 match create_type {
144 CreateType::Background => Self::Background,
145 CreateType::Foreground => Self::Foreground,
146 }
147 }
148}
149
150impl From<PbCreateType> for CreateType {
151 fn from(create_type: PbCreateType) -> Self {
152 match create_type {
153 PbCreateType::Background => Self::Background,
154 PbCreateType::Foreground => Self::Foreground,
155 PbCreateType::Unspecified => unreachable!("Unspecified create type"),
156 }
157 }
158}
159
/// Declares a public tuple struct `$struct_name` wrapping one public `$field_type` value.
///
/// The generated struct derives `FromJsonQueryResult` alongside the serde traits,
/// is marked `Eq`, converts from `$field_type` via `From`, and exposes the
/// `into_inner` / `inner_ref` accessors.
macro_rules! derive_from_json_struct {
    ($struct_name:ident, $field_type:ty) => {
        #[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)]
        pub struct $struct_name(pub $field_type);

        impl Eq for $struct_name {}

        impl From<$field_type> for $struct_name {
            fn from(inner: $field_type) -> Self {
                Self(inner)
            }
        }

        impl $struct_name {
            /// Consumes the wrapper and returns the wrapped value.
            pub fn into_inner(self) -> $field_type {
                let Self(inner) = self;
                inner
            }

            /// Borrows the wrapped value.
            pub fn inner_ref(&self) -> &$field_type {
                let Self(inner) = self;
                inner
            }
        }
    };
}
183
/// Declares a tuple struct `$struct_name` holding the prost-encoded bytes of a
/// `$field_type` message.
///
/// The generated struct derives `DeriveValueType` (its single field is a `Vec<u8>`),
/// converts from `&$field_type` by encoding it, and formats with `Debug` by decoding
/// the bytes first.
macro_rules! derive_from_blob {
    ($struct_name:ident, $field_type:ty) => {
        #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, DeriveValueType)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        impl $struct_name {
            /// Decodes the stored bytes into a `$field_type`.
            ///
            /// Panics if the bytes cannot be decoded.
            pub fn to_protobuf(&self) -> $field_type {
                <$field_type as prost::Message>::decode(self.0.as_slice()).unwrap()
            }

            /// Encodes `val` into a new wrapper.
            fn from_protobuf(val: &$field_type) -> Self {
                let bytes = <$field_type as prost::Message>::encode_to_vec(val);
                Self(bytes)
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }

        impl From<&$field_type> for $struct_name {
            fn from(message: &$field_type) -> Self {
                Self::from_protobuf(message)
            }
        }

        impl std::fmt::Debug for $struct_name {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                // Show the decoded message rather than the raw bytes.
                std::fmt::Debug::fmt(&self.to_protobuf(), f)
            }
        }

        impl Default for $struct_name {
            fn default() -> Self {
                let empty = <$field_type as Default>::default();
                Self::from_protobuf(&empty)
            }
        }
    };
}
225
/// Declares a tuple struct `$struct_name` holding the prost-encoded bytes of a list
/// of `$field_type` messages.
///
/// A helper prost message `$field_array_name` with one repeated field (tag 1) is
/// generated as well and used as the on-the-wire container for the list.
macro_rules! derive_array_from_blob {
    ($struct_name:ident, $field_type:ty, $field_array_name:ident) => {
        #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        /// Container message whose only field is the repeated `inner` list.
        #[derive(Clone, PartialEq, ::prost::Message)]
        pub struct $field_array_name {
            #[prost(message, repeated, tag = "1")]
            inner: Vec<$field_type>,
        }

        impl Eq for $field_array_name {}

        impl $struct_name {
            /// Decodes the stored bytes and returns the contained list.
            ///
            /// Panics if the bytes cannot be decoded.
            pub fn to_protobuf(&self) -> Vec<$field_type> {
                <$field_array_name as prost::Message>::decode(self.0.as_slice())
                    .unwrap()
                    .inner
            }

            /// Wraps `val` in the container message and stores its encoding.
            fn from_protobuf(val: Vec<$field_type>) -> Self {
                let container = $field_array_name { inner: val };
                Self(prost::Message::encode_to_vec(&container))
            }
        }

        impl From<Vec<$field_type>> for $struct_name {
            fn from(items: Vec<$field_type>) -> Self {
                Self::from_protobuf(items)
            }
        }

        impl std::fmt::Debug for $struct_name {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                // Show the decoded list rather than the raw bytes.
                std::fmt::Debug::fmt(&self.to_protobuf(), f)
            }
        }

        impl Default for $struct_name {
            fn default() -> Self {
                // The default holds no bytes at all.
                Self(Vec::new())
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }
    };
}
277
/// Declares a tuple struct `$struct_name` holding the prost-encoded bytes of a
/// `BTreeMap<$key_type, $value_type>`.
///
/// A helper prost message `$field_type` with one `btree_map` field is generated as
/// well and used as the on-the-wire container for the map.
macro_rules! derive_btreemap_from_blob {
    ($struct_name:ident, $key_type:ty, $value_type:ty, $field_type:ident) => {
        #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        /// Container message whose only field is the `inner` map.
        #[derive(Clone, PartialEq, ::prost::Message)]
        pub struct $field_type {
            #[prost(btree_map = "string, message")]
            inner: BTreeMap<$key_type, $value_type>,
        }

        impl Eq for $field_type {}

        impl $struct_name {
            /// Decodes the stored bytes and returns the contained map.
            ///
            /// Panics if the bytes cannot be decoded.
            pub fn to_protobuf(&self) -> BTreeMap<$key_type, $value_type> {
                <$field_type as prost::Message>::decode(self.0.as_slice())
                    .unwrap()
                    .inner
            }

            /// Wraps `val` in the container message and stores its encoding.
            fn from_protobuf(val: BTreeMap<$key_type, $value_type>) -> Self {
                let container = $field_type { inner: val };
                Self(prost::Message::encode_to_vec(&container))
            }
        }

        impl From<BTreeMap<$key_type, $value_type>> for $struct_name {
            fn from(entries: BTreeMap<$key_type, $value_type>) -> Self {
                Self::from_protobuf(entries)
            }
        }

        impl std::fmt::Debug for $struct_name {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                // Show the decoded map rather than the raw bytes.
                std::fmt::Debug::fmt(&self.to_protobuf(), f)
            }
        }

        impl Default for $struct_name {
            fn default() -> Self {
                // The default holds no bytes at all.
                Self(Vec::new())
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }
    };
}
326
327pub(crate) use {derive_array_from_blob, derive_from_blob};
328
329derive_from_json_struct!(I32Array, Vec<i32>);
330
331impl From<Vec<u32>> for I32Array {
332 fn from(value: Vec<u32>) -> Self {
333 Self(value.into_iter().map(|id| id as _).collect())
334 }
335}
336
337impl I32Array {
338 pub fn into_u32_array(self) -> Vec<u32> {
339 self.0.into_iter().map(|id| id as _).collect()
340 }
341}
342
343derive_from_json_struct!(ActorUpstreamActors, BTreeMap<FragmentId, Vec<ActorId>>);
344
345impl From<BTreeMap<u32, Vec<u32>>> for ActorUpstreamActors {
346 fn from(val: BTreeMap<u32, Vec<u32>>) -> Self {
347 let mut map = BTreeMap::new();
348 for (k, v) in val {
349 map.insert(k as _, v.into_iter().map(|a| a as _).collect());
350 }
351 Self(map)
352 }
353}
354
355derive_btreemap_from_blob!(SecretRef, String, PbSecretRef, PbSecretRefMap);
356
357derive_from_blob!(StreamNode, PbStreamNode);
358derive_from_blob!(DataType, risingwave_pb::data::PbDataType);
359derive_array_from_blob!(
360 DataTypeArray,
361 risingwave_pb::data::PbDataType,
362 PbDataTypeArray
363);
364derive_array_from_blob!(
365 FieldArray,
366 risingwave_pb::plan_common::PbField,
367 PbFieldArray
368);
369derive_from_json_struct!(Property, BTreeMap<String, String>);
370derive_from_blob!(ColumnCatalog, risingwave_pb::plan_common::PbColumnCatalog);
371derive_array_from_blob!(
372 ColumnCatalogArray,
373 risingwave_pb::plan_common::PbColumnCatalog,
374 PbColumnCatalogArray
375);
376derive_from_blob!(StreamSourceInfo, risingwave_pb::catalog::PbStreamSourceInfo);
377derive_from_blob!(
378 WebhookSourceInfo,
379 risingwave_pb::catalog::PbWebhookSourceInfo
380);
381derive_from_blob!(WatermarkDesc, risingwave_pb::catalog::PbWatermarkDesc);
382derive_array_from_blob!(
383 WatermarkDescArray,
384 risingwave_pb::catalog::PbWatermarkDesc,
385 PbWatermarkDescArray
386);
387derive_array_from_blob!(
388 ExprNodeArray,
389 risingwave_pb::expr::PbExprNode,
390 PbExprNodeArray
391);
392derive_array_from_blob!(
393 ColumnOrderArray,
394 risingwave_pb::common::PbColumnOrder,
395 PbColumnOrderArray
396);
397derive_array_from_blob!(
398 IndexColumnPropertiesArray,
399 risingwave_pb::catalog::PbIndexColumnProperties,
400 PbIndexColumnPropertiesArray
401);
402derive_from_blob!(SinkFormatDesc, risingwave_pb::catalog::PbSinkFormatDesc);
403derive_from_blob!(Cardinality, risingwave_pb::plan_common::PbCardinality);
404derive_from_blob!(TableVersion, risingwave_pb::catalog::table::PbTableVersion);
405derive_from_blob!(
406 PrivateLinkService,
407 risingwave_pb::catalog::connection::PbPrivateLinkService
408);
409derive_from_blob!(ConnectionParams, risingwave_pb::catalog::ConnectionParams);
410derive_from_blob!(AuthInfo, risingwave_pb::user::PbAuthInfo);
411
412derive_from_blob!(ConnectorSplits, risingwave_pb::source::ConnectorSplits);
413derive_from_blob!(VnodeBitmap, risingwave_pb::common::Buffer);
414derive_from_blob!(ActorMapping, risingwave_pb::stream_plan::PbActorMapping);
415derive_from_blob!(ExprContext, risingwave_pb::plan_common::PbExprContext);
416
417derive_array_from_blob!(
418 TypePairArray,
419 risingwave_pb::stream_plan::dispatch_output_mapping::TypePair,
420 PbTypePairArray
421);
422
423derive_array_from_blob!(
424 HummockVersionDeltaArray,
425 risingwave_pb::hummock::PbHummockVersionDelta,
426 PbHummockVersionDeltaArray
427);
428
429#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize)]
430pub enum StreamingParallelism {
431 Adaptive,
432 Fixed(usize),
433 Custom,
434}
435
436impl Eq for StreamingParallelism {}
437
438#[derive(
439 Hash, Copy, Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
440)]
441#[sea_orm(rs_type = "String", db_type = "string(None)")]
442pub enum DispatcherType {
443 #[sea_orm(string_value = "HASH")]
444 Hash,
445 #[sea_orm(string_value = "BROADCAST")]
446 Broadcast,
447 #[sea_orm(string_value = "SIMPLE")]
448 Simple,
449 #[sea_orm(string_value = "NO_SHUFFLE")]
450 NoShuffle,
451}
452
453impl From<PbDispatcherType> for DispatcherType {
454 fn from(val: PbDispatcherType) -> Self {
455 match val {
456 PbDispatcherType::Unspecified => unreachable!(),
457 PbDispatcherType::Hash => DispatcherType::Hash,
458 PbDispatcherType::Broadcast => DispatcherType::Broadcast,
459 PbDispatcherType::Simple => DispatcherType::Simple,
460 PbDispatcherType::NoShuffle => DispatcherType::NoShuffle,
461 }
462 }
463}
464
465impl From<DispatcherType> for PbDispatcherType {
466 fn from(val: DispatcherType) -> Self {
467 match val {
468 DispatcherType::Hash => PbDispatcherType::Hash,
469 DispatcherType::Broadcast => PbDispatcherType::Broadcast,
470 DispatcherType::Simple => PbDispatcherType::Simple,
471 DispatcherType::NoShuffle => PbDispatcherType::NoShuffle,
472 }
473 }
474}