1use std::collections::BTreeMap;
16
17pub use risingwave_common::id::*;
18use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus};
19use risingwave_pb::meta::table_fragments::PbState as PbStreamJobState;
20use risingwave_pb::secret::PbSecretRef;
21use risingwave_pb::stream_plan::{PbDispatcherType, PbStreamNode};
22use sea_orm::entity::prelude::*;
23use sea_orm::{DeriveActiveEnum, EnumIter, FromJsonQueryResult, Value};
24use serde::{Deserialize, Serialize};
25
26pub mod prelude;
27
28pub mod catalog_version;
29pub mod cdc_table_snapshot_split;
30pub mod cluster;
31pub mod compaction_config;
32pub mod compaction_status;
33pub mod compaction_task;
34pub mod connection;
35pub mod database;
36pub mod exactly_once_iceberg_sink;
37pub mod fragment;
38pub mod fragment_relation;
39pub mod fragment_splits;
40pub mod function;
41pub mod hummock_epoch_to_version;
42pub mod hummock_gc_history;
43pub mod hummock_pinned_snapshot;
44pub mod hummock_pinned_version;
45pub mod hummock_sequence;
46pub mod hummock_sstable_info;
47pub mod hummock_time_travel_delta;
48pub mod hummock_time_travel_version;
49pub mod hummock_version_delta;
50pub mod hummock_version_stats;
51pub mod iceberg_namespace_properties;
52pub mod iceberg_tables;
53pub mod index;
54pub mod object;
55pub mod object_dependency;
56pub mod pending_sink_state;
57pub mod refresh_job;
58pub mod schema;
59pub mod secret;
60pub mod serde_seaql_migration;
61pub mod session_parameter;
62pub mod sink;
63pub mod source;
64pub mod streaming_job;
65pub mod subscription;
66pub mod system_parameter;
67pub mod table;
68pub mod user;
69pub mod user_default_privilege;
70pub mod user_privilege;
71pub mod view;
72pub mod worker;
73pub mod worker_property;
74
75pub type TransactionId = i32;
76
77pub type PrivilegeId = i32;
78pub type DefaultPrivilegeId = i32;
79
80pub use risingwave_pb::id::{CompactionGroupId, HummockSstableObjectId, HummockVersionId};
81pub type Epoch = i64;
82pub type CompactionTaskId = i64;
83
84#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
85#[sea_orm(rs_type = "String", db_type = "string(None)")]
86pub enum JobStatus {
87 #[sea_orm(string_value = "INITIAL")]
88 Initial,
89 #[sea_orm(string_value = "CREATING")]
90 Creating,
91 #[sea_orm(string_value = "CREATED")]
92 Created,
93}
94
95impl From<JobStatus> for PbStreamJobStatus {
96 fn from(job_status: JobStatus) -> Self {
97 match job_status {
98 JobStatus::Initial => Self::Unspecified,
99 JobStatus::Creating => Self::Creating,
100 JobStatus::Created => Self::Created,
101 }
102 }
103}
104
105impl From<JobStatus> for PbStreamJobState {
107 fn from(status: JobStatus) -> Self {
108 match status {
109 JobStatus::Initial => PbStreamJobState::Initial,
110 JobStatus::Creating => PbStreamJobState::Creating,
111 JobStatus::Created => PbStreamJobState::Created,
112 }
113 }
114}
115
116#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
117#[sea_orm(rs_type = "String", db_type = "string(None)")]
118pub enum CreateType {
119 #[sea_orm(string_value = "BACKGROUND")]
120 Background,
121 #[sea_orm(string_value = "FOREGROUND")]
122 Foreground,
123}
124
125impl From<CreateType> for PbCreateType {
126 fn from(create_type: CreateType) -> Self {
127 match create_type {
128 CreateType::Background => Self::Background,
129 CreateType::Foreground => Self::Foreground,
130 }
131 }
132}
133
134impl From<PbCreateType> for CreateType {
135 fn from(create_type: PbCreateType) -> Self {
136 match create_type {
137 PbCreateType::Background => Self::Background,
138 PbCreateType::Foreground => Self::Foreground,
139 PbCreateType::Unspecified => unreachable!("Unspecified create type"),
140 }
141 }
142}
143
144impl CreateType {
145 pub fn as_str(&self) -> &'static str {
146 match self {
147 CreateType::Background => "BACKGROUND",
148 CreateType::Foreground => "FOREGROUND",
149 }
150 }
151}
152
153#[macro_export]
155macro_rules! derive_from_json_struct {
156 ($struct_name:ident, $field_type:ty) => {
157 #[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)]
158 pub struct $struct_name(pub $field_type);
159 impl Eq for $struct_name {}
160 impl From<$field_type> for $struct_name {
161 fn from(value: $field_type) -> Self {
162 Self(value)
163 }
164 }
165
166 impl $struct_name {
167 pub fn into_inner(self) -> $field_type {
168 self.0
169 }
170
171 pub fn inner_ref(&self) -> &$field_type {
172 &self.0
173 }
174 }
175 };
176}
177
178macro_rules! derive_from_blob {
180 ($struct_name:ident, $field_type:ty) => {
181 #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, DeriveValueType)]
182 pub struct $struct_name(#[sea_orm] Vec<u8>);
183
184 impl $struct_name {
185 pub fn to_protobuf(&self) -> $field_type {
186 prost::Message::decode(self.0.as_slice()).unwrap()
187 }
188
189 fn from_protobuf(val: &$field_type) -> Self {
190 Self(prost::Message::encode_to_vec(val))
191 }
192 }
193
194 impl sea_orm::sea_query::Nullable for $struct_name {
195 fn null() -> Value {
196 Value::Bytes(None)
197 }
198 }
199
200 impl From<&$field_type> for $struct_name {
201 fn from(value: &$field_type) -> Self {
202 Self::from_protobuf(value)
203 }
204 }
205
206 impl std::fmt::Debug for $struct_name {
207 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
208 self.to_protobuf().fmt(f)
209 }
210 }
211
212 impl Default for $struct_name {
213 fn default() -> Self {
214 Self::from_protobuf(&<$field_type>::default())
215 }
216 }
217 };
218}
219
220macro_rules! derive_array_from_blob {
222 ($struct_name:ident, $field_type:ty, $field_array_name:ident) => {
223 #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
224 pub struct $struct_name(#[sea_orm] Vec<u8>);
225
226 #[derive(Clone, PartialEq, ::prost::Message)]
227 pub struct $field_array_name {
228 #[prost(message, repeated, tag = "1")]
229 inner: Vec<$field_type>,
230 }
231 impl Eq for $field_array_name {}
232
233 impl $struct_name {
234 pub fn to_protobuf(&self) -> Vec<$field_type> {
235 let data: $field_array_name = prost::Message::decode(self.0.as_slice()).unwrap();
236 data.inner
237 }
238
239 fn from_protobuf(val: Vec<$field_type>) -> Self {
240 Self(prost::Message::encode_to_vec(&$field_array_name {
241 inner: val,
242 }))
243 }
244 }
245
246 impl From<Vec<$field_type>> for $struct_name {
247 fn from(value: Vec<$field_type>) -> Self {
248 Self::from_protobuf(value)
249 }
250 }
251
252 impl std::fmt::Debug for $struct_name {
253 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254 self.to_protobuf().fmt(f)
255 }
256 }
257
258 impl Default for $struct_name {
259 fn default() -> Self {
260 Self(vec![])
261 }
262 }
263
264 impl sea_orm::sea_query::Nullable for $struct_name {
265 fn null() -> Value {
266 Value::Bytes(None)
267 }
268 }
269 };
270}
271
272macro_rules! derive_btreemap_from_blob {
273 ($struct_name:ident, $key_type:ty, $value_type:ty, $field_type:ident) => {
274 #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
275 pub struct $struct_name(#[sea_orm] Vec<u8>);
276
277 #[derive(Clone, PartialEq, ::prost::Message)]
278 pub struct $field_type {
279 #[prost(btree_map = "string, message")]
280 inner: BTreeMap<$key_type, $value_type>,
281 }
282 impl Eq for $field_type {}
283
284 impl $struct_name {
285 pub fn to_protobuf(&self) -> BTreeMap<$key_type, $value_type> {
286 let data: $field_type = prost::Message::decode(self.0.as_slice()).unwrap();
287 data.inner
288 }
289
290 fn from_protobuf(val: BTreeMap<$key_type, $value_type>) -> Self {
291 Self(prost::Message::encode_to_vec(&$field_type { inner: val }))
292 }
293 }
294
295 impl From<BTreeMap<$key_type, $value_type>> for $struct_name {
296 fn from(value: BTreeMap<$key_type, $value_type>) -> Self {
297 Self::from_protobuf(value)
298 }
299 }
300
301 impl std::fmt::Debug for $struct_name {
302 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
303 self.to_protobuf().fmt(f)
304 }
305 }
306
307 impl Default for $struct_name {
308 fn default() -> Self {
309 Self(vec![])
310 }
311 }
312
313 impl sea_orm::sea_query::Nullable for $struct_name {
314 fn null() -> Value {
315 Value::Bytes(None)
316 }
317 }
318 };
319}
320
321pub(crate) use {derive_array_from_blob, derive_from_blob};
322
323derive_from_json_struct!(TableIdArray, Vec<TableId>);
324
325derive_from_json_struct!(I32Array, Vec<i32>);
326
327impl From<Vec<u32>> for I32Array {
328 fn from(value: Vec<u32>) -> Self {
329 Self(value.into_iter().map(|id| id as _).collect())
330 }
331}
332
333impl I32Array {
334 pub fn into_u32_array(self) -> Vec<u32> {
335 self.0.into_iter().map(|id| id as _).collect()
336 }
337}
338
339derive_btreemap_from_blob!(SecretRef, String, PbSecretRef, PbSecretRefMap);
340
341derive_from_blob!(StreamNode, PbStreamNode);
342derive_from_blob!(DataType, risingwave_pb::data::PbDataType);
343derive_array_from_blob!(
344 DataTypeArray,
345 risingwave_pb::data::PbDataType,
346 PbDataTypeArray
347);
348derive_array_from_blob!(
349 FieldArray,
350 risingwave_pb::plan_common::PbField,
351 PbFieldArray
352);
353derive_from_json_struct!(Property, BTreeMap<String, String>);
354derive_from_blob!(ColumnCatalog, risingwave_pb::plan_common::PbColumnCatalog);
355derive_array_from_blob!(
356 ColumnCatalogArray,
357 risingwave_pb::plan_common::PbColumnCatalog,
358 PbColumnCatalogArray
359);
360derive_from_blob!(StreamSourceInfo, risingwave_pb::catalog::PbStreamSourceInfo);
361derive_from_blob!(
362 WebhookSourceInfo,
363 risingwave_pb::catalog::PbWebhookSourceInfo
364);
365derive_from_blob!(WatermarkDesc, risingwave_pb::catalog::PbWatermarkDesc);
366derive_array_from_blob!(
367 WatermarkDescArray,
368 risingwave_pb::catalog::PbWatermarkDesc,
369 PbWatermarkDescArray
370);
371derive_array_from_blob!(
372 ExprNodeArray,
373 risingwave_pb::expr::PbExprNode,
374 PbExprNodeArray
375);
376derive_array_from_blob!(
377 ColumnOrderArray,
378 risingwave_pb::common::PbColumnOrder,
379 PbColumnOrderArray
380);
381derive_array_from_blob!(
382 IndexColumnPropertiesArray,
383 risingwave_pb::catalog::PbIndexColumnProperties,
384 PbIndexColumnPropertiesArray
385);
386derive_from_blob!(SinkFormatDesc, risingwave_pb::catalog::PbSinkFormatDesc);
387derive_from_blob!(Cardinality, risingwave_pb::plan_common::PbCardinality);
388derive_from_blob!(TableVersion, risingwave_pb::catalog::table::PbTableVersion);
389derive_from_blob!(
390 PrivateLinkService,
391 risingwave_pb::catalog::connection::PbPrivateLinkService
392);
393derive_from_blob!(ConnectionParams, risingwave_pb::catalog::ConnectionParams);
394derive_from_blob!(AuthInfo, risingwave_pb::user::PbAuthInfo);
395
396derive_from_blob!(ConnectorSplits, risingwave_pb::source::ConnectorSplits);
397derive_from_blob!(VnodeBitmap, risingwave_pb::common::Buffer);
398derive_from_blob!(ActorMapping, risingwave_pb::stream_plan::PbActorMapping);
399derive_from_blob!(ExprContext, risingwave_pb::plan_common::PbExprContext);
400derive_from_blob!(
401 SourceRefreshMode,
402 risingwave_pb::plan_common::PbSourceRefreshMode
403);
404
405derive_from_blob!(
406 SinkSchemachange,
407 risingwave_pb::stream_plan::PbSinkSchemaChange
408);
409
410derive_array_from_blob!(
411 TypePairArray,
412 risingwave_pb::stream_plan::dispatch_output_mapping::TypePair,
413 PbTypePairArray
414);
415
416derive_array_from_blob!(
417 HummockVersionDeltaArray,
418 risingwave_pb::hummock::PbHummockVersionDelta,
419 PbHummockVersionDeltaArray
420);
421
422#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize)]
423pub enum StreamingParallelism {
424 Adaptive,
425 Fixed(usize),
426 Custom,
427}
428
429impl Eq for StreamingParallelism {}
430
431#[derive(
432 Hash, Copy, Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
433)]
434#[sea_orm(rs_type = "String", db_type = "string(None)")]
435pub enum DispatcherType {
436 #[sea_orm(string_value = "HASH")]
437 Hash,
438 #[sea_orm(string_value = "BROADCAST")]
439 Broadcast,
440 #[sea_orm(string_value = "SIMPLE")]
441 Simple,
442 #[sea_orm(string_value = "NO_SHUFFLE")]
443 NoShuffle,
444}
445
446impl From<PbDispatcherType> for DispatcherType {
447 fn from(val: PbDispatcherType) -> Self {
448 match val {
449 PbDispatcherType::Unspecified => unreachable!(),
450 PbDispatcherType::Hash => DispatcherType::Hash,
451 PbDispatcherType::Broadcast => DispatcherType::Broadcast,
452 PbDispatcherType::Simple => DispatcherType::Simple,
453 PbDispatcherType::NoShuffle => DispatcherType::NoShuffle,
454 }
455 }
456}
457
458impl From<DispatcherType> for PbDispatcherType {
459 fn from(val: DispatcherType) -> Self {
460 match val {
461 DispatcherType::Hash => PbDispatcherType::Hash,
462 DispatcherType::Broadcast => PbDispatcherType::Broadcast,
463 DispatcherType::Simple => PbDispatcherType::Simple,
464 DispatcherType::NoShuffle => PbDispatcherType::NoShuffle,
465 }
466 }
467}