risingwave_connector/source/reader/
desc.rs1use std::sync::Arc;
16
17use risingwave_common::bail;
18use risingwave_common::catalog::ColumnCatalog;
19use risingwave_common::util::iter_util::ZipEqFast;
20use risingwave_pb::catalog::PbStreamSourceInfo;
21use risingwave_pb::plan_common::PbColumnCatalog;
22
23#[expect(deprecated)]
24use super::fs_reader::LegacyFsSourceReader;
25use super::reader::SourceReader;
26use crate::WithOptionsSecResolved;
27use crate::error::ConnectorResult;
28use crate::parser::additional_columns::source_add_partition_offset_cols;
29use crate::parser::{EncodingProperties, ProtocolProperties, SpecificParserConfig};
30use crate::source::monitor::SourceMetrics;
31use crate::source::{SourceColumnDesc, SourceColumnType, UPSTREAM_SOURCE_KEY};
32
/// Default value for a source's `connector_message_buffer_size` setting.
///
/// The test helper `test_utils::create_source_desc_builder` in this file uses
/// it when constructing a `SourceDescBuilder`.
// NOTE(review): the unit of the buffer (messages vs. chunks) is not visible in this file — confirm.
pub const DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE: usize = 16;
34
35#[derive(Debug, Clone)]
37pub struct SourceDesc {
38 pub source: SourceReader,
39 pub columns: Vec<SourceColumnDesc>,
40 pub metrics: Arc<SourceMetrics>,
41}
42
43#[deprecated = "will be replaced by new fs source (list + fetch)"]
45#[expect(deprecated)]
46#[derive(Debug)]
47pub struct LegacyFsSourceDesc {
48 pub source: LegacyFsSourceReader,
49 pub columns: Vec<SourceColumnDesc>,
50 pub metrics: Arc<SourceMetrics>,
51}
52
53#[derive(Clone)]
54pub struct SourceDescBuilder {
55 columns: Vec<ColumnCatalog>,
56 metrics: Arc<SourceMetrics>,
57 row_id_index: Option<usize>,
58 with_properties: WithOptionsSecResolved,
59 source_info: PbStreamSourceInfo,
60 connector_message_buffer_size: usize,
61 pk_indices: Vec<usize>,
62}
63
64impl SourceDescBuilder {
65 pub fn new(
66 columns: Vec<PbColumnCatalog>,
67 metrics: Arc<SourceMetrics>,
68 row_id_index: Option<usize>,
69 with_properties: WithOptionsSecResolved,
70 source_info: PbStreamSourceInfo,
71 connector_message_buffer_size: usize,
72 pk_indices: Vec<usize>,
73 ) -> Self {
74 Self {
75 columns: columns.into_iter().map(ColumnCatalog::from).collect(),
76 metrics,
77 row_id_index,
78 with_properties,
79 source_info,
80 connector_message_buffer_size,
81 pk_indices,
82 }
83 }
84
85 pub fn column_catalogs_to_source_column_descs(&self) -> Vec<SourceColumnDesc> {
88 let connector_name = self
89 .with_properties
90 .get(UPSTREAM_SOURCE_KEY)
91 .map(|s| s.to_lowercase())
92 .unwrap();
93 let (columns_exist, additional_columns) =
94 source_add_partition_offset_cols(&self.columns, &connector_name, false);
95
96 let mut columns: Vec<_> = self
97 .columns
98 .iter()
99 .map(|c| SourceColumnDesc::from(&c.column_desc))
100 .collect();
101
102 if connector_name != "iceberg" {
105 for (existed, c) in columns_exist.iter().zip_eq_fast(&additional_columns) {
106 if !existed {
107 columns.push(SourceColumnDesc::hidden_addition_col_from_column_desc(c));
108 }
109 }
110 }
111
112 if let Some(row_id_index) = self.row_id_index {
113 columns[row_id_index].column_type = SourceColumnType::RowId;
114 }
115 for pk_index in &self.pk_indices {
116 columns[*pk_index].is_pk = true;
117 }
118 columns
119 }
120
121 pub fn build(self) -> ConnectorResult<SourceDesc> {
122 let columns = self.column_catalogs_to_source_column_descs();
123
124 let parser_config = SpecificParserConfig::new(&self.source_info, &self.with_properties)?;
125
126 let source = SourceReader::new(
127 self.with_properties,
128 columns.clone(),
129 self.connector_message_buffer_size,
130 parser_config,
131 )?;
132
133 Ok(SourceDesc {
134 source,
135 columns,
136 metrics: self.metrics,
137 })
138 }
139
140 pub fn metrics(&self) -> Arc<SourceMetrics> {
141 self.metrics.clone()
142 }
143
144 #[deprecated = "will be replaced by new fs source (list + fetch)"]
145 #[expect(deprecated)]
146 pub fn build_fs_source_desc(&self) -> ConnectorResult<LegacyFsSourceDesc> {
147 let parser_config = SpecificParserConfig::new(&self.source_info, &self.with_properties)?;
148
149 match (
150 &parser_config.protocol_config,
151 &parser_config.encoding_config,
152 ) {
153 (
154 ProtocolProperties::Plain,
155 EncodingProperties::Csv(_) | EncodingProperties::Json(_),
156 ) => {}
157 (format, encode) => {
158 bail!(
159 "Unsupported combination of format {:?} and encode {:?}",
160 format,
161 encode,
162 );
163 }
164 }
165
166 let columns = self.column_catalogs_to_source_column_descs();
167
168 let source = LegacyFsSourceReader::new(
169 self.with_properties.clone(),
170 columns.clone(),
171 parser_config,
172 )?;
173
174 Ok(LegacyFsSourceDesc {
175 source,
176 columns,
177 metrics: self.metrics.clone(),
178 })
179 }
180
181 pub fn with_properties(&self) -> WithOptionsSecResolved {
182 self.with_properties.clone()
183 }
184}
185
186pub mod test_utils {
187 use std::collections::BTreeMap;
188
189 use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Schema};
190 use risingwave_pb::catalog::StreamSourceInfo;
191
192 use super::{DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE, SourceDescBuilder};
193
194 pub fn create_source_desc_builder(
195 schema: &Schema,
196 row_id_index: Option<usize>,
197 source_info: StreamSourceInfo,
198 with_properties: BTreeMap<String, String>,
199 pk_indices: Vec<usize>,
200 ) -> SourceDescBuilder {
201 let columns = schema
202 .fields
203 .iter()
204 .enumerate()
205 .map(|(i, f)| {
206 ColumnCatalog::visible(ColumnDesc::named(
207 f.name.clone(),
208 (i as i32).into(), f.data_type.clone(),
210 ))
211 })
212 .collect();
213 let options_with_secret =
214 crate::WithOptionsSecResolved::without_secrets(with_properties.clone());
215 SourceDescBuilder {
216 columns,
217 metrics: Default::default(),
218 row_id_index,
219 with_properties: options_with_secret,
220 source_info,
221 connector_message_buffer_size: DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE,
222 pk_indices,
223 }
224 }
225}