risingwave_connector/parser/
config.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashSet};
16
17use risingwave_common::bail;
18use risingwave_common::secret::LocalSecretManager;
19use risingwave_connector_codec::decoder::avro::MapHandling;
20use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo};
21
22use super::utils::get_kafka_topic;
23use super::{DebeziumProps, TimeHandling, TimestampHandling, TimestamptzHandling};
24use crate::WithOptionsSecResolved;
25use crate::connector_common::AwsAuthProps;
26use crate::error::ConnectorResult;
27use crate::parser::PROTOBUF_MESSAGES_AS_JSONB;
28use crate::schema::AWS_GLUE_SCHEMA_ARN_KEY;
29use crate::schema::schema_registry::SchemaRegistryConfig;
30use crate::source::cdc::CDC_MONGODB_STRONG_SCHEMA_KEY;
31use crate::source::{SourceColumnDesc, SourceEncode, SourceFormat, extract_source_struct};
32
33/// Note: this is created in `SourceReader::build_stream`
34#[derive(Debug, Clone, Default)]
35pub struct ParserConfig {
36    pub common: CommonParserConfig,
37    pub specific: SpecificParserConfig,
38}
39
40impl ParserConfig {
41    pub fn get_config(self) -> (Vec<SourceColumnDesc>, SpecificParserConfig) {
42        (self.common.rw_columns, self.specific)
43    }
44}
45
46#[derive(Debug, Clone, Default)]
47pub struct CommonParserConfig {
48    /// Note: this is created by `SourceDescBuilder::builder`
49    pub rw_columns: Vec<SourceColumnDesc>,
50}
51
52#[derive(Debug, Clone, Default)]
53pub struct SpecificParserConfig {
54    pub encoding_config: EncodingProperties,
55    pub protocol_config: ProtocolProperties,
56}
57
58#[derive(Debug, Default, Clone)]
59pub enum EncodingProperties {
60    Avro(AvroProperties),
61    Protobuf(ProtobufProperties),
62    Csv(CsvProperties),
63    Json(JsonProperties),
64    MongoJson(MongoProperties),
65    Bytes(BytesProperties),
66    Parquet,
67    Native,
68    /// Encoding can't be specified because the source will determines it. Now only used in Iceberg.
69    None,
70    #[default]
71    Unspecified,
72}
73
74#[derive(Debug, Default, Clone)]
75pub enum ProtocolProperties {
76    Debezium(DebeziumProps),
77    DebeziumMongo,
78    Maxwell,
79    Canal,
80    Plain,
81    Upsert,
82    Native,
83    /// Protocol can't be specified because the source will determines it. Now only used in Iceberg.
84    None,
85    #[default]
86    Unspecified,
87}
88
89impl SpecificParserConfig {
90    // for test only
91    pub const DEFAULT_PLAIN_JSON: SpecificParserConfig = SpecificParserConfig {
92        encoding_config: EncodingProperties::Json(JsonProperties {
93            use_schema_registry: false,
94            timestamp_handling: None,
95            timestamptz_handling: None,
96            time_handling: None,
97            handle_toast_columns: false,
98        }),
99        protocol_config: ProtocolProperties::Plain,
100    };
101
102    // The validity of (format, encode) is ensured by `extract_format_encode`
103    pub fn new(
104        info: &StreamSourceInfo,
105        with_properties: &WithOptionsSecResolved,
106    ) -> ConnectorResult<Self> {
107        let info = info.clone();
108        let source_struct = extract_source_struct(&info)?;
109        let format_encode_options_with_secret = LocalSecretManager::global()
110            .fill_secrets(info.format_encode_options, info.format_encode_secret_refs)?;
111        let (options, secret_refs) = with_properties.clone().into_parts();
112        // Make sure `with_properties` is no longer used by accident.
113        // All reads shall go to `options_with_secret` instead.
114        #[expect(unused_variables)]
115        let with_properties = ();
116        let options_with_secret =
117            LocalSecretManager::global().fill_secrets(options, secret_refs)?;
118        let format = source_struct.format;
119        let encode = source_struct.encode;
120        // this transformation is needed since there may be config for the protocol
121        // in the future
122        let protocol_config = match format {
123            SourceFormat::Native => ProtocolProperties::Native,
124            SourceFormat::None => ProtocolProperties::None,
125            SourceFormat::Debezium => {
126                let debezium_props = DebeziumProps::from(&format_encode_options_with_secret);
127                ProtocolProperties::Debezium(debezium_props)
128            }
129            SourceFormat::DebeziumMongo => ProtocolProperties::DebeziumMongo,
130            SourceFormat::Maxwell => ProtocolProperties::Maxwell,
131            SourceFormat::Canal => ProtocolProperties::Canal,
132            SourceFormat::Upsert => ProtocolProperties::Upsert,
133            SourceFormat::Plain => ProtocolProperties::Plain,
134            _ => unreachable!(),
135        };
136
137        let encoding_config = match (format, encode) {
138            (SourceFormat::Plain, SourceEncode::Csv) => EncodingProperties::Csv(CsvProperties {
139                delimiter: info.csv_delimiter as u8,
140                has_header: info.csv_has_header,
141            }),
142            (SourceFormat::Plain, SourceEncode::Parquet) => EncodingProperties::Parquet,
143            (SourceFormat::Plain, SourceEncode::Avro)
144            | (SourceFormat::Upsert, SourceEncode::Avro) => {
145                let mut config = AvroProperties {
146                    record_name: if info.proto_message_name.is_empty() {
147                        None
148                    } else {
149                        Some(info.proto_message_name.clone())
150                    },
151                    key_record_name: info.key_message_name.clone(),
152                    map_handling: MapHandling::from_options(&format_encode_options_with_secret)?,
153                    ..Default::default()
154                };
155                config.schema_location = if let Some(schema_arn) =
156                    format_encode_options_with_secret.get(AWS_GLUE_SCHEMA_ARN_KEY)
157                {
158                    risingwave_common::license::Feature::GlueSchemaRegistry
159                        .check_available()
160                        .map_err(anyhow::Error::from)?;
161                    SchemaLocation::Glue {
162                        schema_arn: schema_arn.clone(),
163                        aws_auth_props: serde_json::from_value::<AwsAuthProps>(
164                            serde_json::to_value(format_encode_options_with_secret.clone())
165                                .unwrap(),
166                        )
167                        .map_err(|e| anyhow::anyhow!(e))?,
168                        // The option `mock_config` is not public and we can break compatibility.
169                        mock_config: format_encode_options_with_secret
170                            .get("aws.glue.mock_config")
171                            .cloned(),
172                    }
173                } else if info.use_schema_registry {
174                    SchemaLocation::Confluent {
175                        urls: info.row_schema_location.clone(),
176                        client_config: SchemaRegistryConfig::from(
177                            &format_encode_options_with_secret,
178                        ),
179                        name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy)
180                            .unwrap(),
181                        topic: get_kafka_topic(&options_with_secret)?.clone(),
182                    }
183                } else {
184                    SchemaLocation::File {
185                        url: info.row_schema_location.clone(),
186                        aws_auth_props: Some(
187                            serde_json::from_value::<AwsAuthProps>(
188                                serde_json::to_value(format_encode_options_with_secret.clone())
189                                    .unwrap(),
190                            )
191                            .map_err(|e| anyhow::anyhow!(e))?,
192                        ),
193                    }
194                };
195                EncodingProperties::Avro(config)
196            }
197            (SourceFormat::Plain, SourceEncode::Protobuf)
198            | (SourceFormat::Upsert, SourceEncode::Protobuf) => {
199                if info.row_schema_location.is_empty() {
200                    bail!("protobuf file location not provided");
201                }
202                let mut messages_as_jsonb = if let Some(messages_as_jsonb) =
203                    format_encode_options_with_secret.get(PROTOBUF_MESSAGES_AS_JSONB)
204                {
205                    messages_as_jsonb.split(',').map(|s| s.to_owned()).collect()
206                } else {
207                    HashSet::new()
208                };
209                messages_as_jsonb.insert("google.protobuf.Any".to_owned());
210
211                let mut config = ProtobufProperties {
212                    message_name: info.proto_message_name.clone(),
213                    key_message_name: info.key_message_name.clone(),
214                    messages_as_jsonb,
215                    ..Default::default()
216                };
217                config.schema_location = if info.use_schema_registry {
218                    SchemaLocation::Confluent {
219                        urls: info.row_schema_location.clone(),
220                        client_config: SchemaRegistryConfig::from(
221                            &format_encode_options_with_secret,
222                        ),
223                        name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy)
224                            .unwrap(),
225                        topic: get_kafka_topic(&options_with_secret)?.clone(),
226                    }
227                } else {
228                    SchemaLocation::File {
229                        url: info.row_schema_location.clone(),
230                        aws_auth_props: Some(
231                            serde_json::from_value::<AwsAuthProps>(
232                                serde_json::to_value(format_encode_options_with_secret.clone())
233                                    .unwrap(),
234                            )
235                            .map_err(|e| anyhow::anyhow!(e))?,
236                        ),
237                    }
238                };
239                EncodingProperties::Protobuf(config)
240            }
241            (SourceFormat::Debezium, SourceEncode::Avro) => {
242                EncodingProperties::Avro(AvroProperties {
243                    record_name: if info.proto_message_name.is_empty() {
244                        None
245                    } else {
246                        Some(info.proto_message_name.clone())
247                    },
248                    key_record_name: info.key_message_name.clone(),
249                    schema_location: SchemaLocation::Confluent {
250                        urls: info.row_schema_location.clone(),
251                        client_config: SchemaRegistryConfig::from(
252                            &format_encode_options_with_secret,
253                        ),
254                        name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy)
255                            .unwrap(),
256                        topic: get_kafka_topic(&options_with_secret).unwrap().clone(),
257                    },
258                    ..Default::default()
259                })
260            }
261            (
262                SourceFormat::Plain
263                | SourceFormat::Debezium
264                | SourceFormat::Maxwell
265                | SourceFormat::Canal
266                | SourceFormat::Upsert,
267                SourceEncode::Json,
268            ) => EncodingProperties::Json(JsonProperties {
269                use_schema_registry: info.use_schema_registry,
270                timestamp_handling: None,
271                timestamptz_handling: TimestamptzHandling::from_options(
272                    &format_encode_options_with_secret,
273                )?,
274                time_handling: None,
275                handle_toast_columns: false,
276            }),
277            (SourceFormat::DebeziumMongo, SourceEncode::Json) => {
278                let props = MongoProperties::from(&format_encode_options_with_secret);
279                EncodingProperties::MongoJson(props)
280            }
281            (SourceFormat::Plain, SourceEncode::Bytes) => {
282                EncodingProperties::Bytes(BytesProperties { column_name: None })
283            }
284            (SourceFormat::Native, SourceEncode::Native) => EncodingProperties::Native,
285            (SourceFormat::None, SourceEncode::None) => EncodingProperties::None,
286            (format, encode) => {
287                bail!("Unsupported format {:?} encode {:?}", format, encode);
288            }
289        };
290        Ok(Self {
291            encoding_config,
292            protocol_config,
293        })
294    }
295}
296
297#[derive(Debug, Default, Clone)]
298pub struct AvroProperties {
299    pub schema_location: SchemaLocation,
300    pub record_name: Option<String>,
301    pub key_record_name: Option<String>,
302    pub map_handling: Option<MapHandling>,
303}
304
305/// WIP: may cover protobuf and json schema later.
306#[derive(Debug, Clone)]
307pub enum SchemaLocation {
308    /// Avsc from `https://`, `s3://` or `file://`.
309    File {
310        url: String,
311        aws_auth_props: Option<AwsAuthProps>, // for s3
312    },
313    /// <https://docs.confluent.io/platform/current/schema-registry/index.html>
314    Confluent {
315        urls: String,
316        client_config: SchemaRegistryConfig,
317        name_strategy: PbSchemaRegistryNameStrategy,
318        topic: String,
319    },
320    /// <https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html>
321    Glue {
322        schema_arn: String,
323        aws_auth_props: AwsAuthProps,
324        // When `Some(_)`, ignore AWS and load schemas from provided config
325        mock_config: Option<String>,
326    },
327}
328
329// TODO: `SpecificParserConfig` shall not `impl`/`derive` a `Default`
330impl Default for SchemaLocation {
331    fn default() -> Self {
332        // backward compatible but undesired
333        Self::File {
334            url: Default::default(),
335            aws_auth_props: None,
336        }
337    }
338}
339
340#[derive(Debug, Default, Clone)]
341pub struct ProtobufProperties {
342    pub schema_location: SchemaLocation,
343    pub message_name: String,
344    pub key_message_name: Option<String>,
345    pub messages_as_jsonb: HashSet<String>,
346}
347
/// Settings for CSV-encoded payloads.
#[derive(Clone, Copy, Debug, Default)]
pub struct CsvProperties {
    /// Field delimiter, as a single byte.
    pub delimiter: u8,
    /// Header flag (per the field name, whether the data starts with a header row;
    /// confirm against the CSV parser).
    pub has_header: bool,
}
353
354#[derive(Debug, Default, Clone)]
355pub struct JsonProperties {
356    pub use_schema_registry: bool,
357    pub timestamp_handling: Option<TimestampHandling>,
358    pub timestamptz_handling: Option<TimestamptzHandling>,
359    pub time_handling: Option<TimeHandling>,
360    pub handle_toast_columns: bool,
361}
362
/// Settings for raw byte payloads.
#[derive(Clone, Debug, Default)]
pub struct BytesProperties {
    /// Optional column name.
    /// NOTE(review): how the name is used is not visible here; confirm against the
    /// bytes parser.
    pub column_name: Option<String>,
}
367
368#[derive(Debug, Default, Clone)]
369pub struct MongoProperties {
370    pub strong_schema: bool,
371}
372
373impl MongoProperties {
374    pub fn new(strong_schema: bool) -> Self {
375        Self { strong_schema }
376    }
377}
378impl From<&BTreeMap<String, String>> for MongoProperties {
379    fn from(config: &BTreeMap<String, String>) -> Self {
380        let strong_schema = config
381            .get(CDC_MONGODB_STRONG_SCHEMA_KEY)
382            .is_some_and(|k| k.eq_ignore_ascii_case("true"));
383        Self { strong_schema }
384    }
385}