1use std::collections::BTreeMap;
16
17use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus};
18use risingwave_pb::meta::table_fragments::PbState as PbStreamJobState;
19use risingwave_pb::secret::PbSecretRef;
20use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamNode};
21use sea_orm::entity::prelude::*;
22use sea_orm::{DeriveActiveEnum, EnumIter, FromJsonQueryResult};
23use serde::{Deserialize, Serialize};
24
25pub mod prelude;
26
27pub mod actor;
28pub mod catalog_version;
29pub mod cluster;
30pub mod compaction_config;
31pub mod compaction_status;
32pub mod compaction_task;
33pub mod connection;
34pub mod database;
35pub mod exactly_once_iceberg_sink;
36pub mod fragment;
37pub mod fragment_relation;
38pub mod function;
39pub mod hummock_epoch_to_version;
40pub mod hummock_gc_history;
41pub mod hummock_pinned_snapshot;
42pub mod hummock_pinned_version;
43pub mod hummock_sequence;
44pub mod hummock_sstable_info;
45pub mod hummock_time_travel_delta;
46pub mod hummock_time_travel_version;
47pub mod hummock_version_delta;
48pub mod hummock_version_stats;
49pub mod iceberg_namespace_properties;
50pub mod iceberg_tables;
51pub mod index;
52pub mod object;
53pub mod object_dependency;
54pub mod schema;
55pub mod secret;
56pub mod serde_seaql_migration;
57pub mod session_parameter;
58pub mod sink;
59pub mod source;
60pub mod streaming_job;
61pub mod subscription;
62pub mod system_parameter;
63pub mod table;
64pub mod user;
65pub mod user_privilege;
66pub mod view;
67pub mod worker;
68pub mod worker_property;
69
/// Identifier type for workers.
pub type WorkerId = i32;

/// Identifier type for transactions.
pub type TransactionId = i32;

/// Base identifier type; every object-kind alias in the group below resolves to it.
pub type ObjectId = i32;
pub type DatabaseId = ObjectId;
pub type SchemaId = ObjectId;
pub type TableId = ObjectId;
pub type SourceId = ObjectId;
pub type SinkId = ObjectId;
pub type SubscriptionId = ObjectId;
pub type IndexId = ObjectId;
pub type ViewId = ObjectId;
pub type FunctionId = ObjectId;
pub type ConnectionId = ObjectId;
pub type SecretId = ObjectId;
// User and privilege identifiers are plain `i32`s, not `ObjectId` aliases.
pub type UserId = i32;
pub type PrivilegeId = i32;

// Hummock-related identifiers (and epochs) are 64-bit.
pub type HummockVersionId = i64;
pub type Epoch = i64;
pub type CompactionGroupId = i64;
pub type CompactionTaskId = i64;
pub type HummockSstableObjectId = i64;

// Fragment and actor identifiers are 32-bit.
pub type FragmentId = i32;
pub type ActorId = i32;
97
98#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
99#[sea_orm(rs_type = "String", db_type = "string(None)")]
100pub enum JobStatus {
101 #[sea_orm(string_value = "INITIAL")]
102 Initial,
103 #[sea_orm(string_value = "CREATING")]
104 Creating,
105 #[sea_orm(string_value = "CREATED")]
106 Created,
107}
108
109impl From<JobStatus> for PbStreamJobStatus {
110 fn from(job_status: JobStatus) -> Self {
111 match job_status {
112 JobStatus::Initial => Self::Unspecified,
113 JobStatus::Creating => Self::Creating,
114 JobStatus::Created => Self::Created,
115 }
116 }
117}
118
119impl From<JobStatus> for PbStreamJobState {
121 fn from(status: JobStatus) -> Self {
122 match status {
123 JobStatus::Initial => PbStreamJobState::Initial,
124 JobStatus::Creating => PbStreamJobState::Creating,
125 JobStatus::Created => PbStreamJobState::Created,
126 }
127 }
128}
129
130#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
131#[sea_orm(rs_type = "String", db_type = "string(None)")]
132pub enum CreateType {
133 #[sea_orm(string_value = "BACKGROUND")]
134 Background,
135 #[sea_orm(string_value = "FOREGROUND")]
136 Foreground,
137}
138
139impl From<CreateType> for PbCreateType {
140 fn from(create_type: CreateType) -> Self {
141 match create_type {
142 CreateType::Background => Self::Background,
143 CreateType::Foreground => Self::Foreground,
144 }
145 }
146}
147
148impl From<PbCreateType> for CreateType {
149 fn from(create_type: PbCreateType) -> Self {
150 match create_type {
151 PbCreateType::Background => Self::Background,
152 PbCreateType::Foreground => Self::Foreground,
153 PbCreateType::Unspecified => unreachable!("Unspecified create type"),
154 }
155 }
156}
157
/// Declares `$wrapper`, a public tuple struct around a single `$inner` value that derives
/// `FromJsonQueryResult` together with the serde traits, plus conversions to and from the
/// wrapped value.
macro_rules! derive_from_json_struct {
    ($wrapper:ident, $inner:ty) => {
        #[derive(Clone, Debug, Default, PartialEq, FromJsonQueryResult, Serialize, Deserialize)]
        pub struct $wrapper(pub $inner);

        // `Eq` is implemented by hand instead of being derived.
        impl Eq for $wrapper {}

        impl $wrapper {
            /// Borrows the wrapped value.
            pub fn inner_ref(&self) -> &$inner {
                &self.0
            }

            /// Consumes the wrapper and returns the wrapped value.
            pub fn into_inner(self) -> $inner {
                self.0
            }
        }

        impl From<$inner> for $wrapper {
            fn from(value: $inner) -> Self {
                Self(value)
            }
        }
    };
}
181
/// Declares `$struct_name`, a newtype around the encoded bytes of one `$field_type`
/// protobuf message.
///
/// The bytes are produced with `prost::Message::encode_to_vec` and decoded on demand by
/// `to_protobuf`. The generated type is nullable (`Value::Bytes(None)`), can be built from
/// `&$field_type`, and defaults to the encoding of `$field_type::default()`.
macro_rules! derive_from_blob {
    ($struct_name:ident, $field_type:ty) => {
        #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, DeriveValueType)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        impl $struct_name {
            /// Decodes the stored bytes back into the protobuf message.
            ///
            /// # Panics
            ///
            /// Panics if the stored bytes are not a valid encoding of the message.
            pub fn to_protobuf(&self) -> $field_type {
                prost::Message::decode(self.0.as_slice()).unwrap()
            }

            /// Encodes `val` and wraps the resulting bytes.
            fn from_protobuf(val: &$field_type) -> Self {
                Self(prost::Message::encode_to_vec(val))
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }

        impl From<&$field_type> for $struct_name {
            fn from(value: &$field_type) -> Self {
                Self::from_protobuf(value)
            }
        }

        impl std::fmt::Debug for $struct_name {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                // Decode fallibly here: formatting a value (e.g. for a log line) must not
                // panic just because the stored bytes are malformed.
                match <$field_type as prost::Message>::decode(self.0.as_slice()) {
                    Ok(message) => std::fmt::Debug::fmt(&message, f),
                    Err(error) => f
                        .debug_struct(stringify!($struct_name))
                        .field("decode_error", &error)
                        .field("encoded_len", &self.0.len())
                        .finish(),
                }
            }
        }

        impl Default for $struct_name {
            fn default() -> Self {
                Self::from_protobuf(&<$field_type>::default())
            }
        }
    };
}
223
/// Declares `$struct_name`, a newtype around the encoded bytes of a list of `$field_type`
/// protobuf messages.
///
/// It also declares `$field_array_name`, the helper message used for encoding: a single
/// repeated field (tag 1) holding the list. The generated type is nullable
/// (`Value::Bytes(None)`), can be built from `Vec<$field_type>`, and defaults to an empty
/// byte buffer.
macro_rules! derive_array_from_blob {
    ($struct_name:ident, $field_type:ty, $field_array_name:ident) => {
        #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        #[derive(Clone, PartialEq, ::prost::Message)]
        pub struct $field_array_name {
            #[prost(message, repeated, tag = "1")]
            inner: Vec<$field_type>,
        }
        impl Eq for $field_array_name {}

        impl $struct_name {
            /// Decodes the stored bytes back into the list of messages.
            ///
            /// # Panics
            ///
            /// Panics if the stored bytes are not a valid encoding of the helper message.
            pub fn to_protobuf(&self) -> Vec<$field_type> {
                let data: $field_array_name = prost::Message::decode(self.0.as_slice()).unwrap();
                data.inner
            }

            /// Wraps `val` in the helper message and stores its encoding.
            fn from_protobuf(val: Vec<$field_type>) -> Self {
                Self(prost::Message::encode_to_vec(&$field_array_name {
                    inner: val,
                }))
            }
        }

        impl From<Vec<$field_type>> for $struct_name {
            fn from(value: Vec<$field_type>) -> Self {
                Self::from_protobuf(value)
            }
        }

        impl std::fmt::Debug for $struct_name {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                // Decode fallibly here: formatting a value (e.g. for a log line) must not
                // panic just because the stored bytes are malformed.
                match <$field_array_name as prost::Message>::decode(self.0.as_slice()) {
                    Ok(decoded) => std::fmt::Debug::fmt(&decoded.inner, f),
                    Err(error) => f
                        .debug_struct(stringify!($struct_name))
                        .field("decode_error", &error)
                        .field("encoded_len", &self.0.len())
                        .finish(),
                }
            }
        }

        impl Default for $struct_name {
            fn default() -> Self {
                // An empty buffer decodes as the helper message with no elements.
                Self(vec![])
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }
    };
}
275
/// Declares `$struct_name`, a newtype around the encoded bytes of a
/// `BTreeMap<$key_type, $value_type>`.
///
/// It also declares `$field_type`, the helper message used for encoding: a single map field
/// declared to prost as `string -> message`, so `$key_type` is expected to be `String` and
/// `$value_type` a prost message. The generated type is nullable (`Value::Bytes(None)`),
/// can be built from the map, and defaults to an empty byte buffer.
macro_rules! derive_btreemap_from_blob {
    ($struct_name:ident, $key_type:ty, $value_type:ty, $field_type:ident) => {
        #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
        pub struct $struct_name(#[sea_orm] Vec<u8>);

        #[derive(Clone, PartialEq, ::prost::Message)]
        pub struct $field_type {
            // The tag is pinned explicitly (1 is what prost infers for the first field) so
            // the stored encoding does not depend on tag inference.
            #[prost(btree_map = "string, message", tag = "1")]
            inner: BTreeMap<$key_type, $value_type>,
        }
        impl Eq for $field_type {}

        impl $struct_name {
            /// Decodes the stored bytes back into the map.
            ///
            /// # Panics
            ///
            /// Panics if the stored bytes are not a valid encoding of the helper message.
            pub fn to_protobuf(&self) -> BTreeMap<$key_type, $value_type> {
                let data: $field_type = prost::Message::decode(self.0.as_slice()).unwrap();
                data.inner
            }

            /// Wraps `val` in the helper message and stores its encoding.
            fn from_protobuf(val: BTreeMap<$key_type, $value_type>) -> Self {
                Self(prost::Message::encode_to_vec(&$field_type { inner: val }))
            }
        }

        impl From<BTreeMap<$key_type, $value_type>> for $struct_name {
            fn from(value: BTreeMap<$key_type, $value_type>) -> Self {
                Self::from_protobuf(value)
            }
        }

        impl std::fmt::Debug for $struct_name {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                // Decode fallibly here: formatting a value (e.g. for a log line) must not
                // panic just because the stored bytes are malformed.
                match <$field_type as prost::Message>::decode(self.0.as_slice()) {
                    Ok(decoded) => std::fmt::Debug::fmt(&decoded.inner, f),
                    Err(error) => f
                        .debug_struct(stringify!($struct_name))
                        .field("decode_error", &error)
                        .field("encoded_len", &self.0.len())
                        .finish(),
                }
            }
        }

        impl Default for $struct_name {
            fn default() -> Self {
                // An empty buffer decodes as the helper message with no entries.
                Self(vec![])
            }
        }

        impl sea_orm::sea_query::Nullable for $struct_name {
            fn null() -> Value {
                Value::Bytes(None)
            }
        }
    };
}
324
325pub(crate) use {derive_array_from_blob, derive_from_blob};
326
327derive_from_json_struct!(I32Array, Vec<i32>);
328
329impl From<Vec<u32>> for I32Array {
330 fn from(value: Vec<u32>) -> Self {
331 Self(value.into_iter().map(|id| id as _).collect())
332 }
333}
334
335impl I32Array {
336 pub fn into_u32_array(self) -> Vec<u32> {
337 self.0.into_iter().map(|id| id as _).collect()
338 }
339}
340
341derive_from_json_struct!(ActorUpstreamActors, BTreeMap<FragmentId, Vec<ActorId>>);
342
343impl From<BTreeMap<u32, Vec<u32>>> for ActorUpstreamActors {
344 fn from(val: BTreeMap<u32, Vec<u32>>) -> Self {
345 let mut map = BTreeMap::new();
346 for (k, v) in val {
347 map.insert(k as _, v.into_iter().map(|a| a as _).collect());
348 }
349 Self(map)
350 }
351}
352
353derive_btreemap_from_blob!(SecretRef, String, PbSecretRef, PbSecretRefMap);
354
355derive_from_blob!(StreamNode, PbStreamNode);
356derive_from_blob!(DataType, risingwave_pb::data::PbDataType);
357derive_array_from_blob!(
358 DataTypeArray,
359 risingwave_pb::data::PbDataType,
360 PbDataTypeArray
361);
362derive_array_from_blob!(
363 FieldArray,
364 risingwave_pb::plan_common::PbField,
365 PbFieldArray
366);
367derive_from_json_struct!(Property, BTreeMap<String, String>);
368derive_from_blob!(ColumnCatalog, risingwave_pb::plan_common::PbColumnCatalog);
369derive_array_from_blob!(
370 ColumnCatalogArray,
371 risingwave_pb::plan_common::PbColumnCatalog,
372 PbColumnCatalogArray
373);
374derive_from_blob!(StreamSourceInfo, risingwave_pb::catalog::PbStreamSourceInfo);
375derive_from_blob!(
376 WebhookSourceInfo,
377 risingwave_pb::catalog::PbWebhookSourceInfo
378);
379derive_from_blob!(WatermarkDesc, risingwave_pb::catalog::PbWatermarkDesc);
380derive_array_from_blob!(
381 WatermarkDescArray,
382 risingwave_pb::catalog::PbWatermarkDesc,
383 PbWatermarkDescArray
384);
385derive_array_from_blob!(
386 ExprNodeArray,
387 risingwave_pb::expr::PbExprNode,
388 PbExprNodeArray
389);
390derive_array_from_blob!(
391 ColumnOrderArray,
392 risingwave_pb::common::PbColumnOrder,
393 PbColumnOrderArray
394);
395derive_array_from_blob!(
396 IndexColumnPropertiesArray,
397 risingwave_pb::catalog::PbIndexColumnProperties,
398 PbIndexColumnPropertiesArray
399);
400derive_from_blob!(SinkFormatDesc, risingwave_pb::catalog::PbSinkFormatDesc);
401derive_from_blob!(Cardinality, risingwave_pb::plan_common::PbCardinality);
402derive_from_blob!(TableVersion, risingwave_pb::catalog::table::PbTableVersion);
403derive_from_blob!(
404 PrivateLinkService,
405 risingwave_pb::catalog::connection::PbPrivateLinkService
406);
407derive_from_blob!(ConnectionParams, risingwave_pb::catalog::ConnectionParams);
408derive_from_blob!(AuthInfo, risingwave_pb::user::PbAuthInfo);
409
410derive_from_blob!(ConnectorSplits, risingwave_pb::source::ConnectorSplits);
411derive_from_blob!(VnodeBitmap, risingwave_pb::common::Buffer);
412derive_from_blob!(ActorMapping, risingwave_pb::stream_plan::PbActorMapping);
413derive_from_blob!(ExprContext, risingwave_pb::plan_common::PbExprContext);
414
415derive_array_from_blob!(
416 HummockVersionDeltaArray,
417 risingwave_pb::hummock::PbHummockVersionDelta,
418 PbHummockVersionDeltaArray
419);
420
421#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize)]
422pub enum StreamingParallelism {
423 Adaptive,
424 Fixed(usize),
425 Custom,
426}
427
428impl Eq for StreamingParallelism {}
429
430#[derive(
431 Hash, Copy, Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
432)]
433#[sea_orm(rs_type = "String", db_type = "string(None)")]
434pub enum DispatcherType {
435 #[sea_orm(string_value = "HASH")]
436 Hash,
437 #[sea_orm(string_value = "BROADCAST")]
438 Broadcast,
439 #[sea_orm(string_value = "SIMPLE")]
440 Simple,
441 #[sea_orm(string_value = "NO_SHUFFLE")]
442 NoShuffle,
443}
444
445impl From<PbDispatcherType> for DispatcherType {
446 fn from(val: PbDispatcherType) -> Self {
447 match val {
448 PbDispatcherType::Unspecified => unreachable!(),
449 PbDispatcherType::Hash => DispatcherType::Hash,
450 PbDispatcherType::Broadcast => DispatcherType::Broadcast,
451 PbDispatcherType::Simple => DispatcherType::Simple,
452 PbDispatcherType::NoShuffle => DispatcherType::NoShuffle,
453 }
454 }
455}
456
457impl From<DispatcherType> for PbDispatcherType {
458 fn from(val: DispatcherType) -> Self {
459 match val {
460 DispatcherType::Hash => PbDispatcherType::Hash,
461 DispatcherType::Broadcast => PbDispatcherType::Broadcast,
462 DispatcherType::Simple => PbDispatcherType::Simple,
463 DispatcherType::NoShuffle => PbDispatcherType::NoShuffle,
464 }
465 }
466}