risingwave_meta_model/
source.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_pb::catalog::PbSource;
16use sea_orm::ActiveValue::Set;
17use sea_orm::entity::prelude::*;
18use serde::{Deserialize, Serialize};
19
20use crate::{
21    ColumnCatalogArray, ConnectionId, I32Array, Property, SecretRef, SourceId, SourceRefreshMode,
22    StreamSourceInfo, TableId, WatermarkDescArray,
23};
24
25#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
26#[sea_orm(table_name = "source")]
27pub struct Model {
28    #[sea_orm(primary_key, auto_increment = false)]
29    pub source_id: SourceId,
30    pub name: String,
31    pub row_id_index: Option<i32>,
32    pub columns: ColumnCatalogArray,
33    pub pk_column_ids: I32Array,
34    pub with_properties: Property,
35    pub definition: String,
36    pub source_info: Option<StreamSourceInfo>,
37    pub watermark_descs: WatermarkDescArray,
38    pub optional_associated_table_id: Option<TableId>,
39    pub connection_id: Option<ConnectionId>,
40    pub version: i64,
41    // `secret_ref` stores the mapping info mapping from property name to secret id and type.
42    pub secret_ref: Option<SecretRef>,
43    pub rate_limit: Option<i32>,
44    pub refresh_mode: Option<SourceRefreshMode>,
45}
46
47#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
48pub enum Relation {
49    #[sea_orm(
50        belongs_to = "super::connection::Entity",
51        from = "Column::ConnectionId",
52        to = "super::connection::Column::ConnectionId",
53        on_update = "NoAction",
54        on_delete = "NoAction"
55    )]
56    Connection,
57    #[sea_orm(
58        belongs_to = "super::object::Entity",
59        from = "Column::SourceId",
60        to = "super::object::Column::Oid",
61        on_update = "NoAction",
62        on_delete = "Cascade"
63    )]
64    Object,
65    #[sea_orm(has_many = "super::table::Entity")]
66    Table,
67}
68
69impl Related<super::connection::Entity> for Entity {
70    fn to() -> RelationDef {
71        Relation::Connection.def()
72    }
73}
74
75impl Related<super::object::Entity> for Entity {
76    fn to() -> RelationDef {
77        Relation::Object.def()
78    }
79}
80
81impl Related<super::table::Entity> for Entity {
82    fn to() -> RelationDef {
83        Relation::Table.def()
84    }
85}
86
87impl ActiveModelBehavior for ActiveModel {}
88
89impl From<PbSource> for ActiveModel {
90    fn from(source: PbSource) -> Self {
91        let optional_associated_table_id = source.optional_associated_table_id.map(Into::into);
92        Self {
93            source_id: Set(source.id),
94            name: Set(source.name),
95            row_id_index: Set(source.row_id_index.map(|x| x as _)),
96            columns: Set(ColumnCatalogArray::from(source.columns)),
97            pk_column_ids: Set(I32Array(source.pk_column_ids)),
98            with_properties: Set(Property(source.with_properties)),
99            definition: Set(source.definition),
100            source_info: Set(source.info.as_ref().map(StreamSourceInfo::from)),
101            watermark_descs: Set(WatermarkDescArray::from(source.watermark_descs)),
102            optional_associated_table_id: Set(optional_associated_table_id),
103            connection_id: Set(source.connection_id),
104            version: Set(source.version as _),
105            secret_ref: Set(Some(SecretRef::from(source.secret_refs))),
106            rate_limit: Set(source.rate_limit.map(|id| id as _)),
107            refresh_mode: Set(source.refresh_mode.as_ref().map(SourceRefreshMode::from)),
108        }
109    }
110}
111
112impl Model {
113    pub fn is_shared(&self) -> bool {
114        self.source_info
115            .as_ref()
116            .is_some_and(|s| s.to_protobuf().is_shared())
117    }
118}