risingwave_connector/source/reader/
desc.rs1use std::collections::HashMap;
16use std::sync::Arc;
17
18use risingwave_common::bail;
19use risingwave_common::catalog::ColumnCatalog;
20use risingwave_common::util::iter_util::ZipEqFast;
21use risingwave_pb::catalog::PbStreamSourceInfo;
22use risingwave_pb::plan_common::PbColumnCatalog;
23
24#[expect(deprecated)]
25use super::fs_reader::LegacyFsSourceReader;
26use super::reader::SourceReader;
27use crate::error::ConnectorResult;
28use crate::parser::additional_columns::{
29 derive_pulsar_message_id_data_column, source_add_partition_offset_cols,
30};
31use crate::parser::{EncodingProperties, ProtocolProperties, SpecificParserConfig};
32use crate::source::monitor::SourceMetrics;
33use crate::source::{SourceColumnDesc, SourceColumnType, UPSTREAM_SOURCE_KEY};
34use crate::{WithOptionsSecResolved, WithPropertiesExt};
35
/// Default value for a builder's `connector_message_buffer_size`.
///
/// `test_utils::create_source_desc_builder` uses this value when constructing a
/// [`SourceDescBuilder`].
// NOTE(review): the unit of this size (messages vs. chunks) is not visible in this file — confirm
// against `SourceReader`.
pub const DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE: usize = 16;
37
38#[derive(Debug, Clone)]
40pub struct SourceDesc {
41 pub source_info: PbStreamSourceInfo,
42 pub source: SourceReader,
43 pub columns: Vec<SourceColumnDesc>,
44 pub metrics: Arc<SourceMetrics>,
45}
46
47impl SourceDesc {
48 pub fn update_reader(
49 &mut self,
50 props_plaintext: HashMap<String, String>,
51 ) -> ConnectorResult<()> {
52 let props_wrapper =
53 WithOptionsSecResolved::without_secrets(props_plaintext.into_iter().collect());
54 let parser_config = SpecificParserConfig::new(&self.source_info, &props_wrapper)?;
55
56 let reader = SourceReader::new(
57 props_wrapper,
58 self.columns.clone(),
59 self.source.connector_message_buffer_size,
60 parser_config,
61 )?;
62 self.source = reader;
63 Ok(())
64 }
65}
66
67#[deprecated = "will be replaced by new fs source (list + fetch)"]
69#[expect(deprecated)]
70#[derive(Debug)]
71pub struct LegacyFsSourceDesc {
72 pub source: LegacyFsSourceReader,
73 pub columns: Vec<SourceColumnDesc>,
74 pub metrics: Arc<SourceMetrics>,
75}
76
77#[derive(Clone)]
78pub struct SourceDescBuilder {
79 columns: Vec<ColumnCatalog>,
80 metrics: Arc<SourceMetrics>,
81 row_id_index: Option<usize>,
82 with_properties: WithOptionsSecResolved,
83 source_info: PbStreamSourceInfo,
84 connector_message_buffer_size: usize,
85 pk_indices: Vec<usize>,
86}
87
88impl SourceDescBuilder {
89 pub fn new(
90 columns: Vec<PbColumnCatalog>,
91 metrics: Arc<SourceMetrics>,
92 row_id_index: Option<usize>,
93 with_properties: WithOptionsSecResolved,
94 source_info: PbStreamSourceInfo,
95 connector_message_buffer_size: usize,
96 pk_indices: Vec<usize>,
97 ) -> Self {
98 Self {
99 columns: columns.into_iter().map(ColumnCatalog::from).collect(),
100 metrics,
101 row_id_index,
102 with_properties,
103 source_info,
104 connector_message_buffer_size,
105 pk_indices,
106 }
107 }
108
109 pub fn column_catalogs_to_source_column_descs(&self) -> Vec<SourceColumnDesc> {
112 let connector_name = self
113 .with_properties
114 .get(UPSTREAM_SOURCE_KEY)
115 .map(|s| s.to_lowercase())
116 .unwrap();
117 let (columns_exist, additional_columns) = {
118 let (mut columns_exist, mut additional_columns) =
119 source_add_partition_offset_cols(&self.columns, &connector_name, false);
120
121 if self.with_properties.is_pulsar_connector() {
123 derive_pulsar_message_id_data_column(
124 &connector_name,
125 &mut columns_exist,
126 &mut additional_columns,
127 );
128 }
129 (columns_exist, additional_columns)
130 };
131
132 let mut columns: Vec<_> = self
133 .columns
134 .iter()
135 .map(|c| SourceColumnDesc::from(&c.column_desc))
136 .collect();
137
138 if connector_name != "iceberg" {
141 for (existed, c) in columns_exist.iter().zip_eq_fast(&additional_columns) {
142 if !existed {
143 columns.push(SourceColumnDesc::hidden_addition_col_from_column_desc(c));
144 }
145 }
146 }
147
148 if let Some(row_id_index) = self.row_id_index {
149 columns[row_id_index].column_type = SourceColumnType::RowId;
150 }
151 for pk_index in &self.pk_indices {
152 columns[*pk_index].is_pk = true;
153 }
154 columns
155 }
156
157 pub fn build(self) -> ConnectorResult<SourceDesc> {
158 let columns = self.column_catalogs_to_source_column_descs();
159
160 let parser_config = SpecificParserConfig::new(&self.source_info, &self.with_properties)?;
161
162 let source = SourceReader::new(
163 self.with_properties,
164 columns.clone(),
165 self.connector_message_buffer_size,
166 parser_config,
167 )?;
168
169 Ok(SourceDesc {
170 source,
171 columns,
172 metrics: self.metrics,
173 source_info: self.source_info,
174 })
175 }
176
177 pub fn metrics(&self) -> Arc<SourceMetrics> {
178 self.metrics.clone()
179 }
180
181 #[deprecated = "will be replaced by new fs source (list + fetch)"]
182 #[expect(deprecated)]
183 pub fn build_fs_source_desc(&self) -> ConnectorResult<LegacyFsSourceDesc> {
184 let parser_config = SpecificParserConfig::new(&self.source_info, &self.with_properties)?;
185
186 match (
187 &parser_config.protocol_config,
188 &parser_config.encoding_config,
189 ) {
190 (
191 ProtocolProperties::Plain,
192 EncodingProperties::Csv(_) | EncodingProperties::Json(_),
193 ) => {}
194 (format, encode) => {
195 bail!(
196 "Unsupported combination of format {:?} and encode {:?}",
197 format,
198 encode,
199 );
200 }
201 }
202
203 let columns = self.column_catalogs_to_source_column_descs();
204
205 let source = LegacyFsSourceReader::new(
206 self.with_properties.clone(),
207 columns.clone(),
208 parser_config,
209 )?;
210
211 Ok(LegacyFsSourceDesc {
212 source,
213 columns,
214 metrics: self.metrics.clone(),
215 })
216 }
217
218 pub fn with_properties(&self) -> WithOptionsSecResolved {
219 self.with_properties.clone()
220 }
221}
222
223pub mod test_utils {
224 use std::collections::BTreeMap;
225
226 use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Schema};
227 use risingwave_pb::catalog::StreamSourceInfo;
228
229 use super::{DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE, SourceDescBuilder};
230
231 pub fn create_source_desc_builder(
232 schema: &Schema,
233 row_id_index: Option<usize>,
234 source_info: StreamSourceInfo,
235 with_properties: BTreeMap<String, String>,
236 pk_indices: Vec<usize>,
237 ) -> SourceDescBuilder {
238 let columns = schema
239 .fields
240 .iter()
241 .enumerate()
242 .map(|(i, f)| {
243 ColumnCatalog::visible(ColumnDesc::named(
244 f.name.clone(),
245 (i as i32).into(), f.data_type.clone(),
247 ))
248 })
249 .collect();
250 let options_with_secret = crate::WithOptionsSecResolved::without_secrets(with_properties);
251 SourceDescBuilder {
252 columns,
253 metrics: Default::default(),
254 row_id_index,
255 with_properties: options_with_secret,
256 source_info,
257 connector_message_buffer_size: DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE,
258 pk_indices,
259 }
260 }
261}