risingwave_frontend/catalog/
source_catalog.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::Context as _;
16use itertools::Itertools as _;
17use risingwave_common::catalog::{ColumnCatalog, SourceVersionId};
18use risingwave_common::util::epoch::Epoch;
19use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt};
20use risingwave_pb::catalog::source::OptionalAssociatedTableId;
21use risingwave_pb::catalog::{PbSource, StreamSourceInfo, WatermarkDesc};
22use risingwave_sqlparser::ast;
23use risingwave_sqlparser::parser::Parser;
24use thiserror_ext::AsReport as _;
25
26use super::purify::try_purify_table_source_create_sql_ast;
27use super::{ColumnId, ConnectionId, DatabaseId, OwnedByUserCatalog, SchemaId, SourceId};
28use crate::catalog::TableId;
29use crate::error::Result;
30use crate::session::current::notice_to_user;
31use crate::user::UserId;
32
/// Frontend representation of a source catalog entry.
///
/// It is converted from [`PbSource`] via `From<&PbSource>` and back via
/// [`SourceCatalog::to_prost`]. Compared with `PbSource`, it is meant to keep the information
/// the frontend uses during optimization.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct SourceCatalog {
    /// Id of this source.
    pub id: SourceId,
    /// Name of this source.
    pub name: String,
    /// Id of the schema this source belongs to.
    pub schema_id: SchemaId,
    /// Id of the database this source belongs to.
    pub database_id: DatabaseId,
    /// Column catalogs of the source, converted from `PbSource::columns`.
    pub columns: Vec<ColumnCatalog>,
    /// Ids of the primary-key columns (`PbSource::pk_column_ids`).
    pub pk_col_ids: Vec<ColumnId>,
    /// Whether the source is append-only. When converted from `PbSource` this is set to
    /// `row_id_index.is_some()` rather than read from the protobuf message.
    pub append_only: bool,
    /// Id of the user that owns this source.
    pub owner: UserId,
    /// Stream source info (`PbSource::info`); required when converting from `PbSource`.
    pub info: StreamSourceInfo,
    /// Index of the row-id column, if any. Presumably an index into `columns` — TODO confirm.
    pub row_id_index: Option<usize>,
    /// Connector properties with secrets resolved. `to_prost` splits them back into
    /// `with_properties` and `secret_refs`; `connector_name` reads the connector from here.
    pub with_properties: WithOptionsSecResolved,
    /// Watermark descriptors of the source.
    pub watermark_descs: Vec<WatermarkDesc>,
    /// Id of the associated table, if any (`optional_associated_table_id` in protobuf).
    pub associated_table_id: Option<TableId>,
    /// The `CREATE` SQL text of the source. `fill_purified_create_sql` may replace it with its
    /// purified form.
    pub definition: String,
    /// Id of the connection used by this source, if any.
    pub connection_id: Option<ConnectionId>,
    /// Epoch at which the source was created, if recorded.
    pub created_at_epoch: Option<Epoch>,
    /// Epoch at which the source was initialized, if recorded.
    pub initialized_at_epoch: Option<Epoch>,
    /// Catalog version of this source.
    pub version: SourceVersionId,
    /// Cluster version at creation time, if recorded.
    pub created_at_cluster_version: Option<String>,
    /// Cluster version at initialization time, if recorded.
    pub initialized_at_cluster_version: Option<String>,
    /// Rate limit configured for the source, if any. NOTE(review): the unit is not visible here.
    pub rate_limit: Option<u32>,
}
59
60impl SourceCatalog {
61    /// Returns the SQL definition when the source was created.
62    pub fn create_sql(&self) -> String {
63        self.definition.clone()
64    }
65
66    /// Returns the parsed SQL definition when the source was created.
67    ///
68    /// Returns error if it's invalid.
69    pub fn create_sql_ast(&self) -> Result<ast::Statement> {
70        Ok(Parser::parse_sql(&self.definition)
71            .context("unable to parse definition sql")?
72            .into_iter()
73            .exactly_one()
74            .context("expecting exactly one statement in definition")?)
75    }
76
77    pub fn to_prost(&self) -> PbSource {
78        let (with_properties, secret_refs) = self.with_properties.clone().into_parts();
79        PbSource {
80            id: self.id,
81            schema_id: self.schema_id,
82            database_id: self.database_id,
83            name: self.name.clone(),
84            row_id_index: self.row_id_index.map(|idx| idx as _),
85            columns: self.columns.iter().map(|c| c.to_protobuf()).collect(),
86            pk_column_ids: self.pk_col_ids.iter().map(Into::into).collect(),
87            with_properties,
88            owner: self.owner,
89            info: Some(self.info.clone()),
90            watermark_descs: self.watermark_descs.clone(),
91            definition: self.definition.clone(),
92            connection_id: self.connection_id,
93            initialized_at_epoch: self.initialized_at_epoch.map(|x| x.0),
94            created_at_epoch: self.created_at_epoch.map(|x| x.0),
95            optional_associated_table_id: self
96                .associated_table_id
97                .map(|id| OptionalAssociatedTableId::AssociatedTableId(id.table_id)),
98            version: self.version,
99            created_at_cluster_version: self.created_at_cluster_version.clone(),
100            initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
101            secret_refs,
102            rate_limit: self.rate_limit,
103        }
104    }
105
106    /// Get a reference to the source catalog's version.
107    pub fn version(&self) -> SourceVersionId {
108        self.version
109    }
110
111    pub fn connector_name(&self) -> String {
112        self.with_properties
113            .get_connector()
114            .expect("connector name is missing")
115    }
116}
117
118impl SourceCatalog {
119    /// Returns the SQL definition when the source was created, purified with best effort.
120    pub fn create_sql_purified(&self) -> String {
121        self.create_sql_ast_purified()
122            .map(|stmt| stmt.to_string())
123            .unwrap_or_else(|_| self.create_sql())
124    }
125
126    /// Returns the parsed SQL definition when the source was created, purified with best effort.
127    ///
128    /// Returns error if it's invalid.
129    pub fn create_sql_ast_purified(&self) -> Result<ast::Statement> {
130        match try_purify_table_source_create_sql_ast(
131            self.create_sql_ast()?,
132            &self.columns,
133            self.row_id_index,
134            &self.pk_col_ids,
135        ) {
136            Ok(stmt) => return Ok(stmt),
137            Err(e) => notice_to_user(format!(
138                "error occurred while purifying definition for source \"{}\", \
139                     results may be inaccurate: {}",
140                self.name,
141                e.as_report()
142            )),
143        }
144
145        self.create_sql_ast()
146    }
147
148    /// Fills the `definition` field with the purified SQL definition.
149    ///
150    /// There's no need to call this method for correctness because we automatically purify the
151    /// SQL definition at the time of querying. However, this helps to maintain more accurate
152    /// `definition` field in the catalog when directly inspected for debugging purposes.
153    pub fn fill_purified_create_sql(&mut self) {
154        self.definition = self.create_sql_purified();
155    }
156}
157
158impl From<&PbSource> for SourceCatalog {
159    fn from(prost: &PbSource) -> Self {
160        let id = prost.id;
161        let name = prost.name.clone();
162        let database_id = prost.database_id;
163        let schema_id = prost.schema_id;
164        let prost_columns = prost.columns.clone();
165        let pk_col_ids = prost
166            .pk_column_ids
167            .clone()
168            .into_iter()
169            .map(Into::into)
170            .collect();
171        let connector_props_with_secrets =
172            WithOptionsSecResolved::new(prost.with_properties.clone(), prost.secret_refs.clone());
173        let columns = prost_columns.into_iter().map(ColumnCatalog::from).collect();
174        let row_id_index = prost.row_id_index.map(|idx| idx as _);
175
176        let append_only = row_id_index.is_some();
177        let owner = prost.owner;
178        let watermark_descs = prost.get_watermark_descs().clone();
179
180        let associated_table_id = prost.optional_associated_table_id.map(|id| match id {
181            OptionalAssociatedTableId::AssociatedTableId(id) => id,
182        });
183        let version = prost.version;
184
185        let connection_id = prost.connection_id;
186        let rate_limit = prost.rate_limit;
187
188        Self {
189            id,
190            name,
191            schema_id,
192            database_id,
193            columns,
194            pk_col_ids,
195            append_only,
196            owner,
197            info: prost.info.clone().unwrap(),
198            row_id_index,
199            with_properties: connector_props_with_secrets,
200            watermark_descs,
201            associated_table_id: associated_table_id.map(|x| x.into()),
202            definition: prost.definition.clone(),
203            connection_id,
204            created_at_epoch: prost.created_at_epoch.map(Epoch::from),
205            initialized_at_epoch: prost.initialized_at_epoch.map(Epoch::from),
206            version,
207            created_at_cluster_version: prost.created_at_cluster_version.clone(),
208            initialized_at_cluster_version: prost.initialized_at_cluster_version.clone(),
209            rate_limit,
210        }
211    }
212}
213
214impl OwnedByUserCatalog for SourceCatalog {
215    fn owner(&self) -> UserId {
216        self.owner
217    }
218}