risingwave_frontend/catalog/
source_catalog.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::{ColumnCatalog, ICEBERG_SOURCE_PREFIX, SourceVersionId};
16use risingwave_common::util::epoch::Epoch;
17use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt};
18use risingwave_pb::catalog::{PbSource, StreamSourceInfo, WatermarkDesc};
19use risingwave_pb::plan_common::SourceRefreshMode;
20use risingwave_sqlparser::ast;
21use risingwave_sqlparser::parser::Parser;
22use thiserror_ext::AsReport as _;
23
24use super::purify::try_purify_table_source_create_sql_ast;
25use super::{ColumnId, ConnectionId, DatabaseId, OwnedByUserCatalog, SchemaId, SourceId};
26use crate::catalog::TableId;
27use crate::error::Result;
28use crate::session::current::notice_to_user;
29use crate::user::UserId;
30
31/// This struct `SourceCatalog` is used in frontend.
32/// Compared with `PbSource`, it only maintains information used during optimization.
33#[derive(Clone, Debug, PartialEq, Eq, Hash)]
34pub struct SourceCatalog {
35    pub id: SourceId,
36    pub name: String,
37    pub schema_id: SchemaId,
38    pub database_id: DatabaseId,
39    pub columns: Vec<ColumnCatalog>,
40    pub pk_col_ids: Vec<ColumnId>,
41    pub append_only: bool,
42    pub owner: UserId,
43    pub info: StreamSourceInfo,
44    pub row_id_index: Option<usize>,
45    pub with_properties: WithOptionsSecResolved,
46    pub watermark_descs: Vec<WatermarkDesc>,
47    pub associated_table_id: Option<TableId>,
48    pub definition: String,
49    pub connection_id: Option<ConnectionId>,
50    pub created_at_epoch: Option<Epoch>,
51    pub initialized_at_epoch: Option<Epoch>,
52    pub version: SourceVersionId,
53    pub created_at_cluster_version: Option<String>,
54    pub initialized_at_cluster_version: Option<String>,
55    pub rate_limit: Option<u32>,
56    pub refresh_mode: Option<SourceRefreshMode>,
57}
58
59impl SourceCatalog {
60    /// Returns the SQL definition when the source was created.
61    pub fn create_sql(&self) -> String {
62        self.definition.clone()
63    }
64
65    /// Returns the parsed SQL definition when the source was created.
66    ///
67    /// Returns error if it's invalid.
68    pub fn create_sql_ast(&self) -> Result<ast::Statement> {
69        Ok(Parser::parse_exactly_one(&self.definition)?)
70    }
71
72    pub fn to_prost(&self) -> PbSource {
73        let (with_properties, secret_refs) = self.with_properties.clone().into_parts();
74        PbSource {
75            id: self.id,
76            schema_id: self.schema_id,
77            database_id: self.database_id,
78            name: self.name.clone(),
79            row_id_index: self.row_id_index.map(|idx| idx as _),
80            columns: self.columns.iter().map(|c| c.to_protobuf()).collect(),
81            pk_column_ids: self.pk_col_ids.iter().map(Into::into).collect(),
82            with_properties,
83            owner: self.owner,
84            info: Some(self.info.clone()),
85            watermark_descs: self.watermark_descs.clone(),
86            definition: self.definition.clone(),
87            connection_id: self.connection_id,
88            initialized_at_epoch: self.initialized_at_epoch.map(|x| x.0),
89            created_at_epoch: self.created_at_epoch.map(|x| x.0),
90            optional_associated_table_id: self.associated_table_id.map(Into::into),
91            version: self.version,
92            created_at_cluster_version: self.created_at_cluster_version.clone(),
93            initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
94            secret_refs,
95            rate_limit: self.rate_limit,
96            refresh_mode: self.refresh_mode,
97        }
98    }
99
100    /// Get a reference to the source catalog's version.
101    pub fn version(&self) -> SourceVersionId {
102        self.version
103    }
104
105    pub fn connector_name(&self) -> String {
106        self.with_properties
107            .get_connector()
108            .expect("connector name is missing")
109    }
110
111    pub fn is_iceberg_connector(&self) -> bool {
112        self.with_properties.is_iceberg_connector()
113    }
114
115    /// If this source is an iceberg source, returns the corresponding iceberg table name.
116    pub fn iceberg_table_name(&self) -> Option<String> {
117        if self.name.starts_with(ICEBERG_SOURCE_PREFIX) {
118            Some(self.name[ICEBERG_SOURCE_PREFIX.len()..].to_string())
119        } else {
120            None
121        }
122    }
123
124    /// Returns the SQL definition when the source was created, purified with best effort.
125    pub fn create_sql_purified(&self) -> String {
126        self.create_sql_ast_purified()
127            .and_then(|stmt| stmt.try_to_string().map_err(Into::into))
128            .unwrap_or_else(|_| self.create_sql())
129    }
130
131    /// Returns the parsed SQL definition when the source was created, purified with best effort.
132    ///
133    /// Returns error if it's invalid.
134    pub fn create_sql_ast_purified(&self) -> Result<ast::Statement> {
135        if self.with_properties.is_cdc_connector() {
136            // For CDC sources, we should not purify the SQL definition to add column definitions
137            // or constraints.
138            return self.create_sql_ast();
139        }
140
141        match try_purify_table_source_create_sql_ast(
142            self.create_sql_ast()?,
143            &self.columns,
144            self.row_id_index,
145            &self.pk_col_ids,
146        ) {
147            Ok(stmt) => return Ok(stmt),
148            Err(e) => notice_to_user(format!(
149                "error occurred while purifying definition for source \"{}\", \
150                     results may be inaccurate: {}",
151                self.name,
152                e.as_report()
153            )),
154        }
155
156        self.create_sql_ast()
157    }
158
159    /// Fills the `definition` field with the purified SQL definition.
160    ///
161    /// There's no need to call this method for correctness because we automatically purify the
162    /// SQL definition at the time of querying. However, this helps to maintain more accurate
163    /// `definition` field in the catalog when directly inspected for debugging purposes.
164    pub fn fill_purified_create_sql(&mut self) {
165        self.definition = self.create_sql_purified();
166    }
167}
168
169impl From<&PbSource> for SourceCatalog {
170    fn from(prost: &PbSource) -> Self {
171        let id = prost.id;
172        let name = prost.name.clone();
173        let database_id = prost.database_id;
174        let schema_id = prost.schema_id;
175        let prost_columns = prost.columns.clone();
176        let pk_col_ids = prost
177            .pk_column_ids
178            .clone()
179            .into_iter()
180            .map(Into::into)
181            .collect();
182        let connector_props_with_secrets =
183            WithOptionsSecResolved::new(prost.with_properties.clone(), prost.secret_refs.clone());
184        let columns = prost_columns.into_iter().map(ColumnCatalog::from).collect();
185        let row_id_index = prost.row_id_index.map(|idx| idx as _);
186
187        let append_only = row_id_index.is_some();
188        let owner = prost.owner;
189        let watermark_descs = prost.get_watermark_descs().clone();
190
191        let associated_table_id = prost.optional_associated_table_id.map(Into::into);
192        let version = prost.version;
193
194        let connection_id = prost.connection_id;
195        let rate_limit = prost.rate_limit;
196
197        Self {
198            id,
199            name,
200            schema_id,
201            database_id,
202            columns,
203            pk_col_ids,
204            append_only,
205            owner,
206            info: prost.info.clone().unwrap(),
207            row_id_index,
208            with_properties: connector_props_with_secrets,
209            watermark_descs,
210            associated_table_id,
211            definition: prost.definition.clone(),
212            connection_id,
213            created_at_epoch: prost.created_at_epoch.map(Epoch::from),
214            initialized_at_epoch: prost.initialized_at_epoch.map(Epoch::from),
215            version,
216            created_at_cluster_version: prost.created_at_cluster_version.clone(),
217            initialized_at_cluster_version: prost.initialized_at_cluster_version.clone(),
218            rate_limit,
219            refresh_mode: prost.refresh_mode,
220        }
221    }
222}
223
224impl OwnedByUserCatalog for SourceCatalog {
225    fn owner(&self) -> UserId {
226        self.owner
227    }
228}