1use std::collections::{BTreeMap, HashSet};
16
17use risingwave_common::bail;
18use risingwave_common::secret::LocalSecretManager;
19use risingwave_connector_codec::decoder::avro::MapHandling;
20use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo};
21
22use super::unified::json::BigintUnsignedHandlingMode;
23use super::utils::get_kafka_topic;
24use super::{DebeziumProps, TimeHandling, TimestampHandling, TimestamptzHandling};
25use crate::WithOptionsSecResolved;
26use crate::connector_common::AwsAuthProps;
27use crate::error::ConnectorResult;
28use crate::parser::PROTOBUF_MESSAGES_AS_JSONB;
29use crate::schema::AWS_GLUE_SCHEMA_ARN_KEY;
30use crate::schema::schema_registry::SchemaRegistryConfig;
31use crate::source::cdc::CDC_MONGODB_STRONG_SCHEMA_KEY;
32use crate::source::{SourceColumnDesc, SourceEncode, SourceFormat, extract_source_struct};
33
/// Source option key that toggles case-insensitive column name matching for
/// Parquet sources. Parsed by `parse_parquet_case_insensitive_option`.
pub const PARQUET_CASE_INSENSITIVE_KEY: &str = "parquet.case_insensitive";
35
/// Full configuration for a source parser: the format-independent part
/// (column descriptors) plus the format/encode-specific part.
#[derive(Debug, Clone, Default)]
pub struct ParserConfig {
    // Settings shared by all formats (currently the column descriptors).
    pub common: CommonParserConfig,
    // Settings specific to the chosen FORMAT/ENCODE combination.
    pub specific: SpecificParserConfig,
}
42
43fn parse_parquet_case_insensitive_option(
44 options_with_secret: &BTreeMap<String, String>,
45) -> ConnectorResult<bool> {
46 let value = options_with_secret.get(PARQUET_CASE_INSENSITIVE_KEY);
47 let Some(value) = value else {
48 return Ok(false);
49 };
50 let parsed = match value.trim().to_ascii_lowercase().parse::<bool>() {
51 Ok(parsed) => parsed,
52 Err(_) => bail!(
53 "invalid value for {}, expect true or false",
54 PARQUET_CASE_INSENSITIVE_KEY
55 ),
56 };
57 Ok(parsed)
58}
59
60impl ParserConfig {
61 pub fn get_config(self) -> (Vec<SourceColumnDesc>, SpecificParserConfig) {
62 (self.common.rw_columns, self.specific)
63 }
64}
65
/// Parser settings shared by all FORMAT/ENCODE combinations.
#[derive(Debug, Clone, Default)]
pub struct CommonParserConfig {
    // Descriptors of the columns the parser must produce.
    pub rw_columns: Vec<SourceColumnDesc>,
}
71
/// Parser settings that depend on the chosen FORMAT (protocol) and
/// ENCODE (encoding). Built by [`SpecificParserConfig::new`].
#[derive(Debug, Clone, Default)]
pub struct SpecificParserConfig {
    // Derived from the ENCODE clause (Avro, Protobuf, JSON, ...).
    pub encoding_config: EncodingProperties,
    // Derived from the FORMAT clause (Plain, Upsert, Debezium, ...).
    pub protocol_config: ProtocolProperties,
}
77
/// Per-encoding parser properties, one variant per supported ENCODE.
#[derive(Debug, Default, Clone)]
pub enum EncodingProperties {
    Avro(AvroProperties),
    Protobuf(ProtobufProperties),
    Csv(CsvProperties),
    Json(JsonProperties),
    // JSON produced by Debezium's MongoDB connector (FORMAT DEBEZIUM_MONGO).
    MongoJson(MongoProperties),
    Bytes(BytesProperties),
    Parquet(ParquetProperties),
    // Internal encoding used between RisingWave nodes.
    Native,
    None,
    // Placeholder default; a real config always carries a concrete variant.
    #[default]
    Unspecified,
}
93
/// Per-format (protocol) parser properties, one variant per supported FORMAT.
#[derive(Debug, Default, Clone)]
pub enum ProtocolProperties {
    // Debezium CDC; carries extra Debezium-specific options.
    Debezium(DebeziumProps),
    DebeziumMongo,
    Maxwell,
    Canal,
    Plain,
    Upsert,
    // Internal protocol used between RisingWave nodes.
    Native,
    None,
    // Placeholder default; a real config always carries a concrete variant.
    #[default]
    Unspecified,
}
108
109impl SpecificParserConfig {
110 pub const DEFAULT_PLAIN_JSON: SpecificParserConfig = SpecificParserConfig {
112 encoding_config: EncodingProperties::Json(JsonProperties {
113 use_schema_registry: false,
114 timestamp_handling: None,
115 timestamptz_handling: None,
116 time_handling: None,
117 bigint_unsigned_handling: None,
118 handle_toast_columns: false,
119 }),
120 protocol_config: ProtocolProperties::Plain,
121 };
122
123 pub fn new(
125 info: &StreamSourceInfo,
126 with_properties: &WithOptionsSecResolved,
127 ) -> ConnectorResult<Self> {
128 let info = info.clone();
129 let source_struct = extract_source_struct(&info)?;
130 let format_encode_options_with_secret = LocalSecretManager::global()
131 .fill_secrets(info.format_encode_options, info.format_encode_secret_refs)?;
132 let (options, secret_refs) = with_properties.clone().into_parts();
133 #[expect(unused_variables)]
136 let with_properties = ();
137 let options_with_secret =
138 LocalSecretManager::global().fill_secrets(options, secret_refs)?;
139 let format = source_struct.format;
140 let encode = source_struct.encode;
141 let protocol_config = match format {
144 SourceFormat::Native => ProtocolProperties::Native,
145 SourceFormat::None => ProtocolProperties::None,
146 SourceFormat::Debezium => {
147 let debezium_props = DebeziumProps::from(&format_encode_options_with_secret);
148 ProtocolProperties::Debezium(debezium_props)
149 }
150 SourceFormat::DebeziumMongo => ProtocolProperties::DebeziumMongo,
151 SourceFormat::Maxwell => ProtocolProperties::Maxwell,
152 SourceFormat::Canal => ProtocolProperties::Canal,
153 SourceFormat::Upsert => ProtocolProperties::Upsert,
154 SourceFormat::Plain => ProtocolProperties::Plain,
155 _ => unreachable!(),
156 };
157
158 let encoding_config = match (format, encode) {
159 (SourceFormat::Plain, SourceEncode::Csv) => EncodingProperties::Csv(CsvProperties {
160 delimiter: info.csv_delimiter as u8,
161 has_header: info.csv_has_header,
162 }),
163 (SourceFormat::Plain, SourceEncode::Parquet) => {
164 let case_insensitive = parse_parquet_case_insensitive_option(&options_with_secret)?;
165 EncodingProperties::Parquet(ParquetProperties { case_insensitive })
166 }
167 (SourceFormat::Plain, SourceEncode::Avro)
168 | (SourceFormat::Upsert, SourceEncode::Avro) => {
169 let mut config = AvroProperties {
170 record_name: if info.proto_message_name.is_empty() {
171 None
172 } else {
173 Some(info.proto_message_name.clone())
174 },
175 key_record_name: info.key_message_name.clone(),
176 map_handling: MapHandling::from_options(&format_encode_options_with_secret)?,
177 ..Default::default()
178 };
179 config.schema_location = if let Some(schema_arn) =
180 format_encode_options_with_secret.get(AWS_GLUE_SCHEMA_ARN_KEY)
181 {
182 risingwave_common::license::Feature::GlueSchemaRegistry
183 .check_available()
184 .map_err(anyhow::Error::from)?;
185 SchemaLocation::Glue {
186 schema_arn: schema_arn.clone(),
187 aws_auth_props: serde_json::from_value::<AwsAuthProps>(
188 serde_json::to_value(format_encode_options_with_secret.clone())
189 .unwrap(),
190 )
191 .map_err(|e| anyhow::anyhow!(e))?,
192 mock_config: format_encode_options_with_secret
194 .get("aws.glue.mock_config")
195 .cloned(),
196 }
197 } else if info.use_schema_registry {
198 SchemaLocation::Confluent {
199 urls: info.row_schema_location.clone(),
200 client_config: SchemaRegistryConfig::from(
201 &format_encode_options_with_secret,
202 ),
203 name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy)
204 .unwrap(),
205 topic: get_kafka_topic(&options_with_secret)?.clone(),
206 }
207 } else {
208 SchemaLocation::File {
209 url: info.row_schema_location,
210 aws_auth_props: Some(
211 serde_json::from_value::<AwsAuthProps>(
212 serde_json::to_value(format_encode_options_with_secret).unwrap(),
213 )
214 .map_err(|e| anyhow::anyhow!(e))?,
215 ),
216 }
217 };
218 EncodingProperties::Avro(config)
219 }
220 (SourceFormat::Plain, SourceEncode::Protobuf)
221 | (SourceFormat::Upsert, SourceEncode::Protobuf) => {
222 if info.row_schema_location.is_empty() {
223 bail!("protobuf file location not provided");
224 }
225 let mut messages_as_jsonb = if let Some(messages_as_jsonb) =
226 format_encode_options_with_secret.get(PROTOBUF_MESSAGES_AS_JSONB)
227 {
228 messages_as_jsonb.split(',').map(|s| s.to_owned()).collect()
229 } else {
230 HashSet::new()
231 };
232 messages_as_jsonb.insert("google.protobuf.Any".to_owned());
233
234 let mut config = ProtobufProperties {
235 message_name: info.proto_message_name.clone(),
236 key_message_name: info.key_message_name.clone(),
237 messages_as_jsonb,
238 ..Default::default()
239 };
240 config.schema_location = if info.use_schema_registry {
241 SchemaLocation::Confluent {
242 urls: info.row_schema_location.clone(),
243 client_config: SchemaRegistryConfig::from(
244 &format_encode_options_with_secret,
245 ),
246 name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy)
247 .unwrap(),
248 topic: get_kafka_topic(&options_with_secret)?.clone(),
249 }
250 } else {
251 SchemaLocation::File {
252 url: info.row_schema_location,
253 aws_auth_props: Some(
254 serde_json::from_value::<AwsAuthProps>(
255 serde_json::to_value(format_encode_options_with_secret).unwrap(),
256 )
257 .map_err(|e| anyhow::anyhow!(e))?,
258 ),
259 }
260 };
261 EncodingProperties::Protobuf(config)
262 }
263 (SourceFormat::Debezium, SourceEncode::Avro) => {
264 EncodingProperties::Avro(AvroProperties {
265 record_name: if info.proto_message_name.is_empty() {
266 None
267 } else {
268 Some(info.proto_message_name.clone())
269 },
270 key_record_name: info.key_message_name.clone(),
271 schema_location: SchemaLocation::Confluent {
272 urls: info.row_schema_location.clone(),
273 client_config: SchemaRegistryConfig::from(
274 &format_encode_options_with_secret,
275 ),
276 name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy)
277 .unwrap(),
278 topic: get_kafka_topic(&options_with_secret).unwrap().clone(),
279 },
280 ..Default::default()
281 })
282 }
283 (
284 SourceFormat::Plain
285 | SourceFormat::Debezium
286 | SourceFormat::Maxwell
287 | SourceFormat::Canal
288 | SourceFormat::Upsert,
289 SourceEncode::Json,
290 ) => EncodingProperties::Json(JsonProperties {
291 use_schema_registry: info.use_schema_registry,
292 timestamp_handling: None,
293 timestamptz_handling: TimestamptzHandling::from_options(
294 &format_encode_options_with_secret,
295 )?,
296 time_handling: None,
297 bigint_unsigned_handling: None,
298 handle_toast_columns: false,
299 }),
300 (SourceFormat::DebeziumMongo, SourceEncode::Json) => {
301 let props = MongoProperties::from(&format_encode_options_with_secret);
302 EncodingProperties::MongoJson(props)
303 }
304 (SourceFormat::Plain, SourceEncode::Bytes) => {
305 EncodingProperties::Bytes(BytesProperties { column_name: None })
306 }
307 (SourceFormat::Native, SourceEncode::Native) => EncodingProperties::Native,
308 (SourceFormat::None, SourceEncode::None) => EncodingProperties::None,
309 (format, encode) => {
310 bail!("Unsupported format {:?} encode {:?}", format, encode);
311 }
312 };
313 Ok(Self {
314 encoding_config,
315 protocol_config,
316 })
317 }
318}
319
/// Settings for decoding Avro-encoded payloads.
#[derive(Debug, Default, Clone)]
pub struct AvroProperties {
    // Where to fetch the Avro schema from (file / Confluent / Glue).
    pub schema_location: SchemaLocation,
    // Optional record name for the value schema.
    pub record_name: Option<String>,
    // Optional record name for the key schema.
    pub key_record_name: Option<String>,
    // How Avro map types are mapped to RisingWave types, if configured.
    pub map_handling: Option<MapHandling>,
}
327
/// Where the schema for an Avro/Protobuf source is fetched from.
#[derive(Debug, Clone)]
pub enum SchemaLocation {
    /// Schema file at a URL; `aws_auth_props` presumably supplies credentials
    /// when the file is hosted on S3 — confirm against the fetch path.
    File {
        url: String,
        aws_auth_props: Option<AwsAuthProps>, },
    /// Confluent-compatible schema registry; the subject is derived from
    /// `topic` and `name_strategy`.
    Confluent {
        urls: String,
        client_config: SchemaRegistryConfig,
        name_strategy: PbSchemaRegistryNameStrategy,
        topic: String,
    },
    /// AWS Glue schema registry (licensed feature).
    Glue {
        schema_arn: String,
        aws_auth_props: AwsAuthProps,
        // Populated from the `aws.glue.mock_config` option when present.
        mock_config: Option<String>,
    },
}
351
352impl Default for SchemaLocation {
354 fn default() -> Self {
355 Self::File {
357 url: Default::default(),
358 aws_auth_props: None,
359 }
360 }
361}
362
/// Settings for decoding Protobuf-encoded payloads.
#[derive(Debug, Default, Clone)]
pub struct ProtobufProperties {
    // Where to fetch the Protobuf schema from (file / Confluent registry).
    pub schema_location: SchemaLocation,
    // Fully-qualified name of the value message.
    pub message_name: String,
    // Optional fully-qualified name of the key message.
    pub key_message_name: Option<String>,
    // Message types decoded as JSONB; always contains `google.protobuf.Any`.
    pub messages_as_jsonb: HashSet<String>,
}
370
/// Settings for decoding CSV payloads.
#[derive(Debug, Default, Clone, Copy)]
pub struct CsvProperties {
    // Field delimiter byte (from the catalog's `csv_delimiter`).
    pub delimiter: u8,
    // Whether the first row is a header to be skipped.
    pub has_header: bool,
}
376
/// Settings for reading Parquet files.
#[derive(Debug, Default, Clone, Copy)]
pub struct ParquetProperties {
    // Match Parquet column names case-insensitively
    // (from the `parquet.case_insensitive` option).
    pub case_insensitive: bool,
}
381
/// Settings for decoding JSON payloads.
#[derive(Debug, Default, Clone)]
pub struct JsonProperties {
    // Whether the payload is schema-registry-framed JSON.
    pub use_schema_registry: bool,
    // Optional overrides for how temporal / numeric values are interpreted.
    pub timestamp_handling: Option<TimestampHandling>,
    pub timestamptz_handling: Option<TimestamptzHandling>,
    pub time_handling: Option<TimeHandling>,
    pub bigint_unsigned_handling: Option<BigintUnsignedHandlingMode>,
    // Whether to apply TOAST-column handling (Debezium/Postgres specific —
    // NOTE(review): confirm semantics against the JSON parser).
    pub handle_toast_columns: bool,
}
391
/// Settings for the raw-bytes encoding.
#[derive(Debug, Default, Clone)]
pub struct BytesProperties {
    // Optional name of the single column receiving the raw bytes.
    pub column_name: Option<String>,
}
396
/// Settings for Debezium-MongoDB JSON payloads.
#[derive(Debug, Default, Clone)]
pub struct MongoProperties {
    // Enabled when the CDC strong-schema option is set to "true".
    pub strong_schema: bool,
}
401
402impl MongoProperties {
403 pub fn new(strong_schema: bool) -> Self {
404 Self { strong_schema }
405 }
406}
407impl From<&BTreeMap<String, String>> for MongoProperties {
408 fn from(config: &BTreeMap<String, String>) -> Self {
409 let strong_schema = config
410 .get(CDC_MONGODB_STRONG_SCHEMA_KEY)
411 .is_some_and(|k| k.eq_ignore_ascii_case("true"));
412 Self { strong_schema }
413 }
414}