1use std::collections::BTreeMap;
16
17use risingwave_pb::catalog::{PbCreateType, PbStreamJobStatus};
18use risingwave_pb::meta::table_fragments::PbState as PbStreamJobState;
19use risingwave_pb::secret::PbSecretRef;
20use risingwave_pb::stream_plan::PbStreamNode;
21use sea_orm::entity::prelude::*;
22use sea_orm::{DeriveActiveEnum, EnumIter, FromJsonQueryResult};
23use serde::{Deserialize, Serialize};
24
25pub mod prelude;
26
27pub mod actor;
28pub mod actor_dispatcher;
29pub mod catalog_version;
30pub mod cluster;
31pub mod compaction_config;
32pub mod compaction_status;
33pub mod compaction_task;
34pub mod connection;
35pub mod database;
36pub mod exactly_once_iceberg_sink;
37pub mod fragment;
38pub mod fragment_relation;
39pub mod function;
40pub mod hummock_epoch_to_version;
41pub mod hummock_gc_history;
42pub mod hummock_pinned_snapshot;
43pub mod hummock_pinned_version;
44pub mod hummock_sequence;
45pub mod hummock_sstable_info;
46pub mod hummock_time_travel_delta;
47pub mod hummock_time_travel_version;
48pub mod hummock_version_delta;
49pub mod hummock_version_stats;
50pub mod index;
51pub mod object;
52pub mod object_dependency;
53pub mod schema;
54pub mod secret;
55pub mod serde_seaql_migration;
56pub mod session_parameter;
57pub mod sink;
58pub mod source;
59pub mod streaming_job;
60pub mod subscription;
61pub mod system_parameter;
62pub mod table;
63pub mod user;
64pub mod user_privilege;
65pub mod view;
66pub mod worker;
67pub mod worker_property;
68
69pub type WorkerId = i32;
70
71pub type TransactionId = i32;
72
73pub type ObjectId = i32;
74pub type DatabaseId = ObjectId;
75pub type SchemaId = ObjectId;
76pub type TableId = ObjectId;
77pub type SourceId = ObjectId;
78pub type SinkId = ObjectId;
79pub type SubscriptionId = ObjectId;
80pub type IndexId = ObjectId;
81pub type ViewId = ObjectId;
82pub type FunctionId = ObjectId;
83pub type ConnectionId = ObjectId;
84pub type SecretId = ObjectId;
85pub type UserId = i32;
86pub type PrivilegeId = i32;
87
88pub type HummockVersionId = i64;
89pub type Epoch = i64;
90pub type CompactionGroupId = i64;
91pub type CompactionTaskId = i64;
92pub type HummockSstableObjectId = i64;
93
94pub type FragmentId = i32;
95pub type ActorId = i32;
96
97#[derive(Clone, Copy, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
98#[sea_orm(rs_type = "String", db_type = "string(None)")]
99pub enum JobStatus {
100 #[sea_orm(string_value = "INITIAL")]
101 Initial,
102 #[sea_orm(string_value = "CREATING")]
103 Creating,
104 #[sea_orm(string_value = "CREATED")]
105 Created,
106}
107
108impl From<JobStatus> for PbStreamJobStatus {
109 fn from(job_status: JobStatus) -> Self {
110 match job_status {
111 JobStatus::Initial => Self::Unspecified,
112 JobStatus::Creating => Self::Creating,
113 JobStatus::Created => Self::Created,
114 }
115 }
116}
117
118impl From<JobStatus> for PbStreamJobState {
120 fn from(status: JobStatus) -> Self {
121 match status {
122 JobStatus::Initial => PbStreamJobState::Initial,
123 JobStatus::Creating => PbStreamJobState::Creating,
124 JobStatus::Created => PbStreamJobState::Created,
125 }
126 }
127}
128
129#[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
130#[sea_orm(rs_type = "String", db_type = "string(None)")]
131pub enum CreateType {
132 #[sea_orm(string_value = "BACKGROUND")]
133 Background,
134 #[sea_orm(string_value = "FOREGROUND")]
135 Foreground,
136}
137
138impl From<CreateType> for PbCreateType {
139 fn from(create_type: CreateType) -> Self {
140 match create_type {
141 CreateType::Background => Self::Background,
142 CreateType::Foreground => Self::Foreground,
143 }
144 }
145}
146
147impl From<PbCreateType> for CreateType {
148 fn from(create_type: PbCreateType) -> Self {
149 match create_type {
150 PbCreateType::Background => Self::Background,
151 PbCreateType::Foreground => Self::Foreground,
152 PbCreateType::Unspecified => unreachable!("Unspecified create type"),
153 }
154 }
155}
156
157macro_rules! derive_from_json_struct {
159 ($struct_name:ident, $field_type:ty) => {
160 #[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)]
161 pub struct $struct_name(pub $field_type);
162 impl Eq for $struct_name {}
163 impl From<$field_type> for $struct_name {
164 fn from(value: $field_type) -> Self {
165 Self(value)
166 }
167 }
168
169 impl $struct_name {
170 pub fn into_inner(self) -> $field_type {
171 self.0
172 }
173
174 pub fn inner_ref(&self) -> &$field_type {
175 &self.0
176 }
177 }
178 };
179}
180
181macro_rules! derive_from_blob {
183 ($struct_name:ident, $field_type:ty) => {
184 #[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, DeriveValueType)]
185 pub struct $struct_name(#[sea_orm] Vec<u8>);
186
187 impl $struct_name {
188 pub fn to_protobuf(&self) -> $field_type {
189 prost::Message::decode(self.0.as_slice()).unwrap()
190 }
191
192 fn from_protobuf(val: &$field_type) -> Self {
193 Self(prost::Message::encode_to_vec(val))
194 }
195 }
196
197 impl sea_orm::sea_query::Nullable for $struct_name {
198 fn null() -> Value {
199 Value::Bytes(None)
200 }
201 }
202
203 impl From<&$field_type> for $struct_name {
204 fn from(value: &$field_type) -> Self {
205 Self::from_protobuf(value)
206 }
207 }
208
209 impl std::fmt::Debug for $struct_name {
210 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211 self.to_protobuf().fmt(f)
212 }
213 }
214
215 impl Default for $struct_name {
216 fn default() -> Self {
217 Self::from_protobuf(&<$field_type>::default())
218 }
219 }
220 };
221}
222
223macro_rules! derive_array_from_blob {
225 ($struct_name:ident, $field_type:ty, $field_array_name:ident) => {
226 #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
227 pub struct $struct_name(#[sea_orm] Vec<u8>);
228
229 #[derive(Clone, PartialEq, ::prost::Message)]
230 pub struct $field_array_name {
231 #[prost(message, repeated, tag = "1")]
232 inner: Vec<$field_type>,
233 }
234 impl Eq for $field_array_name {}
235
236 impl $struct_name {
237 pub fn to_protobuf(&self) -> Vec<$field_type> {
238 let data: $field_array_name = prost::Message::decode(self.0.as_slice()).unwrap();
239 data.inner
240 }
241
242 fn from_protobuf(val: Vec<$field_type>) -> Self {
243 Self(prost::Message::encode_to_vec(&$field_array_name {
244 inner: val,
245 }))
246 }
247 }
248
249 impl From<Vec<$field_type>> for $struct_name {
250 fn from(value: Vec<$field_type>) -> Self {
251 Self::from_protobuf(value)
252 }
253 }
254
255 impl std::fmt::Debug for $struct_name {
256 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
257 self.to_protobuf().fmt(f)
258 }
259 }
260
261 impl Default for $struct_name {
262 fn default() -> Self {
263 Self(vec![])
264 }
265 }
266
267 impl sea_orm::sea_query::Nullable for $struct_name {
268 fn null() -> Value {
269 Value::Bytes(None)
270 }
271 }
272 };
273}
274
275macro_rules! derive_btreemap_from_blob {
276 ($struct_name:ident, $key_type:ty, $value_type:ty, $field_type:ident) => {
277 #[derive(Clone, PartialEq, Eq, DeriveValueType, serde::Deserialize, serde::Serialize)]
278 pub struct $struct_name(#[sea_orm] Vec<u8>);
279
280 #[derive(Clone, PartialEq, ::prost::Message)]
281 pub struct $field_type {
282 #[prost(btree_map = "string, message")]
283 inner: BTreeMap<$key_type, $value_type>,
284 }
285 impl Eq for $field_type {}
286
287 impl $struct_name {
288 pub fn to_protobuf(&self) -> BTreeMap<$key_type, $value_type> {
289 let data: $field_type = prost::Message::decode(self.0.as_slice()).unwrap();
290 data.inner
291 }
292
293 fn from_protobuf(val: BTreeMap<$key_type, $value_type>) -> Self {
294 Self(prost::Message::encode_to_vec(&$field_type { inner: val }))
295 }
296 }
297
298 impl From<BTreeMap<$key_type, $value_type>> for $struct_name {
299 fn from(value: BTreeMap<$key_type, $value_type>) -> Self {
300 Self::from_protobuf(value)
301 }
302 }
303
304 impl std::fmt::Debug for $struct_name {
305 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
306 self.to_protobuf().fmt(f)
307 }
308 }
309
310 impl Default for $struct_name {
311 fn default() -> Self {
312 Self(vec![])
313 }
314 }
315
316 impl sea_orm::sea_query::Nullable for $struct_name {
317 fn null() -> Value {
318 Value::Bytes(None)
319 }
320 }
321 };
322}
323
324pub(crate) use {derive_array_from_blob, derive_from_blob};
325
326derive_from_json_struct!(I32Array, Vec<i32>);
327
328impl From<Vec<u32>> for I32Array {
329 fn from(value: Vec<u32>) -> Self {
330 Self(value.into_iter().map(|id| id as _).collect())
331 }
332}
333
334impl I32Array {
335 pub fn into_u32_array(self) -> Vec<u32> {
336 self.0.into_iter().map(|id| id as _).collect()
337 }
338}
339
340derive_from_json_struct!(ActorUpstreamActors, BTreeMap<FragmentId, Vec<ActorId>>);
341
342impl From<BTreeMap<u32, Vec<u32>>> for ActorUpstreamActors {
343 fn from(val: BTreeMap<u32, Vec<u32>>) -> Self {
344 let mut map = BTreeMap::new();
345 for (k, v) in val {
346 map.insert(k as _, v.into_iter().map(|a| a as _).collect());
347 }
348 Self(map)
349 }
350}
351
352derive_btreemap_from_blob!(SecretRef, String, PbSecretRef, PbSecretRefMap);
353
354derive_from_blob!(StreamNode, PbStreamNode);
355derive_from_blob!(DataType, risingwave_pb::data::PbDataType);
356derive_array_from_blob!(
357 DataTypeArray,
358 risingwave_pb::data::PbDataType,
359 PbDataTypeArray
360);
361derive_array_from_blob!(
362 FieldArray,
363 risingwave_pb::plan_common::PbField,
364 PbFieldArray
365);
366derive_from_json_struct!(Property, BTreeMap<String, String>);
367derive_from_blob!(ColumnCatalog, risingwave_pb::plan_common::PbColumnCatalog);
368derive_array_from_blob!(
369 ColumnCatalogArray,
370 risingwave_pb::plan_common::PbColumnCatalog,
371 PbColumnCatalogArray
372);
373derive_from_blob!(StreamSourceInfo, risingwave_pb::catalog::PbStreamSourceInfo);
374derive_from_blob!(
375 WebhookSourceInfo,
376 risingwave_pb::catalog::PbWebhookSourceInfo
377);
378derive_from_blob!(WatermarkDesc, risingwave_pb::catalog::PbWatermarkDesc);
379derive_array_from_blob!(
380 WatermarkDescArray,
381 risingwave_pb::catalog::PbWatermarkDesc,
382 PbWatermarkDescArray
383);
384derive_array_from_blob!(
385 ExprNodeArray,
386 risingwave_pb::expr::PbExprNode,
387 PbExprNodeArray
388);
389derive_array_from_blob!(
390 ColumnOrderArray,
391 risingwave_pb::common::PbColumnOrder,
392 PbColumnOrderArray
393);
394derive_array_from_blob!(
395 IndexColumnPropertiesArray,
396 risingwave_pb::catalog::PbIndexColumnProperties,
397 PbIndexColumnPropertiesArray
398);
399derive_from_blob!(SinkFormatDesc, risingwave_pb::catalog::PbSinkFormatDesc);
400derive_from_blob!(Cardinality, risingwave_pb::plan_common::PbCardinality);
401derive_from_blob!(TableVersion, risingwave_pb::catalog::table::PbTableVersion);
402derive_from_blob!(
403 PrivateLinkService,
404 risingwave_pb::catalog::connection::PbPrivateLinkService
405);
406derive_from_blob!(ConnectionParams, risingwave_pb::catalog::ConnectionParams);
407derive_from_blob!(AuthInfo, risingwave_pb::user::PbAuthInfo);
408
409derive_from_blob!(ConnectorSplits, risingwave_pb::source::ConnectorSplits);
410derive_from_blob!(VnodeBitmap, risingwave_pb::common::Buffer);
411derive_from_blob!(ActorMapping, risingwave_pb::stream_plan::PbActorMapping);
412derive_from_blob!(ExprContext, risingwave_pb::plan_common::PbExprContext);
413
414derive_array_from_blob!(
415 HummockVersionDeltaArray,
416 risingwave_pb::hummock::PbHummockVersionDelta,
417 PbHummockVersionDeltaArray
418);
419
420#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize)]
421pub enum StreamingParallelism {
422 Adaptive,
423 Fixed(usize),
424 Custom,
425}
426
427impl Eq for StreamingParallelism {}