// risingwave_frontend/catalog/source_catalog.rs
use risingwave_common::catalog::{ColumnCatalog, ICEBERG_SOURCE_PREFIX, SourceVersionId};
16use risingwave_common::util::epoch::Epoch;
17use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt};
18use risingwave_pb::catalog::{PbSource, StreamSourceInfo, WatermarkDesc};
19use risingwave_pb::plan_common::SourceRefreshMode;
20use risingwave_sqlparser::ast;
21use risingwave_sqlparser::parser::Parser;
22use thiserror_ext::AsReport as _;
23
24use super::purify::try_purify_table_source_create_sql_ast;
25use super::{ColumnId, ConnectionId, DatabaseId, OwnedByUserCatalog, SchemaId, SourceId};
26use crate::catalog::TableId;
27use crate::error::Result;
28use crate::session::current::notice_to_user;
29use crate::user::UserId;
30
/// In-memory catalog entry for a `SOURCE` object, mirroring [`PbSource`].
///
/// Converted from protobuf via `From<&PbSource>` and back via
/// [`SourceCatalog::to_prost`].
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct SourceCatalog {
    // Unique id of this source.
    pub id: SourceId,
    // Name of the source as created by the user.
    pub name: String,
    // Schema this source belongs to.
    pub schema_id: SchemaId,
    // Database this source belongs to.
    pub database_id: DatabaseId,
    // All columns of this source.
    pub columns: Vec<ColumnCatalog>,
    // Ids of the primary key columns.
    pub pk_col_ids: Vec<ColumnId>,
    // Whether the source is append-only; derived from the presence of a
    // row-id column when constructed from protobuf (see `From<&PbSource>`).
    pub append_only: bool,
    // Id of the owning user.
    pub owner: UserId,
    // Connector-specific stream source info (format, encoding, etc.).
    pub info: StreamSourceInfo,
    // Index of the hidden row-id column in `columns`, if any.
    pub row_id_index: Option<usize>,
    // `WITH` options, with secret references resolved separately.
    pub with_properties: WithOptionsSecResolved,
    // Watermark definitions declared on this source.
    pub watermark_descs: Vec<WatermarkDesc>,
    // Id of the associated table, if this source backs a table
    // (i.e. created via `CREATE TABLE ... WITH (connector = ...)`).
    pub associated_table_id: Option<TableId>,
    // Original `CREATE` SQL definition of this source.
    pub definition: String,
    // Id of the connection used by this source, if any.
    pub connection_id: Option<ConnectionId>,
    // Epoch at which the source was created / initialized, if known.
    pub created_at_epoch: Option<Epoch>,
    pub initialized_at_epoch: Option<Epoch>,
    // Version of this catalog entry.
    pub version: SourceVersionId,
    // Cluster versions at creation / initialization time, if recorded.
    pub created_at_cluster_version: Option<String>,
    pub initialized_at_cluster_version: Option<String>,
    // Optional per-source rate limit.
    pub rate_limit: Option<u32>,
    // Optional refresh mode for refreshable sources.
    pub refresh_mode: Option<SourceRefreshMode>,
}
58
59impl SourceCatalog {
60 pub fn create_sql(&self) -> String {
62 self.definition.clone()
63 }
64
65 pub fn create_sql_ast(&self) -> Result<ast::Statement> {
69 Ok(Parser::parse_exactly_one(&self.definition)?)
70 }
71
72 pub fn to_prost(&self) -> PbSource {
73 let (with_properties, secret_refs) = self.with_properties.clone().into_parts();
74 PbSource {
75 id: self.id,
76 schema_id: self.schema_id,
77 database_id: self.database_id,
78 name: self.name.clone(),
79 row_id_index: self.row_id_index.map(|idx| idx as _),
80 columns: self.columns.iter().map(|c| c.to_protobuf()).collect(),
81 pk_column_ids: self.pk_col_ids.iter().map(Into::into).collect(),
82 with_properties,
83 owner: self.owner,
84 info: Some(self.info.clone()),
85 watermark_descs: self.watermark_descs.clone(),
86 definition: self.definition.clone(),
87 connection_id: self.connection_id,
88 initialized_at_epoch: self.initialized_at_epoch.map(|x| x.0),
89 created_at_epoch: self.created_at_epoch.map(|x| x.0),
90 optional_associated_table_id: self.associated_table_id.map(Into::into),
91 version: self.version,
92 created_at_cluster_version: self.created_at_cluster_version.clone(),
93 initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
94 secret_refs,
95 rate_limit: self.rate_limit,
96 refresh_mode: self.refresh_mode,
97 }
98 }
99
100 pub fn version(&self) -> SourceVersionId {
102 self.version
103 }
104
105 pub fn connector_name(&self) -> String {
106 self.with_properties
107 .get_connector()
108 .expect("connector name is missing")
109 }
110
111 pub fn is_iceberg_connector(&self) -> bool {
112 self.with_properties.is_iceberg_connector()
113 }
114
115 pub fn iceberg_table_name(&self) -> Option<String> {
117 if self.name.starts_with(ICEBERG_SOURCE_PREFIX) {
118 Some(self.name[ICEBERG_SOURCE_PREFIX.len()..].to_string())
119 } else {
120 None
121 }
122 }
123
124 pub fn create_sql_purified(&self) -> String {
126 self.create_sql_ast_purified()
127 .and_then(|stmt| stmt.try_to_string().map_err(Into::into))
128 .unwrap_or_else(|_| self.create_sql())
129 }
130
131 pub fn create_sql_ast_purified(&self) -> Result<ast::Statement> {
135 if self.with_properties.is_cdc_connector() {
136 return self.create_sql_ast();
139 }
140
141 match try_purify_table_source_create_sql_ast(
142 self.create_sql_ast()?,
143 &self.columns,
144 self.row_id_index,
145 &self.pk_col_ids,
146 ) {
147 Ok(stmt) => return Ok(stmt),
148 Err(e) => notice_to_user(format!(
149 "error occurred while purifying definition for source \"{}\", \
150 results may be inaccurate: {}",
151 self.name,
152 e.as_report()
153 )),
154 }
155
156 self.create_sql_ast()
157 }
158
159 pub fn fill_purified_create_sql(&mut self) {
165 self.definition = self.create_sql_purified();
166 }
167}
168
169impl From<&PbSource> for SourceCatalog {
170 fn from(prost: &PbSource) -> Self {
171 let id = prost.id;
172 let name = prost.name.clone();
173 let database_id = prost.database_id;
174 let schema_id = prost.schema_id;
175 let prost_columns = prost.columns.clone();
176 let pk_col_ids = prost
177 .pk_column_ids
178 .clone()
179 .into_iter()
180 .map(Into::into)
181 .collect();
182 let connector_props_with_secrets =
183 WithOptionsSecResolved::new(prost.with_properties.clone(), prost.secret_refs.clone());
184 let columns = prost_columns.into_iter().map(ColumnCatalog::from).collect();
185 let row_id_index = prost.row_id_index.map(|idx| idx as _);
186
187 let append_only = row_id_index.is_some();
188 let owner = prost.owner;
189 let watermark_descs = prost.get_watermark_descs().clone();
190
191 let associated_table_id = prost.optional_associated_table_id.map(Into::into);
192 let version = prost.version;
193
194 let connection_id = prost.connection_id;
195 let rate_limit = prost.rate_limit;
196
197 Self {
198 id,
199 name,
200 schema_id,
201 database_id,
202 columns,
203 pk_col_ids,
204 append_only,
205 owner,
206 info: prost.info.clone().unwrap(),
207 row_id_index,
208 with_properties: connector_props_with_secrets,
209 watermark_descs,
210 associated_table_id,
211 definition: prost.definition.clone(),
212 connection_id,
213 created_at_epoch: prost.created_at_epoch.map(Epoch::from),
214 initialized_at_epoch: prost.initialized_at_epoch.map(Epoch::from),
215 version,
216 created_at_cluster_version: prost.created_at_cluster_version.clone(),
217 initialized_at_cluster_version: prost.initialized_at_cluster_version.clone(),
218 rate_limit,
219 refresh_mode: prost.refresh_mode,
220 }
221 }
222}
223
impl OwnedByUserCatalog for SourceCatalog {
    /// Returns the id of the user owning this source.
    fn owner(&self) -> UserId {
        self.owner
    }
}