risingwave_meta_model/
source.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_pb::catalog::PbSource;
16use risingwave_pb::catalog::source::OptionalAssociatedTableId;
17use sea_orm::ActiveValue::Set;
18use sea_orm::entity::prelude::*;
19use serde::{Deserialize, Serialize};
20
21use crate::{
22    ColumnCatalogArray, ConnectionId, I32Array, Property, SecretRef, SourceId, StreamSourceInfo,
23    TableId, WatermarkDescArray,
24};
25
/// One row of the `source` table, keyed by `source_id`.
///
/// Rows are built from a `PbSource` through `From<PbSource> for ActiveModel`.
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "source")]
pub struct Model {
    // Primary key; supplied by the caller rather than auto-incremented. It also references
    // `object.oid` (see `Relation::Object`).
    #[sea_orm(primary_key, auto_increment = false)]
    pub source_id: SourceId,
    // Name of the source, copied from `PbSource::name`.
    pub name: String,
    // Optional row-id index, copied from `PbSource::row_id_index`.
    pub row_id_index: Option<i32>,
    // Column catalogs, converted from `PbSource::columns`.
    pub columns: ColumnCatalogArray,
    // Primary-key column ids, wrapped from `PbSource::pk_column_ids`.
    pub pk_column_ids: I32Array,
    // Properties, wrapped from `PbSource::with_properties`.
    pub with_properties: Property,
    // Definition string, copied from `PbSource::definition`.
    pub definition: String,
    // Converted from `PbSource::info`; `None` when the protobuf carries no info.
    // `Model::is_shared` reads this field.
    pub source_info: Option<StreamSourceInfo>,
    // Watermark descriptors, converted from `PbSource::watermark_descs`.
    pub watermark_descs: WatermarkDescArray,
    // Id of the associated table, if the protobuf names one.
    pub optional_associated_table_id: Option<TableId>,
    // Optional reference to a row of the `connection` table (see `Relation::Connection`).
    pub connection_id: Option<ConnectionId>,
    // Version number, copied from `PbSource::version`.
    pub version: i64,
    // `secret_ref` maps each property name to the id and type of the secret it refers to.
    pub secret_ref: Option<SecretRef>,
    // Optional rate limit, copied from `PbSource::rate_limit`.
    pub rate_limit: Option<i32>,
}
46
/// Relations between the `source` entity and the other entities it links to.
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
    // `source.connection_id` -> `connection.connection_id`; neither updates nor deletes on the
    // connection side trigger an action here.
    #[sea_orm(
        belongs_to = "super::connection::Entity",
        from = "Column::ConnectionId",
        to = "super::connection::Column::ConnectionId",
        on_update = "NoAction",
        on_delete = "NoAction"
    )]
    Connection,
    // `source.source_id` -> `object.oid`; deleting the object row cascades to this row.
    #[sea_orm(
        belongs_to = "super::object::Entity",
        from = "Column::SourceId",
        to = "super::object::Column::Oid",
        on_update = "NoAction",
        on_delete = "Cascade"
    )]
    Object,
    // A source may be referenced by many rows of the `table` entity.
    #[sea_orm(has_many = "super::table::Entity")]
    Table,
}
68
69impl Related<super::connection::Entity> for Entity {
70    fn to() -> RelationDef {
71        Relation::Connection.def()
72    }
73}
74
75impl Related<super::object::Entity> for Entity {
76    fn to() -> RelationDef {
77        Relation::Object.def()
78    }
79}
80
81impl Related<super::table::Entity> for Entity {
82    fn to() -> RelationDef {
83        Relation::Table.def()
84    }
85}
86
// No hooks are overridden: the empty body keeps every default method of
// `ActiveModelBehavior`.
impl ActiveModelBehavior for ActiveModel {}
88
89impl From<PbSource> for ActiveModel {
90    fn from(source: PbSource) -> Self {
91        let optional_associated_table_id = source.optional_associated_table_id.map(|x| match x {
92            OptionalAssociatedTableId::AssociatedTableId(id) => id as TableId,
93        });
94        Self {
95            source_id: Set(source.id as _),
96            name: Set(source.name),
97            row_id_index: Set(source.row_id_index.map(|x| x as _)),
98            columns: Set(ColumnCatalogArray::from(source.columns)),
99            pk_column_ids: Set(I32Array(source.pk_column_ids)),
100            with_properties: Set(Property(source.with_properties)),
101            definition: Set(source.definition),
102            source_info: Set(source.info.as_ref().map(StreamSourceInfo::from)),
103            watermark_descs: Set(WatermarkDescArray::from(source.watermark_descs)),
104            optional_associated_table_id: Set(optional_associated_table_id),
105            connection_id: Set(source.connection_id.map(|id| id as _)),
106            version: Set(source.version as _),
107            secret_ref: Set(Some(SecretRef::from(source.secret_refs))),
108            rate_limit: Set(source.rate_limit.map(|id| id as _)),
109        }
110    }
111}
112
113impl Model {
114    pub fn is_shared(&self) -> bool {
115        self.source_info
116            .as_ref()
117            .is_some_and(|s| s.to_protobuf().is_shared())
118    }
119}