risingwave_connector/parser/
config.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashSet};
16
17use risingwave_common::bail;
18use risingwave_common::secret::LocalSecretManager;
19use risingwave_connector_codec::decoder::avro::MapHandling;
20use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo};
21
22use super::utils::get_kafka_topic;
23use super::{DebeziumProps, TimeHandling, TimestampHandling, TimestamptzHandling};
24use crate::WithOptionsSecResolved;
25use crate::connector_common::AwsAuthProps;
26use crate::error::ConnectorResult;
27use crate::parser::PROTOBUF_MESSAGES_AS_JSONB;
28use crate::schema::AWS_GLUE_SCHEMA_ARN_KEY;
29use crate::schema::schema_registry::SchemaRegistryConfig;
30use crate::source::cdc::CDC_MONGODB_STRONG_SCHEMA_KEY;
31use crate::source::{SourceColumnDesc, SourceEncode, SourceFormat, extract_source_struct};
32
/// Configuration for a source parser: the columns to produce plus the
/// format/encode-specific settings.
///
/// Note: this is created in `SourceReader::build_stream`
#[derive(Debug, Clone, Default)]
pub struct ParserConfig {
    /// Settings shared by all formats/encodes (the target columns).
    pub common: CommonParserConfig,
    /// Settings that depend on the source's format and encode.
    pub specific: SpecificParserConfig,
}
39
40impl ParserConfig {
41    pub fn get_config(self) -> (Vec<SourceColumnDesc>, SpecificParserConfig) {
42        (self.common.rw_columns, self.specific)
43    }
44}
45
/// Parser settings that do not depend on the source's format or encode.
#[derive(Debug, Clone, Default)]
pub struct CommonParserConfig {
    /// Descriptors of the columns the parser fills.
    ///
    /// Note: this is created by `SourceDescBuilder::builder`
    pub rw_columns: Vec<SourceColumnDesc>,
}
51
/// Parser settings derived from a source's `FORMAT` (protocol) and `ENCODE` (encoding).
///
/// Normally built with `SpecificParserConfig::new`.
#[derive(Debug, Clone, Default)]
pub struct SpecificParserConfig {
    /// How the message payload is encoded, with encode-specific options.
    pub encoding_config: EncodingProperties,
    /// Which envelope/protocol wraps the payload, with protocol-specific options.
    pub protocol_config: ProtocolProperties,
}
57
/// How a source message payload is encoded, plus encode-specific options.
///
/// Each variant lists the formats `SpecificParserConfig::new` pairs it with.
#[derive(Debug, Default, Clone)]
pub enum EncodingProperties {
    /// Avro, with `Plain`, `Upsert` or `Debezium` format.
    Avro(AvroProperties),
    /// Protobuf, with `Plain` or `Upsert` format.
    Protobuf(ProtobufProperties),
    /// CSV, with `Plain` format.
    Csv(CsvProperties),
    /// JSON, with `Plain`, `Debezium`, `Maxwell`, `Canal` or `Upsert` format.
    Json(JsonProperties),
    /// JSON produced by Debezium for MongoDB (`DebeziumMongo` format).
    MongoJson(MongoProperties),
    /// Raw bytes, with `Plain` format.
    Bytes(BytesProperties),
    /// Parquet, with `Plain` format.
    Parquet,
    /// Paired with the `Native` format.
    Native,
    /// Encoding can't be specified because the source determines it. Now only used in Iceberg.
    None,
    /// Placeholder default; never produced by `SpecificParserConfig::new`.
    #[default]
    Unspecified,
}
73
/// Which protocol/envelope a source uses, plus protocol-specific options.
///
/// Built by `SpecificParserConfig::new` from the source's `SourceFormat`.
#[derive(Debug, Default, Clone)]
pub enum ProtocolProperties {
    /// Debezium envelope; options are read from the `FORMAT ... ENCODE ...` options.
    Debezium(DebeziumProps),
    /// Debezium envelope for MongoDB.
    DebeziumMongo,
    /// Maxwell envelope.
    Maxwell,
    /// Canal envelope.
    Canal,
    /// No envelope.
    Plain,
    /// Upsert semantics.
    Upsert,
    /// Paired with the `Native` encoding.
    Native,
    /// Protocol can't be specified because the source determines it. Now only used in Iceberg.
    None,
    /// Placeholder default; never produced by `SpecificParserConfig::new`.
    #[default]
    Unspecified,
}
88
89impl SpecificParserConfig {
90    // for test only
91    pub const DEFAULT_PLAIN_JSON: SpecificParserConfig = SpecificParserConfig {
92        encoding_config: EncodingProperties::Json(JsonProperties {
93            use_schema_registry: false,
94            timestamp_handling: None,
95            timestamptz_handling: None,
96            time_handling: None,
97        }),
98        protocol_config: ProtocolProperties::Plain,
99    };
100
101    // The validity of (format, encode) is ensured by `extract_format_encode`
102    pub fn new(
103        info: &StreamSourceInfo,
104        with_properties: &WithOptionsSecResolved,
105    ) -> ConnectorResult<Self> {
106        let info = info.clone();
107        let source_struct = extract_source_struct(&info)?;
108        let format_encode_options_with_secret = LocalSecretManager::global()
109            .fill_secrets(info.format_encode_options, info.format_encode_secret_refs)?;
110        let (options, secret_refs) = with_properties.clone().into_parts();
111        // Make sure `with_properties` is no longer used by accident.
112        // All reads shall go to `options_with_secret` instead.
113        #[expect(unused_variables)]
114        let with_properties = ();
115        let options_with_secret =
116            LocalSecretManager::global().fill_secrets(options, secret_refs)?;
117        let format = source_struct.format;
118        let encode = source_struct.encode;
119        // this transformation is needed since there may be config for the protocol
120        // in the future
121        let protocol_config = match format {
122            SourceFormat::Native => ProtocolProperties::Native,
123            SourceFormat::None => ProtocolProperties::None,
124            SourceFormat::Debezium => {
125                let debezium_props = DebeziumProps::from(&format_encode_options_with_secret);
126                ProtocolProperties::Debezium(debezium_props)
127            }
128            SourceFormat::DebeziumMongo => ProtocolProperties::DebeziumMongo,
129            SourceFormat::Maxwell => ProtocolProperties::Maxwell,
130            SourceFormat::Canal => ProtocolProperties::Canal,
131            SourceFormat::Upsert => ProtocolProperties::Upsert,
132            SourceFormat::Plain => ProtocolProperties::Plain,
133            _ => unreachable!(),
134        };
135
136        let encoding_config = match (format, encode) {
137            (SourceFormat::Plain, SourceEncode::Csv) => EncodingProperties::Csv(CsvProperties {
138                delimiter: info.csv_delimiter as u8,
139                has_header: info.csv_has_header,
140            }),
141            (SourceFormat::Plain, SourceEncode::Parquet) => EncodingProperties::Parquet,
142            (SourceFormat::Plain, SourceEncode::Avro)
143            | (SourceFormat::Upsert, SourceEncode::Avro) => {
144                let mut config = AvroProperties {
145                    record_name: if info.proto_message_name.is_empty() {
146                        None
147                    } else {
148                        Some(info.proto_message_name.clone())
149                    },
150                    key_record_name: info.key_message_name.clone(),
151                    map_handling: MapHandling::from_options(&format_encode_options_with_secret)?,
152                    ..Default::default()
153                };
154                config.schema_location = if let Some(schema_arn) =
155                    format_encode_options_with_secret.get(AWS_GLUE_SCHEMA_ARN_KEY)
156                {
157                    risingwave_common::license::Feature::GlueSchemaRegistry
158                        .check_available()
159                        .map_err(anyhow::Error::from)?;
160                    SchemaLocation::Glue {
161                        schema_arn: schema_arn.clone(),
162                        aws_auth_props: serde_json::from_value::<AwsAuthProps>(
163                            serde_json::to_value(format_encode_options_with_secret.clone())
164                                .unwrap(),
165                        )
166                        .map_err(|e| anyhow::anyhow!(e))?,
167                        // The option `mock_config` is not public and we can break compatibility.
168                        mock_config: format_encode_options_with_secret
169                            .get("aws.glue.mock_config")
170                            .cloned(),
171                    }
172                } else if info.use_schema_registry {
173                    SchemaLocation::Confluent {
174                        urls: info.row_schema_location.clone(),
175                        client_config: SchemaRegistryConfig::from(
176                            &format_encode_options_with_secret,
177                        ),
178                        name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy)
179                            .unwrap(),
180                        topic: get_kafka_topic(&options_with_secret)?.clone(),
181                    }
182                } else {
183                    SchemaLocation::File {
184                        url: info.row_schema_location.clone(),
185                        aws_auth_props: Some(
186                            serde_json::from_value::<AwsAuthProps>(
187                                serde_json::to_value(format_encode_options_with_secret.clone())
188                                    .unwrap(),
189                            )
190                            .map_err(|e| anyhow::anyhow!(e))?,
191                        ),
192                    }
193                };
194                EncodingProperties::Avro(config)
195            }
196            (SourceFormat::Plain, SourceEncode::Protobuf)
197            | (SourceFormat::Upsert, SourceEncode::Protobuf) => {
198                if info.row_schema_location.is_empty() {
199                    bail!("protobuf file location not provided");
200                }
201                let mut messages_as_jsonb = if let Some(messages_as_jsonb) =
202                    format_encode_options_with_secret.get(PROTOBUF_MESSAGES_AS_JSONB)
203                {
204                    messages_as_jsonb.split(',').map(|s| s.to_owned()).collect()
205                } else {
206                    HashSet::new()
207                };
208                messages_as_jsonb.insert("google.protobuf.Any".to_owned());
209
210                let mut config = ProtobufProperties {
211                    message_name: info.proto_message_name.clone(),
212                    key_message_name: info.key_message_name.clone(),
213                    messages_as_jsonb,
214                    ..Default::default()
215                };
216                config.schema_location = if info.use_schema_registry {
217                    SchemaLocation::Confluent {
218                        urls: info.row_schema_location.clone(),
219                        client_config: SchemaRegistryConfig::from(
220                            &format_encode_options_with_secret,
221                        ),
222                        name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy)
223                            .unwrap(),
224                        topic: get_kafka_topic(&options_with_secret)?.clone(),
225                    }
226                } else {
227                    SchemaLocation::File {
228                        url: info.row_schema_location.clone(),
229                        aws_auth_props: Some(
230                            serde_json::from_value::<AwsAuthProps>(
231                                serde_json::to_value(format_encode_options_with_secret.clone())
232                                    .unwrap(),
233                            )
234                            .map_err(|e| anyhow::anyhow!(e))?,
235                        ),
236                    }
237                };
238                EncodingProperties::Protobuf(config)
239            }
240            (SourceFormat::Debezium, SourceEncode::Avro) => {
241                EncodingProperties::Avro(AvroProperties {
242                    record_name: if info.proto_message_name.is_empty() {
243                        None
244                    } else {
245                        Some(info.proto_message_name.clone())
246                    },
247                    key_record_name: info.key_message_name.clone(),
248                    schema_location: SchemaLocation::Confluent {
249                        urls: info.row_schema_location.clone(),
250                        client_config: SchemaRegistryConfig::from(
251                            &format_encode_options_with_secret,
252                        ),
253                        name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy)
254                            .unwrap(),
255                        topic: get_kafka_topic(&options_with_secret).unwrap().clone(),
256                    },
257                    ..Default::default()
258                })
259            }
260            (
261                SourceFormat::Plain
262                | SourceFormat::Debezium
263                | SourceFormat::Maxwell
264                | SourceFormat::Canal
265                | SourceFormat::Upsert,
266                SourceEncode::Json,
267            ) => EncodingProperties::Json(JsonProperties {
268                use_schema_registry: info.use_schema_registry,
269                timestamp_handling: None,
270                timestamptz_handling: TimestamptzHandling::from_options(
271                    &format_encode_options_with_secret,
272                )?,
273                time_handling: None,
274            }),
275            (SourceFormat::DebeziumMongo, SourceEncode::Json) => {
276                let props = MongoProperties::from(&format_encode_options_with_secret);
277                EncodingProperties::MongoJson(props)
278            }
279            (SourceFormat::Plain, SourceEncode::Bytes) => {
280                EncodingProperties::Bytes(BytesProperties { column_name: None })
281            }
282            (SourceFormat::Native, SourceEncode::Native) => EncodingProperties::Native,
283            (SourceFormat::None, SourceEncode::None) => EncodingProperties::None,
284            (format, encode) => {
285                bail!("Unsupported format {:?} encode {:?}", format, encode);
286            }
287        };
288        Ok(Self {
289            encoding_config,
290            protocol_config,
291        })
292    }
293}
294
/// Options for Avro-encoded sources.
#[derive(Debug, Default, Clone)]
pub struct AvroProperties {
    /// Where the Avro schema is loaded from (file, Confluent registry or AWS Glue).
    pub schema_location: SchemaLocation,
    /// Record name, from the source's `proto_message_name`; `None` when that is empty.
    pub record_name: Option<String>,
    /// Record name used for the message key, if any.
    pub key_record_name: Option<String>,
    /// How Avro maps are decoded, parsed from the `FORMAT ... ENCODE ...` options;
    /// `None` when not configured.
    pub map_handling: Option<MapHandling>,
}
302
/// Where to load a schema from.
///
/// Used by both `AvroProperties` and `ProtobufProperties`. WIP: may cover JSON schema later.
#[derive(Debug, Clone)]
pub enum SchemaLocation {
    /// Schema file from `https://`, `s3://` or `file://` (e.g. an Avro `.avsc`).
    File {
        /// Location of the schema file.
        url: String,
        aws_auth_props: Option<AwsAuthProps>, // for s3
    },
    /// <https://docs.confluent.io/platform/current/schema-registry/index.html>
    Confluent {
        /// Schema registry address(es), as given in the source's `row_schema_location`.
        urls: String,
        /// Client settings for talking to the registry.
        client_config: SchemaRegistryConfig,
        /// How registry subject names are derived.
        name_strategy: PbSchemaRegistryNameStrategy,
        /// Kafka topic of the source; presumably combined with `name_strategy` to build
        /// subject names — confirm against the registry client.
        topic: String,
    },
    /// <https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html>
    Glue {
        /// ARN of the schema in the Glue registry.
        schema_arn: String,
        /// AWS credentials/region used to reach Glue.
        aws_auth_props: AwsAuthProps,
        // When `Some(_)`, ignore AWS and load schemas from provided config
        mock_config: Option<String>,
    },
}
326
327// TODO: `SpecificParserConfig` shall not `impl`/`derive` a `Default`
328impl Default for SchemaLocation {
329    fn default() -> Self {
330        // backward compatible but undesired
331        Self::File {
332            url: Default::default(),
333            aws_auth_props: None,
334        }
335    }
336}
337
/// Options for Protobuf-encoded sources.
#[derive(Debug, Default, Clone)]
pub struct ProtobufProperties {
    /// Where the Protobuf schema is loaded from (file or Confluent registry).
    pub schema_location: SchemaLocation,
    /// Name of the message type describing each row.
    pub message_name: String,
    /// Name of the message type used for the key, if any.
    pub key_message_name: Option<String>,
    /// Message names to decode as JSONB, from the comma-separated
    /// `PROTOBUF_MESSAGES_AS_JSONB` option; `SpecificParserConfig::new` always
    /// includes `google.protobuf.Any`.
    pub messages_as_jsonb: HashSet<String>,
}
345
/// Options for CSV-encoded sources.
#[derive(Debug, Default, Clone, Copy)]
pub struct CsvProperties {
    /// Field delimiter, as a single byte.
    pub delimiter: u8,
    /// Whether the CSV input has a header row (from the source's `csv_has_header`).
    pub has_header: bool,
}
351
/// Options for JSON-encoded sources.
#[derive(Debug, Default, Clone)]
pub struct JsonProperties {
    /// Copied from the source's `use_schema_registry` flag.
    pub use_schema_registry: bool,
    /// How `timestamp` values are parsed; `None` leaves the parser's default.
    pub timestamp_handling: Option<TimestampHandling>,
    /// How `timestamptz` values are parsed; `SpecificParserConfig::new` reads it from the
    /// `FORMAT ... ENCODE ...` options.
    pub timestamptz_handling: Option<TimestamptzHandling>,
    /// How `time` values are parsed; `None` leaves the parser's default.
    pub time_handling: Option<TimeHandling>,
}
359
/// Options for sources whose payload is taken as raw bytes.
#[derive(Debug, Default, Clone)]
pub struct BytesProperties {
    /// Name of the target column; `SpecificParserConfig::new` leaves it `None`
    /// (presumably resolved later by the parser — confirm).
    pub column_name: Option<String>,
}
364
/// Options for Debezium-MongoDB JSON sources.
#[derive(Debug, Default, Clone)]
pub struct MongoProperties {
    /// Whether strong-schema mode is on; set from the `CDC_MONGODB_STRONG_SCHEMA_KEY` option
    /// when built from an option map.
    pub strong_schema: bool,
}
369
370impl MongoProperties {
371    pub fn new(strong_schema: bool) -> Self {
372        Self { strong_schema }
373    }
374}
375impl From<&BTreeMap<String, String>> for MongoProperties {
376    fn from(config: &BTreeMap<String, String>) -> Self {
377        let strong_schema = config
378            .get(CDC_MONGODB_STRONG_SCHEMA_KEY)
379            .is_some_and(|k| k.eq_ignore_ascii_case("true"));
380        Self { strong_schema }
381    }
382}