risingwave_frontend/catalog/
source_catalog.rs1use anyhow::Context as _;
16use itertools::Itertools as _;
17use risingwave_common::catalog::{ColumnCatalog, SourceVersionId};
18use risingwave_common::util::epoch::Epoch;
19use risingwave_connector::{WithOptionsSecResolved, WithPropertiesExt};
20use risingwave_pb::catalog::source::OptionalAssociatedTableId;
21use risingwave_pb::catalog::{PbSource, StreamSourceInfo, WatermarkDesc};
22use risingwave_sqlparser::ast;
23use risingwave_sqlparser::parser::Parser;
24use thiserror_ext::AsReport as _;
25
26use super::purify::try_purify_table_source_create_sql_ast;
27use super::{ColumnId, ConnectionId, DatabaseId, OwnedByUserCatalog, SchemaId, SourceId};
28use crate::catalog::TableId;
29use crate::error::Result;
30use crate::session::current::notice_to_user;
31use crate::user::UserId;
32
33#[derive(Clone, Debug, PartialEq, Eq, Hash)]
36pub struct SourceCatalog {
37 pub id: SourceId,
38 pub name: String,
39 pub schema_id: SchemaId,
40 pub database_id: DatabaseId,
41 pub columns: Vec<ColumnCatalog>,
42 pub pk_col_ids: Vec<ColumnId>,
43 pub append_only: bool,
44 pub owner: UserId,
45 pub info: StreamSourceInfo,
46 pub row_id_index: Option<usize>,
47 pub with_properties: WithOptionsSecResolved,
48 pub watermark_descs: Vec<WatermarkDesc>,
49 pub associated_table_id: Option<TableId>,
50 pub definition: String,
51 pub connection_id: Option<ConnectionId>,
52 pub created_at_epoch: Option<Epoch>,
53 pub initialized_at_epoch: Option<Epoch>,
54 pub version: SourceVersionId,
55 pub created_at_cluster_version: Option<String>,
56 pub initialized_at_cluster_version: Option<String>,
57 pub rate_limit: Option<u32>,
58}
59
60impl SourceCatalog {
61 pub fn create_sql(&self) -> String {
63 self.definition.clone()
64 }
65
66 pub fn create_sql_ast(&self) -> Result<ast::Statement> {
70 Ok(Parser::parse_sql(&self.definition)
71 .context("unable to parse definition sql")?
72 .into_iter()
73 .exactly_one()
74 .context("expecting exactly one statement in definition")?)
75 }
76
77 pub fn to_prost(&self) -> PbSource {
78 let (with_properties, secret_refs) = self.with_properties.clone().into_parts();
79 PbSource {
80 id: self.id,
81 schema_id: self.schema_id,
82 database_id: self.database_id,
83 name: self.name.clone(),
84 row_id_index: self.row_id_index.map(|idx| idx as _),
85 columns: self.columns.iter().map(|c| c.to_protobuf()).collect(),
86 pk_column_ids: self.pk_col_ids.iter().map(Into::into).collect(),
87 with_properties,
88 owner: self.owner,
89 info: Some(self.info.clone()),
90 watermark_descs: self.watermark_descs.clone(),
91 definition: self.definition.clone(),
92 connection_id: self.connection_id,
93 initialized_at_epoch: self.initialized_at_epoch.map(|x| x.0),
94 created_at_epoch: self.created_at_epoch.map(|x| x.0),
95 optional_associated_table_id: self
96 .associated_table_id
97 .map(|id| OptionalAssociatedTableId::AssociatedTableId(id.table_id)),
98 version: self.version,
99 created_at_cluster_version: self.created_at_cluster_version.clone(),
100 initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
101 secret_refs,
102 rate_limit: self.rate_limit,
103 }
104 }
105
106 pub fn version(&self) -> SourceVersionId {
108 self.version
109 }
110
111 pub fn connector_name(&self) -> String {
112 self.with_properties
113 .get_connector()
114 .expect("connector name is missing")
115 }
116}
117
118impl SourceCatalog {
119 pub fn create_sql_purified(&self) -> String {
121 self.create_sql_ast_purified()
122 .map(|stmt| stmt.to_string())
123 .unwrap_or_else(|_| self.create_sql())
124 }
125
126 pub fn create_sql_ast_purified(&self) -> Result<ast::Statement> {
130 match try_purify_table_source_create_sql_ast(
131 self.create_sql_ast()?,
132 &self.columns,
133 self.row_id_index,
134 &self.pk_col_ids,
135 ) {
136 Ok(stmt) => return Ok(stmt),
137 Err(e) => notice_to_user(format!(
138 "error occurred while purifying definition for source \"{}\", \
139 results may be inaccurate: {}",
140 self.name,
141 e.as_report()
142 )),
143 }
144
145 self.create_sql_ast()
146 }
147
148 pub fn fill_purified_create_sql(&mut self) {
154 self.definition = self.create_sql_purified();
155 }
156}
157
158impl From<&PbSource> for SourceCatalog {
159 fn from(prost: &PbSource) -> Self {
160 let id = prost.id;
161 let name = prost.name.clone();
162 let database_id = prost.database_id;
163 let schema_id = prost.schema_id;
164 let prost_columns = prost.columns.clone();
165 let pk_col_ids = prost
166 .pk_column_ids
167 .clone()
168 .into_iter()
169 .map(Into::into)
170 .collect();
171 let connector_props_with_secrets =
172 WithOptionsSecResolved::new(prost.with_properties.clone(), prost.secret_refs.clone());
173 let columns = prost_columns.into_iter().map(ColumnCatalog::from).collect();
174 let row_id_index = prost.row_id_index.map(|idx| idx as _);
175
176 let append_only = row_id_index.is_some();
177 let owner = prost.owner;
178 let watermark_descs = prost.get_watermark_descs().clone();
179
180 let associated_table_id = prost.optional_associated_table_id.map(|id| match id {
181 OptionalAssociatedTableId::AssociatedTableId(id) => id,
182 });
183 let version = prost.version;
184
185 let connection_id = prost.connection_id;
186 let rate_limit = prost.rate_limit;
187
188 Self {
189 id,
190 name,
191 schema_id,
192 database_id,
193 columns,
194 pk_col_ids,
195 append_only,
196 owner,
197 info: prost.info.clone().unwrap(),
198 row_id_index,
199 with_properties: connector_props_with_secrets,
200 watermark_descs,
201 associated_table_id: associated_table_id.map(|x| x.into()),
202 definition: prost.definition.clone(),
203 connection_id,
204 created_at_epoch: prost.created_at_epoch.map(Epoch::from),
205 initialized_at_epoch: prost.initialized_at_epoch.map(Epoch::from),
206 version,
207 created_at_cluster_version: prost.created_at_cluster_version.clone(),
208 initialized_at_cluster_version: prost.initialized_at_cluster_version.clone(),
209 rate_limit,
210 }
211 }
212}
213
214impl OwnedByUserCatalog for SourceCatalog {
215 fn owner(&self) -> UserId {
216 self.owner
217 }
218}