risingwave_connector/parser/
config.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashSet};
16
17use risingwave_common::bail;
18use risingwave_common::secret::LocalSecretManager;
19use risingwave_connector_codec::decoder::avro::MapHandling;
20use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo};
21
22use super::unified::json::BigintUnsignedHandlingMode;
23use super::utils::get_kafka_topic;
24use super::{DebeziumProps, TimeHandling, TimestampHandling, TimestamptzHandling};
25use crate::WithOptionsSecResolved;
26use crate::connector_common::AwsAuthProps;
27use crate::error::ConnectorResult;
28use crate::parser::PROTOBUF_MESSAGES_AS_JSONB;
29use crate::schema::AWS_GLUE_SCHEMA_ARN_KEY;
30use crate::schema::schema_registry::SchemaRegistryConfig;
31use crate::source::cdc::CDC_MONGODB_STRONG_SCHEMA_KEY;
32use crate::source::{SourceColumnDesc, SourceEncode, SourceFormat, extract_source_struct};
33
34/// Note: this is created in `SourceReader::build_stream`
35#[derive(Debug, Clone, Default)]
36pub struct ParserConfig {
37    pub common: CommonParserConfig,
38    pub specific: SpecificParserConfig,
39}
40
41impl ParserConfig {
42    pub fn get_config(self) -> (Vec<SourceColumnDesc>, SpecificParserConfig) {
43        (self.common.rw_columns, self.specific)
44    }
45}
46
47#[derive(Debug, Clone, Default)]
48pub struct CommonParserConfig {
49    /// Note: this is created by `SourceDescBuilder::builder`
50    pub rw_columns: Vec<SourceColumnDesc>,
51}
52
53#[derive(Debug, Clone, Default)]
54pub struct SpecificParserConfig {
55    pub encoding_config: EncodingProperties,
56    pub protocol_config: ProtocolProperties,
57}
58
59#[derive(Debug, Default, Clone)]
60pub enum EncodingProperties {
61    Avro(AvroProperties),
62    Protobuf(ProtobufProperties),
63    Csv(CsvProperties),
64    Json(JsonProperties),
65    MongoJson(MongoProperties),
66    Bytes(BytesProperties),
67    Parquet,
68    Native,
69    /// Encoding can't be specified because the source will determines it. Now only used in Iceberg.
70    None,
71    #[default]
72    Unspecified,
73}
74
75#[derive(Debug, Default, Clone)]
76pub enum ProtocolProperties {
77    Debezium(DebeziumProps),
78    DebeziumMongo,
79    Maxwell,
80    Canal,
81    Plain,
82    Upsert,
83    Native,
84    /// Protocol can't be specified because the source will determines it. Now only used in Iceberg.
85    None,
86    #[default]
87    Unspecified,
88}
89
90impl SpecificParserConfig {
91    // for test only
92    pub const DEFAULT_PLAIN_JSON: SpecificParserConfig = SpecificParserConfig {
93        encoding_config: EncodingProperties::Json(JsonProperties {
94            use_schema_registry: false,
95            timestamp_handling: None,
96            timestamptz_handling: None,
97            time_handling: None,
98            bigint_unsigned_handling: None,
99            handle_toast_columns: false,
100        }),
101        protocol_config: ProtocolProperties::Plain,
102    };
103
104    // The validity of (format, encode) is ensured by `extract_format_encode`
105    pub fn new(
106        info: &StreamSourceInfo,
107        with_properties: &WithOptionsSecResolved,
108    ) -> ConnectorResult<Self> {
109        let info = info.clone();
110        let source_struct = extract_source_struct(&info)?;
111        let format_encode_options_with_secret = LocalSecretManager::global()
112            .fill_secrets(info.format_encode_options, info.format_encode_secret_refs)?;
113        let (options, secret_refs) = with_properties.clone().into_parts();
114        // Make sure `with_properties` is no longer used by accident.
115        // All reads shall go to `options_with_secret` instead.
116        #[expect(unused_variables)]
117        let with_properties = ();
118        let options_with_secret =
119            LocalSecretManager::global().fill_secrets(options, secret_refs)?;
120        let format = source_struct.format;
121        let encode = source_struct.encode;
122        // this transformation is needed since there may be config for the protocol
123        // in the future
124        let protocol_config = match format {
125            SourceFormat::Native => ProtocolProperties::Native,
126            SourceFormat::None => ProtocolProperties::None,
127            SourceFormat::Debezium => {
128                let debezium_props = DebeziumProps::from(&format_encode_options_with_secret);
129                ProtocolProperties::Debezium(debezium_props)
130            }
131            SourceFormat::DebeziumMongo => ProtocolProperties::DebeziumMongo,
132            SourceFormat::Maxwell => ProtocolProperties::Maxwell,
133            SourceFormat::Canal => ProtocolProperties::Canal,
134            SourceFormat::Upsert => ProtocolProperties::Upsert,
135            SourceFormat::Plain => ProtocolProperties::Plain,
136            _ => unreachable!(),
137        };
138
139        let encoding_config = match (format, encode) {
140            (SourceFormat::Plain, SourceEncode::Csv) => EncodingProperties::Csv(CsvProperties {
141                delimiter: info.csv_delimiter as u8,
142                has_header: info.csv_has_header,
143            }),
144            (SourceFormat::Plain, SourceEncode::Parquet) => EncodingProperties::Parquet,
145            (SourceFormat::Plain, SourceEncode::Avro)
146            | (SourceFormat::Upsert, SourceEncode::Avro) => {
147                let mut config = AvroProperties {
148                    record_name: if info.proto_message_name.is_empty() {
149                        None
150                    } else {
151                        Some(info.proto_message_name.clone())
152                    },
153                    key_record_name: info.key_message_name.clone(),
154                    map_handling: MapHandling::from_options(&format_encode_options_with_secret)?,
155                    ..Default::default()
156                };
157                config.schema_location = if let Some(schema_arn) =
158                    format_encode_options_with_secret.get(AWS_GLUE_SCHEMA_ARN_KEY)
159                {
160                    risingwave_common::license::Feature::GlueSchemaRegistry
161                        .check_available()
162                        .map_err(anyhow::Error::from)?;
163                    SchemaLocation::Glue {
164                        schema_arn: schema_arn.clone(),
165                        aws_auth_props: serde_json::from_value::<AwsAuthProps>(
166                            serde_json::to_value(format_encode_options_with_secret.clone())
167                                .unwrap(),
168                        )
169                        .map_err(|e| anyhow::anyhow!(e))?,
170                        // The option `mock_config` is not public and we can break compatibility.
171                        mock_config: format_encode_options_with_secret
172                            .get("aws.glue.mock_config")
173                            .cloned(),
174                    }
175                } else if info.use_schema_registry {
176                    SchemaLocation::Confluent {
177                        urls: info.row_schema_location.clone(),
178                        client_config: SchemaRegistryConfig::from(
179                            &format_encode_options_with_secret,
180                        ),
181                        name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy)
182                            .unwrap(),
183                        topic: get_kafka_topic(&options_with_secret)?.clone(),
184                    }
185                } else {
186                    SchemaLocation::File {
187                        url: info.row_schema_location,
188                        aws_auth_props: Some(
189                            serde_json::from_value::<AwsAuthProps>(
190                                serde_json::to_value(format_encode_options_with_secret).unwrap(),
191                            )
192                            .map_err(|e| anyhow::anyhow!(e))?,
193                        ),
194                    }
195                };
196                EncodingProperties::Avro(config)
197            }
198            (SourceFormat::Plain, SourceEncode::Protobuf)
199            | (SourceFormat::Upsert, SourceEncode::Protobuf) => {
200                if info.row_schema_location.is_empty() {
201                    bail!("protobuf file location not provided");
202                }
203                let mut messages_as_jsonb = if let Some(messages_as_jsonb) =
204                    format_encode_options_with_secret.get(PROTOBUF_MESSAGES_AS_JSONB)
205                {
206                    messages_as_jsonb.split(',').map(|s| s.to_owned()).collect()
207                } else {
208                    HashSet::new()
209                };
210                messages_as_jsonb.insert("google.protobuf.Any".to_owned());
211
212                let mut config = ProtobufProperties {
213                    message_name: info.proto_message_name.clone(),
214                    key_message_name: info.key_message_name.clone(),
215                    messages_as_jsonb,
216                    ..Default::default()
217                };
218                config.schema_location = if info.use_schema_registry {
219                    SchemaLocation::Confluent {
220                        urls: info.row_schema_location.clone(),
221                        client_config: SchemaRegistryConfig::from(
222                            &format_encode_options_with_secret,
223                        ),
224                        name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy)
225                            .unwrap(),
226                        topic: get_kafka_topic(&options_with_secret)?.clone(),
227                    }
228                } else {
229                    SchemaLocation::File {
230                        url: info.row_schema_location,
231                        aws_auth_props: Some(
232                            serde_json::from_value::<AwsAuthProps>(
233                                serde_json::to_value(format_encode_options_with_secret).unwrap(),
234                            )
235                            .map_err(|e| anyhow::anyhow!(e))?,
236                        ),
237                    }
238                };
239                EncodingProperties::Protobuf(config)
240            }
241            (SourceFormat::Debezium, SourceEncode::Avro) => {
242                EncodingProperties::Avro(AvroProperties {
243                    record_name: if info.proto_message_name.is_empty() {
244                        None
245                    } else {
246                        Some(info.proto_message_name.clone())
247                    },
248                    key_record_name: info.key_message_name.clone(),
249                    schema_location: SchemaLocation::Confluent {
250                        urls: info.row_schema_location.clone(),
251                        client_config: SchemaRegistryConfig::from(
252                            &format_encode_options_with_secret,
253                        ),
254                        name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy)
255                            .unwrap(),
256                        topic: get_kafka_topic(&options_with_secret).unwrap().clone(),
257                    },
258                    ..Default::default()
259                })
260            }
261            (
262                SourceFormat::Plain
263                | SourceFormat::Debezium
264                | SourceFormat::Maxwell
265                | SourceFormat::Canal
266                | SourceFormat::Upsert,
267                SourceEncode::Json,
268            ) => EncodingProperties::Json(JsonProperties {
269                use_schema_registry: info.use_schema_registry,
270                timestamp_handling: None,
271                timestamptz_handling: TimestamptzHandling::from_options(
272                    &format_encode_options_with_secret,
273                )?,
274                time_handling: None,
275                bigint_unsigned_handling: None,
276                handle_toast_columns: false,
277            }),
278            (SourceFormat::DebeziumMongo, SourceEncode::Json) => {
279                let props = MongoProperties::from(&format_encode_options_with_secret);
280                EncodingProperties::MongoJson(props)
281            }
282            (SourceFormat::Plain, SourceEncode::Bytes) => {
283                EncodingProperties::Bytes(BytesProperties { column_name: None })
284            }
285            (SourceFormat::Native, SourceEncode::Native) => EncodingProperties::Native,
286            (SourceFormat::None, SourceEncode::None) => EncodingProperties::None,
287            (format, encode) => {
288                bail!("Unsupported format {:?} encode {:?}", format, encode);
289            }
290        };
291        Ok(Self {
292            encoding_config,
293            protocol_config,
294        })
295    }
296}
297
298#[derive(Debug, Default, Clone)]
299pub struct AvroProperties {
300    pub schema_location: SchemaLocation,
301    pub record_name: Option<String>,
302    pub key_record_name: Option<String>,
303    pub map_handling: Option<MapHandling>,
304}
305
306/// WIP: may cover protobuf and json schema later.
307#[derive(Debug, Clone)]
308pub enum SchemaLocation {
309    /// Avsc from `https://`, `s3://` or `file://`.
310    File {
311        url: String,
312        aws_auth_props: Option<AwsAuthProps>, // for s3
313    },
314    /// <https://docs.confluent.io/platform/current/schema-registry/index.html>
315    Confluent {
316        urls: String,
317        client_config: SchemaRegistryConfig,
318        name_strategy: PbSchemaRegistryNameStrategy,
319        topic: String,
320    },
321    /// <https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html>
322    Glue {
323        schema_arn: String,
324        aws_auth_props: AwsAuthProps,
325        // When `Some(_)`, ignore AWS and load schemas from provided config
326        mock_config: Option<String>,
327    },
328}
329
330// TODO: `SpecificParserConfig` shall not `impl`/`derive` a `Default`
331impl Default for SchemaLocation {
332    fn default() -> Self {
333        // backward compatible but undesired
334        Self::File {
335            url: Default::default(),
336            aws_auth_props: None,
337        }
338    }
339}
340
341#[derive(Debug, Default, Clone)]
342pub struct ProtobufProperties {
343    pub schema_location: SchemaLocation,
344    pub message_name: String,
345    pub key_message_name: Option<String>,
346    pub messages_as_jsonb: HashSet<String>,
347}
348
/// Settings for the CSV encoding.
#[derive(Clone, Copy, Debug, Default)]
pub struct CsvProperties {
    /// Field delimiter, as a single byte.
    pub delimiter: u8,
    /// Whether the data has a header row.
    pub has_header: bool,
}
354
355#[derive(Debug, Default, Clone)]
356pub struct JsonProperties {
357    pub use_schema_registry: bool,
358    pub timestamp_handling: Option<TimestampHandling>,
359    pub timestamptz_handling: Option<TimestamptzHandling>,
360    pub time_handling: Option<TimeHandling>,
361    pub bigint_unsigned_handling: Option<BigintUnsignedHandlingMode>,
362    pub handle_toast_columns: bool,
363}
364
/// Settings for the raw bytes encoding.
#[derive(Clone, Debug, Default)]
pub struct BytesProperties {
    /// Optional column name; `None` by default.
    pub column_name: Option<String>,
}
369
/// Settings for MongoDB JSON payloads.
#[derive(Clone, Debug, Default)]
pub struct MongoProperties {
    /// Whether the strong-schema option is enabled; `false` by default.
    pub strong_schema: bool,
}

impl MongoProperties {
    /// Creates the properties with the given `strong_schema` flag.
    pub fn new(strong_schema: bool) -> Self {
        MongoProperties { strong_schema }
    }
}
380impl From<&BTreeMap<String, String>> for MongoProperties {
381    fn from(config: &BTreeMap<String, String>) -> Self {
382        let strong_schema = config
383            .get(CDC_MONGODB_STRONG_SCHEMA_KEY)
384            .is_some_and(|k| k.eq_ignore_ascii_case("true"));
385        Self { strong_schema }
386    }
387}