risingwave_meta_model/
source.rs1use risingwave_pb::catalog::PbSource;
16use risingwave_pb::catalog::source::OptionalAssociatedTableId;
17use sea_orm::ActiveValue::Set;
18use sea_orm::entity::prelude::*;
19use serde::{Deserialize, Serialize};
20
21use crate::{
22 ColumnCatalogArray, ConnectionId, I32Array, Property, SecretRef, SourceId, StreamSourceInfo,
23 TableId, WatermarkDescArray,
24};
25
26#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
27#[sea_orm(table_name = "source")]
28pub struct Model {
29 #[sea_orm(primary_key, auto_increment = false)]
30 pub source_id: SourceId,
31 pub name: String,
32 pub row_id_index: Option<i32>,
33 pub columns: ColumnCatalogArray,
34 pub pk_column_ids: I32Array,
35 pub with_properties: Property,
36 pub definition: String,
37 pub source_info: Option<StreamSourceInfo>,
38 pub watermark_descs: WatermarkDescArray,
39 pub optional_associated_table_id: Option<TableId>,
40 pub connection_id: Option<ConnectionId>,
41 pub version: i64,
42 pub secret_ref: Option<SecretRef>,
44 pub rate_limit: Option<i32>,
45}
46
47#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
48pub enum Relation {
49 #[sea_orm(
50 belongs_to = "super::connection::Entity",
51 from = "Column::ConnectionId",
52 to = "super::connection::Column::ConnectionId",
53 on_update = "NoAction",
54 on_delete = "NoAction"
55 )]
56 Connection,
57 #[sea_orm(
58 belongs_to = "super::object::Entity",
59 from = "Column::SourceId",
60 to = "super::object::Column::Oid",
61 on_update = "NoAction",
62 on_delete = "Cascade"
63 )]
64 Object,
65 #[sea_orm(has_many = "super::table::Entity")]
66 Table,
67}
68
69impl Related<super::connection::Entity> for Entity {
70 fn to() -> RelationDef {
71 Relation::Connection.def()
72 }
73}
74
75impl Related<super::object::Entity> for Entity {
76 fn to() -> RelationDef {
77 Relation::Object.def()
78 }
79}
80
81impl Related<super::table::Entity> for Entity {
82 fn to() -> RelationDef {
83 Relation::Table.def()
84 }
85}
86
87impl ActiveModelBehavior for ActiveModel {}
88
89impl From<PbSource> for ActiveModel {
90 fn from(source: PbSource) -> Self {
91 let optional_associated_table_id = source.optional_associated_table_id.map(|x| match x {
92 OptionalAssociatedTableId::AssociatedTableId(id) => id as TableId,
93 });
94 Self {
95 source_id: Set(source.id as _),
96 name: Set(source.name),
97 row_id_index: Set(source.row_id_index.map(|x| x as _)),
98 columns: Set(ColumnCatalogArray::from(source.columns)),
99 pk_column_ids: Set(I32Array(source.pk_column_ids)),
100 with_properties: Set(Property(source.with_properties)),
101 definition: Set(source.definition),
102 source_info: Set(source.info.as_ref().map(StreamSourceInfo::from)),
103 watermark_descs: Set(WatermarkDescArray::from(source.watermark_descs)),
104 optional_associated_table_id: Set(optional_associated_table_id),
105 connection_id: Set(source.connection_id.map(|id| id as _)),
106 version: Set(source.version as _),
107 secret_ref: Set(Some(SecretRef::from(source.secret_refs))),
108 rate_limit: Set(source.rate_limit.map(|id| id as _)),
109 }
110 }
111}
112
113impl Model {
114 pub fn is_shared(&self) -> bool {
115 self.source_info
116 .as_ref()
117 .is_some_and(|s| s.to_protobuf().is_shared())
118 }
119}