risingwave_connector/source/reader/desc.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use risingwave_common::bail;
use risingwave_common::catalog::ColumnCatalog;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_pb::catalog::PbStreamSourceInfo;
use risingwave_pb::plan_common::PbColumnCatalog;

#[expect(deprecated)]
use super::fs_reader::LegacyFsSourceReader;
use super::reader::SourceReader;
use crate::WithOptionsSecResolved;
use crate::error::ConnectorResult;
use crate::parser::additional_columns::source_add_partition_offset_cols;
use crate::parser::{EncodingProperties, ProtocolProperties, SpecificParserConfig};
use crate::source::monitor::SourceMetrics;
use crate::source::{SourceColumnDesc, SourceColumnType, UPSTREAM_SOURCE_KEY};

pub const DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE: usize = 16;

/// `SourceDesc` describes a stream source.
#[derive(Debug, Clone)]
pub struct SourceDesc {
    pub source: SourceReader,
    pub columns: Vec<SourceColumnDesc>,
    pub metrics: Arc<SourceMetrics>,
}

/// `LegacyFsSourceDesc` describes a stream source from the legacy file-system connector.
#[deprecated = "will be replaced by new fs source (list + fetch)"]
#[expect(deprecated)]
#[derive(Debug)]
pub struct LegacyFsSourceDesc {
    pub source: LegacyFsSourceReader,
    pub columns: Vec<SourceColumnDesc>,
    pub metrics: Arc<SourceMetrics>,
}

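/// Builder of [`SourceDesc`].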
#[derive(Clone)]
pub struct SourceDescBuilder {
    columns: Vec<ColumnCatalog>,
    metrics: Arc<SourceMetrics>,
    row_id_index: Option<usize>,
    with_properties: WithOptionsSecResolved,
    source_info: PbStreamSourceInfo,
    connector_message_buffer_size: usize,
    pk_indices: Vec<usize>,
}

impl SourceDescBuilder {
    pub fn new(
        columns: Vec<PbColumnCatalog>,
        metrics: Arc<SourceMetrics>,
        row_id_index: Option<usize>,
        with_properties: WithOptionsSecResolved,
        source_info: PbStreamSourceInfo,
        connector_message_buffer_size: usize,
        pk_indices: Vec<usize>,
    ) -> Self {
        Self {
            columns: columns.into_iter().map(ColumnCatalog::from).collect(),
            metrics,
            row_id_index,
            with_properties,
            source_info,
            connector_message_buffer_size,
            pk_indices,
        }
    }

    /// Builds [`SourceColumnDesc`]s from the [`ColumnCatalog`]s, and handles the creation
    /// of hidden columns (e.g. partition/file and offset) that are not specified by the user.
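    ///
    /// A sketch of the expected result (illustrative only; the concrete hidden column
    /// names are connector-specific and come from `source_add_partition_offset_cols`):
    ///
    /// ```ignore
    /// // Suppose the catalog declares only the user columns `id` and `value` for a
    /// // Kafka source. The returned descriptors then contain `id` and `value` followed
    /// // by the hidden partition and offset columns; the row-id column (if any) is
    /// // re-typed as `SourceColumnType::RowId` and the primary-key columns are marked.
    /// let column_descs = builder.column_catalogs_to_source_column_descs();
    /// ```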
    pub fn column_catalogs_to_source_column_descs(&self) -> Vec<SourceColumnDesc> {
        let connector_name = self
            .with_properties
            .get(UPSTREAM_SOURCE_KEY)
            .map(|s| s.to_lowercase())
            .unwrap();
        let (columns_exist, additional_columns) =
            source_add_partition_offset_cols(&self.columns, &connector_name, false);

        let mut columns: Vec<_> = self
            .columns
            .iter()
            .map(|c| SourceColumnDesc::from(&c.column_desc))
            .collect();

        // currently iceberg uses other columns. See `extract_iceberg_columns`
        // TODO: unify logic.
        if connector_name != "iceberg" {
            for (existed, c) in columns_exist.iter().zip_eq_fast(&additional_columns) {
                if !existed {
                    columns.push(SourceColumnDesc::hidden_addition_col_from_column_desc(c));
                }
            }
        }

        if let Some(row_id_index) = self.row_id_index {
            columns[row_id_index].column_type = SourceColumnType::RowId;
        }
        for pk_index in &self.pk_indices {
            columns[*pk_index].is_pk = true;
        }
        columns
    }

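    /// Builds a [`SourceDesc`], consuming the builder.
    ///
    /// A minimal usage sketch (marked `ignore`; the arguments are placeholders that
    /// must come from an actual source catalog and connector configuration):
    ///
    /// ```ignore
    /// let builder = SourceDescBuilder::new(
    ///     columns,
    ///     metrics,
    ///     row_id_index,
    ///     with_properties,
    ///     source_info,
    ///     DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE,
    ///     pk_indices,
    /// );
    /// let source_desc = builder.build()?;
    /// ```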
    pub fn build(self) -> ConnectorResult<SourceDesc> {
        let columns = self.column_catalogs_to_source_column_descs();

        let parser_config = SpecificParserConfig::new(&self.source_info, &self.with_properties)?;

        let source = SourceReader::new(
            self.with_properties,
            columns.clone(),
            self.connector_message_buffer_size,
            parser_config,
        )?;

        Ok(SourceDesc {
            source,
            columns,
            metrics: self.metrics,
        })
    }

    pub fn metrics(&self) -> Arc<SourceMetrics> {
        self.metrics.clone()
    }

    #[deprecated = "will be replaced by new fs source (list + fetch)"]
    #[expect(deprecated)]
    pub fn build_fs_source_desc(&self) -> ConnectorResult<LegacyFsSourceDesc> {
        let parser_config = SpecificParserConfig::new(&self.source_info, &self.with_properties)?;

        match (
            &parser_config.protocol_config,
            &parser_config.encoding_config,
        ) {
            (
                ProtocolProperties::Plain,
                EncodingProperties::Csv(_) | EncodingProperties::Json(_),
            ) => {}
            (format, encode) => {
                bail!(
                    "Unsupported combination of format {:?} and encode {:?}",
                    format,
                    encode,
                );
            }
        }

        let columns = self.column_catalogs_to_source_column_descs();

        let source = LegacyFsSourceReader::new(
            self.with_properties.clone(),
            columns.clone(),
            parser_config,
        )?;

        Ok(LegacyFsSourceDesc {
            source,
            columns,
            metrics: self.metrics.clone(),
        })
    }

    pub fn with_properties(&self) -> WithOptionsSecResolved {
        self.with_properties.clone()
    }
}

pub mod test_utils {
    use std::collections::BTreeMap;

    use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, Schema};
    use risingwave_pb::catalog::StreamSourceInfo;

    use super::{DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE, SourceDescBuilder};

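    /// Creates a [`SourceDescBuilder`] for tests from a [`Schema`], using the field
    /// index as the column id, default metrics, and the default message buffer size.
    ///
    /// A usage sketch (marked `ignore`; the `datagen` connector property and the
    /// default `StreamSourceInfo` are illustrative placeholders, not a verified
    /// configuration):
    ///
    /// ```ignore
    /// let builder = create_source_desc_builder(
    ///     &schema,
    ///     Some(0), // treat the first column as the row-id column
    ///     StreamSourceInfo::default(),
    ///     BTreeMap::from([("connector".to_owned(), "datagen".to_owned())]),
    ///     vec![],
    /// );
    /// // The builder can then be turned into a `SourceDesc` via `SourceDescBuilder::build`.
    /// ```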
    pub fn create_source_desc_builder(
        schema: &Schema,
        row_id_index: Option<usize>,
        source_info: StreamSourceInfo,
        with_properties: BTreeMap<String, String>,
        pk_indices: Vec<usize>,
    ) -> SourceDescBuilder {
        let columns = schema
            .fields
            .iter()
            .enumerate()
            .map(|(i, f)| {
                ColumnCatalog::visible(ColumnDesc::named(
                    f.name.clone(),
                    (i as i32).into(), // use column index as column id
                    f.data_type.clone(),
                ))
            })
            .collect();
        let options_with_secret =
            crate::WithOptionsSecResolved::without_secrets(with_properties.clone());
        SourceDescBuilder {
            columns,
            metrics: Default::default(),
            row_id_index,
            with_properties: options_with_secret,
            source_info,
            connector_message_buffer_size: DEFAULT_CONNECTOR_MESSAGE_BUFFER_SIZE,
            pk_indices,
        }
    }
}