risingwave_connector/source/reader/
desc.rs1use std::collections::HashMap;
16use std::sync::Arc;
17
18use risingwave_common::bail;
19use risingwave_common::catalog::ColumnCatalog;
20use risingwave_common::util::iter_util::ZipEqFast;
21use risingwave_pb::catalog::PbStreamSourceInfo;
22use risingwave_pb::plan_common::PbColumnCatalog;
23
24#[expect(deprecated)]
25use super::fs_reader::LegacyFsSourceReader;
26use super::reader::SourceReader;
27use crate::WithOptionsSecResolved;
28use crate::error::ConnectorResult;
29use crate::parser::additional_columns::source_add_partition_offset_cols;
30use crate::parser::{EncodingProperties, ProtocolProperties, SpecificParserConfig};
31use crate::source::monitor::SourceMetrics;
32use crate::source::{SourceColumnDesc, SourceColumnType, UPSTREAM_SOURCE_KEY};
33
/// Default value for a source's `connector_message_buffer_size`.
///
/// Within this file it is used by [`test_utils::create_source_desc_builder`].
/// NOTE(review): presumably also the fallback production callers use when no explicit size is
/// configured — confirm at the call sites.
pub const DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE: usize = 16;
35
/// A fully built stream source: its catalog source info, the reader constructed for it, the
/// source column descriptors and a shared metrics handle.
///
/// Produced by [`SourceDescBuilder::build`].
#[derive(Debug, Clone)]
pub struct SourceDesc {
    /// Source info from the catalog; re-read by [`SourceDesc::update_reader`] to derive a fresh
    /// parser config.
    pub source_info: PbStreamSourceInfo,
    /// Reader built from the WITH options, the columns and the parser config.
    pub source: SourceReader,
    /// Source column descriptors, including any hidden additional columns appended by
    /// [`SourceDescBuilder::column_catalogs_to_source_column_descs`].
    pub columns: Vec<SourceColumnDesc>,
    /// Shared source metrics handle.
    pub metrics: Arc<SourceMetrics>,
}
44
45impl SourceDesc {
46 pub fn update_reader(
47 &mut self,
48 props_plaintext: HashMap<String, String>,
49 ) -> ConnectorResult<()> {
50 let props_wrapper =
51 WithOptionsSecResolved::without_secrets(props_plaintext.into_iter().collect());
52 let parser_config = SpecificParserConfig::new(&self.source_info, &props_wrapper)?;
53
54 let reader = SourceReader::new(
55 props_wrapper,
56 self.columns.clone(),
57 self.source.connector_message_buffer_size,
58 parser_config,
59 )?;
60 self.source = reader;
61 Ok(())
62 }
63}
64
/// Descriptor for the legacy filesystem source, built by
/// [`SourceDescBuilder::build_fs_source_desc`].
///
/// Building it only succeeds for the plain protocol with CSV or JSON encoding.
#[deprecated = "will be replaced by new fs source (list + fetch)"]
#[expect(deprecated)]
#[derive(Debug)]
pub struct LegacyFsSourceDesc {
    /// The legacy filesystem reader.
    pub source: LegacyFsSourceReader,
    /// Source column descriptors, including hidden additional columns.
    pub columns: Vec<SourceColumnDesc>,
    /// Shared source metrics handle.
    pub metrics: Arc<SourceMetrics>,
}
74
/// Collects the inputs needed to construct a [`SourceDesc`] (see [`SourceDescBuilder::build`])
/// or a legacy filesystem source descriptor.
// NOTE(review): unlike `SourceDesc` this type does not derive `Debug`; possibly deliberate because
// `with_properties` holds resolved option values — confirm before adding it.
#[derive(Clone)]
pub struct SourceDescBuilder {
    /// Catalog columns, converted from `PbColumnCatalog` in `new`.
    columns: Vec<ColumnCatalog>,
    /// Shared source metrics handle, handed to every built descriptor.
    metrics: Arc<SourceMetrics>,
    /// Index (into the built column descriptors) of the row-id column, if any.
    row_id_index: Option<usize>,
    /// WITH options; must contain `UPSTREAM_SOURCE_KEY` for column descriptors to be built.
    with_properties: WithOptionsSecResolved,
    /// Source info used together with `with_properties` to derive the parser config.
    source_info: PbStreamSourceInfo,
    /// Buffer size passed through to `SourceReader::new`.
    connector_message_buffer_size: usize,
    /// Indices (into the built column descriptors) of primary-key columns.
    pk_indices: Vec<usize>,
}
85
86impl SourceDescBuilder {
87 pub fn new(
88 columns: Vec<PbColumnCatalog>,
89 metrics: Arc<SourceMetrics>,
90 row_id_index: Option<usize>,
91 with_properties: WithOptionsSecResolved,
92 source_info: PbStreamSourceInfo,
93 connector_message_buffer_size: usize,
94 pk_indices: Vec<usize>,
95 ) -> Self {
96 Self {
97 columns: columns.into_iter().map(ColumnCatalog::from).collect(),
98 metrics,
99 row_id_index,
100 with_properties,
101 source_info,
102 connector_message_buffer_size,
103 pk_indices,
104 }
105 }
106
107 pub fn column_catalogs_to_source_column_descs(&self) -> Vec<SourceColumnDesc> {
110 let connector_name = self
111 .with_properties
112 .get(UPSTREAM_SOURCE_KEY)
113 .map(|s| s.to_lowercase())
114 .unwrap();
115 let (columns_exist, additional_columns) =
116 source_add_partition_offset_cols(&self.columns, &connector_name, false);
117
118 let mut columns: Vec<_> = self
119 .columns
120 .iter()
121 .map(|c| SourceColumnDesc::from(&c.column_desc))
122 .collect();
123
124 if connector_name != "iceberg" {
127 for (existed, c) in columns_exist.iter().zip_eq_fast(&additional_columns) {
128 if !existed {
129 columns.push(SourceColumnDesc::hidden_addition_col_from_column_desc(c));
130 }
131 }
132 }
133
134 if let Some(row_id_index) = self.row_id_index {
135 columns[row_id_index].column_type = SourceColumnType::RowId;
136 }
137 for pk_index in &self.pk_indices {
138 columns[*pk_index].is_pk = true;
139 }
140 columns
141 }
142
143 pub fn build(self) -> ConnectorResult<SourceDesc> {
144 let columns = self.column_catalogs_to_source_column_descs();
145
146 let parser_config = SpecificParserConfig::new(&self.source_info, &self.with_properties)?;
147
148 let source = SourceReader::new(
149 self.with_properties,
150 columns.clone(),
151 self.connector_message_buffer_size,
152 parser_config,
153 )?;
154
155 Ok(SourceDesc {
156 source,
157 columns,
158 metrics: self.metrics,
159 source_info: self.source_info,
160 })
161 }
162
163 pub fn metrics(&self) -> Arc<SourceMetrics> {
164 self.metrics.clone()
165 }
166
167 #[deprecated = "will be replaced by new fs source (list + fetch)"]
168 #[expect(deprecated)]
169 pub fn build_fs_source_desc(&self) -> ConnectorResult<LegacyFsSourceDesc> {
170 let parser_config = SpecificParserConfig::new(&self.source_info, &self.with_properties)?;
171
172 match (
173 &parser_config.protocol_config,
174 &parser_config.encoding_config,
175 ) {
176 (
177 ProtocolProperties::Plain,
178 EncodingProperties::Csv(_) | EncodingProperties::Json(_),
179 ) => {}
180 (format, encode) => {
181 bail!(
182 "Unsupported combination of format {:?} and encode {:?}",
183 format,
184 encode,
185 );
186 }
187 }
188
189 let columns = self.column_catalogs_to_source_column_descs();
190
191 let source = LegacyFsSourceReader::new(
192 self.with_properties.clone(),
193 columns.clone(),
194 parser_config,
195 )?;
196
197 Ok(LegacyFsSourceDesc {
198 source,
199 columns,
200 metrics: self.metrics.clone(),
201 })
202 }
203
204 pub fn with_properties(&self) -> WithOptionsSecResolved {
205 self.with_properties.clone()
206 }
207}
208
209pub mod test_utils {
210 use std::collections::BTreeMap;
211
212 use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Schema};
213 use risingwave_pb::catalog::StreamSourceInfo;
214
215 use super::{DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE, SourceDescBuilder};
216
217 pub fn create_source_desc_builder(
218 schema: &Schema,
219 row_id_index: Option<usize>,
220 source_info: StreamSourceInfo,
221 with_properties: BTreeMap<String, String>,
222 pk_indices: Vec<usize>,
223 ) -> SourceDescBuilder {
224 let columns = schema
225 .fields
226 .iter()
227 .enumerate()
228 .map(|(i, f)| {
229 ColumnCatalog::visible(ColumnDesc::named(
230 f.name.clone(),
231 (i as i32).into(), f.data_type.clone(),
233 ))
234 })
235 .collect();
236 let options_with_secret =
237 crate::WithOptionsSecResolved::without_secrets(with_properties.clone());
238 SourceDescBuilder {
239 columns,
240 metrics: Default::default(),
241 row_id_index,
242 with_properties: options_with_secret,
243 source_info,
244 connector_message_buffer_size: DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE,
245 pk_indices,
246 }
247 }
248}