risingwave_connector/source/reader/
desc.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use risingwave_common::bail;
19use risingwave_common::catalog::ColumnCatalog;
20use risingwave_common::util::iter_util::ZipEqFast;
21use risingwave_pb::catalog::PbStreamSourceInfo;
22use risingwave_pb::plan_common::PbColumnCatalog;
23
24#[expect(deprecated)]
25use super::fs_reader::LegacyFsSourceReader;
26use super::reader::SourceReader;
27use crate::WithOptionsSecResolved;
28use crate::error::ConnectorResult;
29use crate::parser::additional_columns::source_add_partition_offset_cols;
30use crate::parser::{EncodingProperties, ProtocolProperties, SpecificParserConfig};
31use crate::source::monitor::SourceMetrics;
32use crate::source::{SourceColumnDesc, SourceColumnType, UPSTREAM_SOURCE_KEY};
33
/// Default value for a source's `connector_message_buffer_size`
/// (used by `test_utils::create_source_desc_builder`).
///
/// NOTE(review): the unit is presumably a number of buffered messages, but that is not
/// visible in this file — confirm against `SourceReader`.
pub const DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE: usize = 16;
35
/// `SourceDesc` describes a stream source.
///
/// Instances are produced by `SourceDescBuilder::build`.
#[derive(Debug, Clone)]
pub struct SourceDesc {
    /// Source info from which the parser config is derived (see `update_reader`).
    pub source_info: PbStreamSourceInfo,
    /// Reader for this source; replaced wholesale by `update_reader`.
    pub source: SourceReader,
    /// Column descriptors handed to the reader when it is constructed.
    pub columns: Vec<SourceColumnDesc>,
    /// Shared handle to the source metrics.
    pub metrics: Arc<SourceMetrics>,
}
44
45impl SourceDesc {
46    pub fn update_reader(
47        &mut self,
48        props_plaintext: HashMap<String, String>,
49    ) -> ConnectorResult<()> {
50        let props_wrapper =
51            WithOptionsSecResolved::without_secrets(props_plaintext.into_iter().collect());
52        let parser_config = SpecificParserConfig::new(&self.source_info, &props_wrapper)?;
53
54        let reader = SourceReader::new(
55            props_wrapper,
56            self.columns.clone(),
57            self.source.connector_message_buffer_size,
58            parser_config,
59        )?;
60        self.source = reader;
61        Ok(())
62    }
63}
64
/// `LegacyFsSourceDesc` describes a stream source that is read through the legacy
/// file-system reader (`LegacyFsSourceReader`).
///
/// Instances are produced by `SourceDescBuilder::build_fs_source_desc`.
#[deprecated = "will be replaced by new fs source (list + fetch)"]
#[expect(deprecated)]
#[derive(Debug)]
pub struct LegacyFsSourceDesc {
    /// Legacy file-system reader for this source.
    pub source: LegacyFsSourceReader,
    /// Column descriptors handed to the reader when it is constructed.
    pub columns: Vec<SourceColumnDesc>,
    /// Shared handle to the source metrics.
    pub metrics: Arc<SourceMetrics>,
}
74
/// Collects everything needed to construct a `SourceDesc` (via `build`) or a
/// `LegacyFsSourceDesc` (via `build_fs_source_desc`).
#[derive(Clone)]
pub struct SourceDescBuilder {
    /// Column catalogs of the source, converted from `PbColumnCatalog` in `new`.
    columns: Vec<ColumnCatalog>,
    /// Shared handle to the source metrics; passed on to the built descriptor.
    metrics: Arc<SourceMetrics>,
    /// Index of the column to mark as `SourceColumnType::RowId`, if any.
    row_id_index: Option<usize>,
    /// Connector properties; `UPSTREAM_SOURCE_KEY` is read from here to get the connector name.
    with_properties: WithOptionsSecResolved,
    /// Source info from which the parser config is derived.
    source_info: PbStreamSourceInfo,
    /// Forwarded to `SourceReader::new` by `build`.
    connector_message_buffer_size: usize,
    /// Indices of the columns to flag with `is_pk = true`.
    pk_indices: Vec<usize>,
}
85
86impl SourceDescBuilder {
87    pub fn new(
88        columns: Vec<PbColumnCatalog>,
89        metrics: Arc<SourceMetrics>,
90        row_id_index: Option<usize>,
91        with_properties: WithOptionsSecResolved,
92        source_info: PbStreamSourceInfo,
93        connector_message_buffer_size: usize,
94        pk_indices: Vec<usize>,
95    ) -> Self {
96        Self {
97            columns: columns.into_iter().map(ColumnCatalog::from).collect(),
98            metrics,
99            row_id_index,
100            with_properties,
101            source_info,
102            connector_message_buffer_size,
103            pk_indices,
104        }
105    }
106
107    /// This function builds `SourceColumnDesc` from `ColumnCatalog`, and handle the creation
108    /// of hidden columns like partition/file, offset that are not specified by user.
109    pub fn column_catalogs_to_source_column_descs(&self) -> Vec<SourceColumnDesc> {
110        let connector_name = self
111            .with_properties
112            .get(UPSTREAM_SOURCE_KEY)
113            .map(|s| s.to_lowercase())
114            .unwrap();
115        let (columns_exist, additional_columns) =
116            source_add_partition_offset_cols(&self.columns, &connector_name, false);
117
118        let mut columns: Vec<_> = self
119            .columns
120            .iter()
121            .map(|c| SourceColumnDesc::from(&c.column_desc))
122            .collect();
123
124        // currently iceberg uses other columns. See `extract_iceberg_columns`
125        // TODO: unify logic.
126        if connector_name != "iceberg" {
127            for (existed, c) in columns_exist.iter().zip_eq_fast(&additional_columns) {
128                if !existed {
129                    columns.push(SourceColumnDesc::hidden_addition_col_from_column_desc(c));
130                }
131            }
132        }
133
134        if let Some(row_id_index) = self.row_id_index {
135            columns[row_id_index].column_type = SourceColumnType::RowId;
136        }
137        for pk_index in &self.pk_indices {
138            columns[*pk_index].is_pk = true;
139        }
140        columns
141    }
142
143    pub fn build(self) -> ConnectorResult<SourceDesc> {
144        let columns = self.column_catalogs_to_source_column_descs();
145
146        let parser_config = SpecificParserConfig::new(&self.source_info, &self.with_properties)?;
147
148        let source = SourceReader::new(
149            self.with_properties,
150            columns.clone(),
151            self.connector_message_buffer_size,
152            parser_config,
153        )?;
154
155        Ok(SourceDesc {
156            source,
157            columns,
158            metrics: self.metrics,
159            source_info: self.source_info,
160        })
161    }
162
163    pub fn metrics(&self) -> Arc<SourceMetrics> {
164        self.metrics.clone()
165    }
166
167    #[deprecated = "will be replaced by new fs source (list + fetch)"]
168    #[expect(deprecated)]
169    pub fn build_fs_source_desc(&self) -> ConnectorResult<LegacyFsSourceDesc> {
170        let parser_config = SpecificParserConfig::new(&self.source_info, &self.with_properties)?;
171
172        match (
173            &parser_config.protocol_config,
174            &parser_config.encoding_config,
175        ) {
176            (
177                ProtocolProperties::Plain,
178                EncodingProperties::Csv(_) | EncodingProperties::Json(_),
179            ) => {}
180            (format, encode) => {
181                bail!(
182                    "Unsupported combination of format {:?} and encode {:?}",
183                    format,
184                    encode,
185                );
186            }
187        }
188
189        let columns = self.column_catalogs_to_source_column_descs();
190
191        let source = LegacyFsSourceReader::new(
192            self.with_properties.clone(),
193            columns.clone(),
194            parser_config,
195        )?;
196
197        Ok(LegacyFsSourceDesc {
198            source,
199            columns,
200            metrics: self.metrics.clone(),
201        })
202    }
203
204    pub fn with_properties(&self) -> WithOptionsSecResolved {
205        self.with_properties.clone()
206    }
207}
208
209pub mod test_utils {
210    use std::collections::BTreeMap;
211
212    use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Schema};
213    use risingwave_pb::catalog::StreamSourceInfo;
214
215    use super::{DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE, SourceDescBuilder};
216
217    pub fn create_source_desc_builder(
218        schema: &Schema,
219        row_id_index: Option<usize>,
220        source_info: StreamSourceInfo,
221        with_properties: BTreeMap<String, String>,
222        pk_indices: Vec<usize>,
223    ) -> SourceDescBuilder {
224        let columns = schema
225            .fields
226            .iter()
227            .enumerate()
228            .map(|(i, f)| {
229                ColumnCatalog::visible(ColumnDesc::named(
230                    f.name.clone(),
231                    (i as i32).into(), // use column index as column id
232                    f.data_type.clone(),
233                ))
234            })
235            .collect();
236        let options_with_secret =
237            crate::WithOptionsSecResolved::without_secrets(with_properties.clone());
238        SourceDescBuilder {
239            columns,
240            metrics: Default::default(),
241            row_id_index,
242            with_properties: options_with_secret,
243            source_info,
244            connector_message_buffer_size: DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE,
245            pk_indices,
246        }
247    }
248}