risingwave_connector/parser/
config.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
15use std::collections::{BTreeMap, HashSet};
16
17use risingwave_common::bail;
18use risingwave_common::secret::LocalSecretManager;
19use risingwave_connector_codec::decoder::avro::MapHandling;
20use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo};
21
22use super::unified::json::BigintUnsignedHandlingMode;
23use super::utils::get_kafka_topic;
24use super::{DebeziumProps, TimeHandling, TimestampHandling, TimestamptzHandling};
25use crate::WithOptionsSecResolved;
26use crate::connector_common::AwsAuthProps;
27use crate::error::ConnectorResult;
28use crate::parser::PROTOBUF_MESSAGES_AS_JSONB;
29use crate::schema::AWS_GLUE_SCHEMA_ARN_KEY;
30use crate::schema::schema_registry::SchemaRegistryConfig;
31use crate::source::cdc::CDC_MONGODB_STRONG_SCHEMA_KEY;
32use crate::source::{SourceColumnDesc, SourceEncode, SourceFormat, extract_source_struct};
33
/// `WITH` option key that toggles case-insensitive column name matching for Parquet sources.
pub const PARQUET_CASE_INSENSITIVE_KEY: &str = "parquet.case_insensitive";
35
/// Note: this is created in `SourceReader::build_stream`
#[derive(Debug, Clone, Default)]
pub struct ParserConfig {
    // Column-level config shared by all parsers.
    pub common: CommonParserConfig,
    // Format/encode-specific config.
    pub specific: SpecificParserConfig,
}
42
43fn parse_parquet_case_insensitive_option(
44    options_with_secret: &BTreeMap<String, String>,
45) -> ConnectorResult<bool> {
46    let value = options_with_secret.get(PARQUET_CASE_INSENSITIVE_KEY);
47    let Some(value) = value else {
48        return Ok(false);
49    };
50    let parsed = match value.trim().to_ascii_lowercase().parse::<bool>() {
51        Ok(parsed) => parsed,
52        Err(_) => bail!(
53            "invalid value for {}, expect true or false",
54            PARQUET_CASE_INSENSITIVE_KEY
55        ),
56    };
57    Ok(parsed)
58}
59
60impl ParserConfig {
61    pub fn get_config(self) -> (Vec<SourceColumnDesc>, SpecificParserConfig) {
62        (self.common.rw_columns, self.specific)
63    }
64}
65
/// Parser config that is independent of the (format, encode) pair.
#[derive(Debug, Clone, Default)]
pub struct CommonParserConfig {
    /// Note: this is created by `SourceDescBuilder::builder`
    pub rw_columns: Vec<SourceColumnDesc>,
}
71
/// Parser config specific to the source's (format, encode) pair.
#[derive(Debug, Clone, Default)]
pub struct SpecificParserConfig {
    // How payload bytes are decoded (derived from `ENCODE ...`).
    pub encoding_config: EncodingProperties,
    // How rows/changes are interpreted (derived from `FORMAT ...`).
    pub protocol_config: ProtocolProperties,
}
77
/// Encode-specific properties, one variant per supported `ENCODE`.
#[derive(Debug, Default, Clone)]
pub enum EncodingProperties {
    Avro(AvroProperties),
    Protobuf(ProtobufProperties),
    Csv(CsvProperties),
    Json(JsonProperties),
    MongoJson(MongoProperties),
    Bytes(BytesProperties),
    Parquet(ParquetProperties),
    Native,
    /// Encoding can't be specified because the source will determine it. Now only used in Iceberg.
    None,
    #[default]
    Unspecified,
}
93
/// Format (protocol)-specific properties, one variant per supported `FORMAT`.
#[derive(Debug, Default, Clone)]
pub enum ProtocolProperties {
    Debezium(DebeziumProps),
    DebeziumMongo,
    Maxwell,
    Canal,
    Plain,
    Upsert,
    Native,
    /// Protocol can't be specified because the source will determine it. Now only used in Iceberg.
    None,
    #[default]
    Unspecified,
}
108
109impl SpecificParserConfig {
110    // for test only
111    pub const DEFAULT_PLAIN_JSON: SpecificParserConfig = SpecificParserConfig {
112        encoding_config: EncodingProperties::Json(JsonProperties {
113            use_schema_registry: false,
114            timestamp_handling: None,
115            timestamptz_handling: None,
116            time_handling: None,
117            bigint_unsigned_handling: None,
118            handle_toast_columns: false,
119        }),
120        protocol_config: ProtocolProperties::Plain,
121    };
122
123    // The validity of (format, encode) is ensured by `extract_format_encode`
124    pub fn new(
125        info: &StreamSourceInfo,
126        with_properties: &WithOptionsSecResolved,
127    ) -> ConnectorResult<Self> {
128        let info = info.clone();
129        let source_struct = extract_source_struct(&info)?;
130        let format_encode_options_with_secret = LocalSecretManager::global()
131            .fill_secrets(info.format_encode_options, info.format_encode_secret_refs)?;
132        let (options, secret_refs) = with_properties.clone().into_parts();
133        // Make sure `with_properties` is no longer used by accident.
134        // All reads shall go to `options_with_secret` instead.
135        #[expect(unused_variables)]
136        let with_properties = ();
137        let options_with_secret =
138            LocalSecretManager::global().fill_secrets(options, secret_refs)?;
139        let format = source_struct.format;
140        let encode = source_struct.encode;
141        // this transformation is needed since there may be config for the protocol
142        // in the future
143        let protocol_config = match format {
144            SourceFormat::Native => ProtocolProperties::Native,
145            SourceFormat::None => ProtocolProperties::None,
146            SourceFormat::Debezium => {
147                let debezium_props = DebeziumProps::from(&format_encode_options_with_secret);
148                ProtocolProperties::Debezium(debezium_props)
149            }
150            SourceFormat::DebeziumMongo => ProtocolProperties::DebeziumMongo,
151            SourceFormat::Maxwell => ProtocolProperties::Maxwell,
152            SourceFormat::Canal => ProtocolProperties::Canal,
153            SourceFormat::Upsert => ProtocolProperties::Upsert,
154            SourceFormat::Plain => ProtocolProperties::Plain,
155            _ => unreachable!(),
156        };
157
158        let encoding_config = match (format, encode) {
159            (SourceFormat::Plain, SourceEncode::Csv) => EncodingProperties::Csv(CsvProperties {
160                delimiter: info.csv_delimiter as u8,
161                has_header: info.csv_has_header,
162            }),
163            (SourceFormat::Plain, SourceEncode::Parquet) => {
164                let case_insensitive = parse_parquet_case_insensitive_option(&options_with_secret)?;
165                EncodingProperties::Parquet(ParquetProperties { case_insensitive })
166            }
167            (SourceFormat::Plain, SourceEncode::Avro)
168            | (SourceFormat::Upsert, SourceEncode::Avro) => {
169                let mut config = AvroProperties {
170                    record_name: if info.proto_message_name.is_empty() {
171                        None
172                    } else {
173                        Some(info.proto_message_name.clone())
174                    },
175                    key_record_name: info.key_message_name.clone(),
176                    map_handling: MapHandling::from_options(&format_encode_options_with_secret)?,
177                    ..Default::default()
178                };
179                config.schema_location = if let Some(schema_arn) =
180                    format_encode_options_with_secret.get(AWS_GLUE_SCHEMA_ARN_KEY)
181                {
182                    risingwave_common::license::Feature::GlueSchemaRegistry
183                        .check_available()
184                        .map_err(anyhow::Error::from)?;
185                    SchemaLocation::Glue {
186                        schema_arn: schema_arn.clone(),
187                        aws_auth_props: serde_json::from_value::<AwsAuthProps>(
188                            serde_json::to_value(format_encode_options_with_secret.clone())
189                                .unwrap(),
190                        )
191                        .map_err(|e| anyhow::anyhow!(e))?,
192                        // The option `mock_config` is not public and we can break compatibility.
193                        mock_config: format_encode_options_with_secret
194                            .get("aws.glue.mock_config")
195                            .cloned(),
196                    }
197                } else if info.use_schema_registry {
198                    SchemaLocation::Confluent {
199                        urls: info.row_schema_location.clone(),
200                        client_config: SchemaRegistryConfig::from(
201                            &format_encode_options_with_secret,
202                        ),
203                        name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy)
204                            .unwrap(),
205                        topic: get_kafka_topic(&options_with_secret)?.clone(),
206                    }
207                } else {
208                    SchemaLocation::File {
209                        url: info.row_schema_location,
210                        aws_auth_props: Some(
211                            serde_json::from_value::<AwsAuthProps>(
212                                serde_json::to_value(format_encode_options_with_secret).unwrap(),
213                            )
214                            .map_err(|e| anyhow::anyhow!(e))?,
215                        ),
216                    }
217                };
218                EncodingProperties::Avro(config)
219            }
220            (SourceFormat::Plain, SourceEncode::Protobuf)
221            | (SourceFormat::Upsert, SourceEncode::Protobuf) => {
222                if info.row_schema_location.is_empty() {
223                    bail!("protobuf file location not provided");
224                }
225                let mut messages_as_jsonb = if let Some(messages_as_jsonb) =
226                    format_encode_options_with_secret.get(PROTOBUF_MESSAGES_AS_JSONB)
227                {
228                    messages_as_jsonb.split(',').map(|s| s.to_owned()).collect()
229                } else {
230                    HashSet::new()
231                };
232                messages_as_jsonb.insert("google.protobuf.Any".to_owned());
233
234                let mut config = ProtobufProperties {
235                    message_name: info.proto_message_name.clone(),
236                    key_message_name: info.key_message_name.clone(),
237                    messages_as_jsonb,
238                    ..Default::default()
239                };
240                config.schema_location = if info.use_schema_registry {
241                    SchemaLocation::Confluent {
242                        urls: info.row_schema_location.clone(),
243                        client_config: SchemaRegistryConfig::from(
244                            &format_encode_options_with_secret,
245                        ),
246                        name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy)
247                            .unwrap(),
248                        topic: get_kafka_topic(&options_with_secret)?.clone(),
249                    }
250                } else {
251                    SchemaLocation::File {
252                        url: info.row_schema_location,
253                        aws_auth_props: Some(
254                            serde_json::from_value::<AwsAuthProps>(
255                                serde_json::to_value(format_encode_options_with_secret).unwrap(),
256                            )
257                            .map_err(|e| anyhow::anyhow!(e))?,
258                        ),
259                    }
260                };
261                EncodingProperties::Protobuf(config)
262            }
263            (SourceFormat::Debezium, SourceEncode::Avro) => {
264                EncodingProperties::Avro(AvroProperties {
265                    record_name: if info.proto_message_name.is_empty() {
266                        None
267                    } else {
268                        Some(info.proto_message_name.clone())
269                    },
270                    key_record_name: info.key_message_name.clone(),
271                    schema_location: SchemaLocation::Confluent {
272                        urls: info.row_schema_location.clone(),
273                        client_config: SchemaRegistryConfig::from(
274                            &format_encode_options_with_secret,
275                        ),
276                        name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy)
277                            .unwrap(),
278                        topic: get_kafka_topic(&options_with_secret).unwrap().clone(),
279                    },
280                    ..Default::default()
281                })
282            }
283            (
284                SourceFormat::Plain
285                | SourceFormat::Debezium
286                | SourceFormat::Maxwell
287                | SourceFormat::Canal
288                | SourceFormat::Upsert,
289                SourceEncode::Json,
290            ) => EncodingProperties::Json(JsonProperties {
291                use_schema_registry: info.use_schema_registry,
292                timestamp_handling: None,
293                timestamptz_handling: TimestamptzHandling::from_options(
294                    &format_encode_options_with_secret,
295                )?,
296                time_handling: None,
297                bigint_unsigned_handling: None,
298                handle_toast_columns: false,
299            }),
300            (SourceFormat::DebeziumMongo, SourceEncode::Json) => {
301                let props = MongoProperties::from(&format_encode_options_with_secret);
302                EncodingProperties::MongoJson(props)
303            }
304            (SourceFormat::Plain, SourceEncode::Bytes) => {
305                EncodingProperties::Bytes(BytesProperties { column_name: None })
306            }
307            (SourceFormat::Native, SourceEncode::Native) => EncodingProperties::Native,
308            (SourceFormat::None, SourceEncode::None) => EncodingProperties::None,
309            (format, encode) => {
310                bail!("Unsupported format {:?} encode {:?}", format, encode);
311            }
312        };
313        Ok(Self {
314            encoding_config,
315            protocol_config,
316        })
317    }
318}
319
/// Properties for `ENCODE AVRO`.
#[derive(Debug, Default, Clone)]
pub struct AvroProperties {
    // Where the Avro schema is fetched from (file, Confluent registry, or AWS Glue).
    pub schema_location: SchemaLocation,
    // Optional record name for the payload (populated from the catalog's
    // `proto_message_name` when non-empty).
    pub record_name: Option<String>,
    // Optional record name for the message key.
    pub key_record_name: Option<String>,
    // How Avro `map` values are handled, parsed from the encode options.
    pub map_handling: Option<MapHandling>,
}
327
/// WIP: may cover protobuf and json schema later.
#[derive(Debug, Clone)]
pub enum SchemaLocation {
    /// Avsc from `https://`, `s3://` or `file://`.
    File {
        url: String,
        aws_auth_props: Option<AwsAuthProps>, // for s3
    },
    /// <https://docs.confluent.io/platform/current/schema-registry/index.html>
    Confluent {
        urls: String,
        client_config: SchemaRegistryConfig,
        name_strategy: PbSchemaRegistryNameStrategy,
        topic: String,
    },
    /// <https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html>
    Glue {
        schema_arn: String,
        aws_auth_props: AwsAuthProps,
        // When `Some(_)`, ignore AWS and load schemas from provided config
        mock_config: Option<String>,
    },
}
351
352// TODO: `SpecificParserConfig` shall not `impl`/`derive` a `Default`
353impl Default for SchemaLocation {
354    fn default() -> Self {
355        // backward compatible but undesired
356        Self::File {
357            url: Default::default(),
358            aws_auth_props: None,
359        }
360    }
361}
362
/// Properties for `ENCODE PROTOBUF`.
#[derive(Debug, Default, Clone)]
pub struct ProtobufProperties {
    // Where the protobuf schema is fetched from (file or Confluent registry).
    pub schema_location: SchemaLocation,
    // Fully-qualified message name of the payload.
    pub message_name: String,
    // Optional message name for the key.
    pub key_message_name: Option<String>,
    // Message names to decode as JSONB; always contains `google.protobuf.Any`.
    pub messages_as_jsonb: HashSet<String>,
}
370
/// Properties for `ENCODE CSV`.
#[derive(Debug, Default, Clone, Copy)]
pub struct CsvProperties {
    // Field delimiter byte (from the catalog's `csv_delimiter`).
    pub delimiter: u8,
    // Whether the first row is a header.
    pub has_header: bool,
}
376
/// Properties for `ENCODE PARQUET`.
#[derive(Debug, Default, Clone, Copy)]
pub struct ParquetProperties {
    // Parsed from the `parquet.case_insensitive` WITH option; defaults to false.
    pub case_insensitive: bool,
}
381
/// Properties for `ENCODE JSON`.
#[derive(Debug, Default, Clone)]
pub struct JsonProperties {
    // Whether a (Confluent-style) schema registry is used for the JSON schema.
    pub use_schema_registry: bool,
    // Optional handling modes parsed from the encode options; `None` means
    // the parser's default behavior.
    pub timestamp_handling: Option<TimestampHandling>,
    pub timestamptz_handling: Option<TimestamptzHandling>,
    pub time_handling: Option<TimeHandling>,
    pub bigint_unsigned_handling: Option<BigintUnsignedHandlingMode>,
    // NOTE(review): presumably Postgres-CDC TOAST column handling — confirm with callers.
    pub handle_toast_columns: bool,
}
391
/// Properties for `ENCODE BYTES`.
#[derive(Debug, Default, Clone)]
pub struct BytesProperties {
    // Optional target column name; `None` in the plain-bytes path above.
    pub column_name: Option<String>,
}
396
/// Properties for Debezium-Mongo JSON decoding.
#[derive(Debug, Default, Clone)]
pub struct MongoProperties {
    // True when the strong-schema CDC option is set to "true" (case-insensitive).
    pub strong_schema: bool,
}
401
402impl MongoProperties {
403    pub fn new(strong_schema: bool) -> Self {
404        Self { strong_schema }
405    }
406}
407impl From<&BTreeMap<String, String>> for MongoProperties {
408    fn from(config: &BTreeMap<String, String>) -> Self {
409        let strong_schema = config
410            .get(CDC_MONGODB_STRONG_SCHEMA_KEY)
411            .is_some_and(|k| k.eq_ignore_ascii_case("true"));
412        Self { strong_schema }
413    }
414}