risingwave_connector/source/reader/
desc.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use risingwave_common::bail;
19use risingwave_common::catalog::ColumnCatalog;
20use risingwave_common::util::iter_util::ZipEqFast;
21use risingwave_pb::catalog::PbStreamSourceInfo;
22use risingwave_pb::plan_common::PbColumnCatalog;
23
24#[expect(deprecated)]
25use super::fs_reader::LegacyFsSourceReader;
26use super::reader::SourceReader;
27use crate::error::ConnectorResult;
28use crate::parser::additional_columns::{
29    derive_pulsar_message_id_data_column, source_add_partition_offset_cols,
30};
31use crate::parser::{EncodingProperties, ProtocolProperties, SpecificParserConfig};
32use crate::source::monitor::SourceMetrics;
33use crate::source::{SourceColumnDesc, SourceColumnType, UPSTREAM_SOURCE_KEY};
34use crate::{WithOptionsSecResolved, WithPropertiesExt};
35
// NOTE(review): the unit is presumably "messages buffered per reader" — confirm against
// `SourceReader`, which is where the value is ultimately consumed.
/// Default `connector_message_buffer_size` for a [`SourceDescBuilder`] when the caller has no
/// specific value to supply (this is what `test_utils::create_source_desc_builder` uses).
pub const DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE: usize = 16;
37
/// `SourceDesc` describes a stream source.
///
/// It is produced by [`SourceDescBuilder::build`] and keeps the reader together with the
/// inputs it was created from, so that [`SourceDesc::update_reader`] can rebuild the reader
/// later from a new set of properties.
#[derive(Debug, Clone)]
pub struct SourceDesc {
    /// Source info from which the parser configuration is derived
    /// (passed to `SpecificParserConfig::new`).
    pub source_info: PbStreamSourceInfo,
    /// The reader for this source; replaced wholesale by `update_reader`.
    pub source: SourceReader,
    /// Column descriptors the reader was created with. When built through
    /// `SourceDescBuilder`, this is the output of `column_catalogs_to_source_column_descs`.
    pub columns: Vec<SourceColumnDesc>,
    /// Shared handle to the source metrics supplied to the builder.
    pub metrics: Arc<SourceMetrics>,
}
46
47impl SourceDesc {
48    pub fn update_reader(
49        &mut self,
50        props_plaintext: HashMap<String, String>,
51    ) -> ConnectorResult<()> {
52        let props_wrapper =
53            WithOptionsSecResolved::without_secrets(props_plaintext.into_iter().collect());
54        let parser_config = SpecificParserConfig::new(&self.source_info, &props_wrapper)?;
55
56        let reader = SourceReader::new(
57            props_wrapper,
58            self.columns.clone(),
59            self.source.connector_message_buffer_size,
60            parser_config,
61        )?;
62        self.source = reader;
63        Ok(())
64    }
65}
66
/// `LegacyFsSourceDesc` describes a stream source read through the legacy file-system reader.
///
/// It is produced by `SourceDescBuilder::build_fs_source_desc`.
#[deprecated = "will be replaced by new fs source (list + fetch)"]
#[expect(deprecated)]
#[derive(Debug)]
pub struct LegacyFsSourceDesc {
    /// The legacy file-system reader for this source.
    pub source: LegacyFsSourceReader,
    /// Column descriptors the reader was created with.
    pub columns: Vec<SourceColumnDesc>,
    /// Shared handle to the source metrics supplied to the builder.
    pub metrics: Arc<SourceMetrics>,
}
76
/// Collects everything needed to create a [`SourceDesc`] (or the deprecated
/// [`LegacyFsSourceDesc`]) and derives the source column descriptors from the column catalogs.
#[derive(Clone)]
pub struct SourceDescBuilder {
    /// Column catalogs of the source; converted from `PbColumnCatalog` in `new`.
    columns: Vec<ColumnCatalog>,
    /// Metrics handle passed on to every descriptor this builder creates.
    metrics: Arc<SourceMetrics>,
    /// Index (into the derived source column list) of the column to mark as
    /// `SourceColumnType::RowId`, if any.
    row_id_index: Option<usize>,
    /// Connector properties; the value under `UPSTREAM_SOURCE_KEY` is read when deriving
    /// the source columns.
    with_properties: WithOptionsSecResolved,
    /// Source info from which the parser configuration is derived.
    source_info: PbStreamSourceInfo,
    /// Buffer size forwarded to `SourceReader::new`.
    connector_message_buffer_size: usize,
    /// Indices (into the derived source column list) of the columns to flag with `is_pk`.
    pk_indices: Vec<usize>,
}
87
88impl SourceDescBuilder {
89    pub fn new(
90        columns: Vec<PbColumnCatalog>,
91        metrics: Arc<SourceMetrics>,
92        row_id_index: Option<usize>,
93        with_properties: WithOptionsSecResolved,
94        source_info: PbStreamSourceInfo,
95        connector_message_buffer_size: usize,
96        pk_indices: Vec<usize>,
97    ) -> Self {
98        Self {
99            columns: columns.into_iter().map(ColumnCatalog::from).collect(),
100            metrics,
101            row_id_index,
102            with_properties,
103            source_info,
104            connector_message_buffer_size,
105            pk_indices,
106        }
107    }
108
109    /// This function builds `SourceColumnDesc` from `ColumnCatalog`, and handle the creation
110    /// of hidden columns like partition/file, offset that are not specified by user.
111    pub fn column_catalogs_to_source_column_descs(&self) -> Vec<SourceColumnDesc> {
112        let connector_name = self
113            .with_properties
114            .get(UPSTREAM_SOURCE_KEY)
115            .map(|s| s.to_lowercase())
116            .unwrap();
117        let (columns_exist, additional_columns) = {
118            let (mut columns_exist, mut additional_columns) =
119                source_add_partition_offset_cols(&self.columns, &connector_name, false);
120
121            // add `message_id_data` column for pulsar source, which is used for ack message
122            if self.with_properties.is_pulsar_connector() {
123                derive_pulsar_message_id_data_column(
124                    &connector_name,
125                    &mut columns_exist,
126                    &mut additional_columns,
127                );
128            }
129            (columns_exist, additional_columns)
130        };
131
132        let mut columns: Vec<_> = self
133            .columns
134            .iter()
135            .map(|c| SourceColumnDesc::from(&c.column_desc))
136            .collect();
137
138        // currently iceberg uses other columns. See `extract_iceberg_columns`
139        // TODO: unify logic.
140        if connector_name != "iceberg" {
141            for (existed, c) in columns_exist.iter().zip_eq_fast(&additional_columns) {
142                if !existed {
143                    columns.push(SourceColumnDesc::hidden_addition_col_from_column_desc(c));
144                }
145            }
146        }
147
148        if let Some(row_id_index) = self.row_id_index {
149            columns[row_id_index].column_type = SourceColumnType::RowId;
150        }
151        for pk_index in &self.pk_indices {
152            columns[*pk_index].is_pk = true;
153        }
154        columns
155    }
156
157    pub fn build(self) -> ConnectorResult<SourceDesc> {
158        let columns = self.column_catalogs_to_source_column_descs();
159
160        let parser_config = SpecificParserConfig::new(&self.source_info, &self.with_properties)?;
161
162        let source = SourceReader::new(
163            self.with_properties,
164            columns.clone(),
165            self.connector_message_buffer_size,
166            parser_config,
167        )?;
168
169        Ok(SourceDesc {
170            source,
171            columns,
172            metrics: self.metrics,
173            source_info: self.source_info,
174        })
175    }
176
177    pub fn metrics(&self) -> Arc<SourceMetrics> {
178        self.metrics.clone()
179    }
180
181    #[deprecated = "will be replaced by new fs source (list + fetch)"]
182    #[expect(deprecated)]
183    pub fn build_fs_source_desc(&self) -> ConnectorResult<LegacyFsSourceDesc> {
184        let parser_config = SpecificParserConfig::new(&self.source_info, &self.with_properties)?;
185
186        match (
187            &parser_config.protocol_config,
188            &parser_config.encoding_config,
189        ) {
190            (
191                ProtocolProperties::Plain,
192                EncodingProperties::Csv(_) | EncodingProperties::Json(_),
193            ) => {}
194            (format, encode) => {
195                bail!(
196                    "Unsupported combination of format {:?} and encode {:?}",
197                    format,
198                    encode,
199                );
200            }
201        }
202
203        let columns = self.column_catalogs_to_source_column_descs();
204
205        let source = LegacyFsSourceReader::new(
206            self.with_properties.clone(),
207            columns.clone(),
208            parser_config,
209        )?;
210
211        Ok(LegacyFsSourceDesc {
212            source,
213            columns,
214            metrics: self.metrics.clone(),
215        })
216    }
217
218    pub fn with_properties(&self) -> WithOptionsSecResolved {
219        self.with_properties.clone()
220    }
221}
222
223pub mod test_utils {
224    use std::collections::BTreeMap;
225
226    use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Schema};
227    use risingwave_pb::catalog::StreamSourceInfo;
228
229    use super::{DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE, SourceDescBuilder};
230
231    pub fn create_source_desc_builder(
232        schema: &Schema,
233        row_id_index: Option<usize>,
234        source_info: StreamSourceInfo,
235        with_properties: BTreeMap<String, String>,
236        pk_indices: Vec<usize>,
237    ) -> SourceDescBuilder {
238        let columns = schema
239            .fields
240            .iter()
241            .enumerate()
242            .map(|(i, f)| {
243                ColumnCatalog::visible(ColumnDesc::named(
244                    f.name.clone(),
245                    (i as i32).into(), // use column index as column id
246                    f.data_type.clone(),
247                ))
248            })
249            .collect();
250        let options_with_secret = crate::WithOptionsSecResolved::without_secrets(with_properties);
251        SourceDescBuilder {
252            columns,
253            metrics: Default::default(),
254            row_id_index,
255            with_properties: options_with_secret,
256            source_info,
257            connector_message_buffer_size: DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE,
258            pk_indices,
259        }
260    }
261}