1use std::collections::{BTreeMap, HashSet};
16
17use risingwave_common::bail;
18use risingwave_common::secret::LocalSecretManager;
19use risingwave_connector_codec::decoder::avro::MapHandling;
20use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo};
21
22use super::unified::json::BigintUnsignedHandlingMode;
23use super::utils::get_kafka_topic;
24use super::{DebeziumProps, TimeHandling, TimestampHandling, TimestamptzHandling};
25use crate::WithOptionsSecResolved;
26use crate::connector_common::AwsAuthProps;
27use crate::error::ConnectorResult;
28use crate::parser::PROTOBUF_MESSAGES_AS_JSONB;
29use crate::schema::AWS_GLUE_SCHEMA_ARN_KEY;
30use crate::schema::schema_registry::SchemaRegistryConfig;
31use crate::source::cdc::CDC_MONGODB_STRONG_SCHEMA_KEY;
32use crate::source::{SourceColumnDesc, SourceEncode, SourceFormat, extract_source_struct};
33
34#[derive(Debug, Clone, Default)]
36pub struct ParserConfig {
37 pub common: CommonParserConfig,
38 pub specific: SpecificParserConfig,
39}
40
41impl ParserConfig {
42 pub fn get_config(self) -> (Vec<SourceColumnDesc>, SpecificParserConfig) {
43 (self.common.rw_columns, self.specific)
44 }
45}
46
47#[derive(Debug, Clone, Default)]
48pub struct CommonParserConfig {
49 pub rw_columns: Vec<SourceColumnDesc>,
51}
52
53#[derive(Debug, Clone, Default)]
54pub struct SpecificParserConfig {
55 pub encoding_config: EncodingProperties,
56 pub protocol_config: ProtocolProperties,
57}
58
59#[derive(Debug, Default, Clone)]
60pub enum EncodingProperties {
61 Avro(AvroProperties),
62 Protobuf(ProtobufProperties),
63 Csv(CsvProperties),
64 Json(JsonProperties),
65 MongoJson(MongoProperties),
66 Bytes(BytesProperties),
67 Parquet,
68 Native,
69 None,
71 #[default]
72 Unspecified,
73}
74
75#[derive(Debug, Default, Clone)]
76pub enum ProtocolProperties {
77 Debezium(DebeziumProps),
78 DebeziumMongo,
79 Maxwell,
80 Canal,
81 Plain,
82 Upsert,
83 Native,
84 None,
86 #[default]
87 Unspecified,
88}
89
90impl SpecificParserConfig {
91 pub const DEFAULT_PLAIN_JSON: SpecificParserConfig = SpecificParserConfig {
93 encoding_config: EncodingProperties::Json(JsonProperties {
94 use_schema_registry: false,
95 timestamp_handling: None,
96 timestamptz_handling: None,
97 time_handling: None,
98 bigint_unsigned_handling: None,
99 handle_toast_columns: false,
100 }),
101 protocol_config: ProtocolProperties::Plain,
102 };
103
104 pub fn new(
106 info: &StreamSourceInfo,
107 with_properties: &WithOptionsSecResolved,
108 ) -> ConnectorResult<Self> {
109 let info = info.clone();
110 let source_struct = extract_source_struct(&info)?;
111 let format_encode_options_with_secret = LocalSecretManager::global()
112 .fill_secrets(info.format_encode_options, info.format_encode_secret_refs)?;
113 let (options, secret_refs) = with_properties.clone().into_parts();
114 #[expect(unused_variables)]
117 let with_properties = ();
118 let options_with_secret =
119 LocalSecretManager::global().fill_secrets(options, secret_refs)?;
120 let format = source_struct.format;
121 let encode = source_struct.encode;
122 let protocol_config = match format {
125 SourceFormat::Native => ProtocolProperties::Native,
126 SourceFormat::None => ProtocolProperties::None,
127 SourceFormat::Debezium => {
128 let debezium_props = DebeziumProps::from(&format_encode_options_with_secret);
129 ProtocolProperties::Debezium(debezium_props)
130 }
131 SourceFormat::DebeziumMongo => ProtocolProperties::DebeziumMongo,
132 SourceFormat::Maxwell => ProtocolProperties::Maxwell,
133 SourceFormat::Canal => ProtocolProperties::Canal,
134 SourceFormat::Upsert => ProtocolProperties::Upsert,
135 SourceFormat::Plain => ProtocolProperties::Plain,
136 _ => unreachable!(),
137 };
138
139 let encoding_config = match (format, encode) {
140 (SourceFormat::Plain, SourceEncode::Csv) => EncodingProperties::Csv(CsvProperties {
141 delimiter: info.csv_delimiter as u8,
142 has_header: info.csv_has_header,
143 }),
144 (SourceFormat::Plain, SourceEncode::Parquet) => EncodingProperties::Parquet,
145 (SourceFormat::Plain, SourceEncode::Avro)
146 | (SourceFormat::Upsert, SourceEncode::Avro) => {
147 let mut config = AvroProperties {
148 record_name: if info.proto_message_name.is_empty() {
149 None
150 } else {
151 Some(info.proto_message_name.clone())
152 },
153 key_record_name: info.key_message_name.clone(),
154 map_handling: MapHandling::from_options(&format_encode_options_with_secret)?,
155 ..Default::default()
156 };
157 config.schema_location = if let Some(schema_arn) =
158 format_encode_options_with_secret.get(AWS_GLUE_SCHEMA_ARN_KEY)
159 {
160 risingwave_common::license::Feature::GlueSchemaRegistry
161 .check_available()
162 .map_err(anyhow::Error::from)?;
163 SchemaLocation::Glue {
164 schema_arn: schema_arn.clone(),
165 aws_auth_props: serde_json::from_value::<AwsAuthProps>(
166 serde_json::to_value(format_encode_options_with_secret.clone())
167 .unwrap(),
168 )
169 .map_err(|e| anyhow::anyhow!(e))?,
170 mock_config: format_encode_options_with_secret
172 .get("aws.glue.mock_config")
173 .cloned(),
174 }
175 } else if info.use_schema_registry {
176 SchemaLocation::Confluent {
177 urls: info.row_schema_location.clone(),
178 client_config: SchemaRegistryConfig::from(
179 &format_encode_options_with_secret,
180 ),
181 name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy)
182 .unwrap(),
183 topic: get_kafka_topic(&options_with_secret)?.clone(),
184 }
185 } else {
186 SchemaLocation::File {
187 url: info.row_schema_location,
188 aws_auth_props: Some(
189 serde_json::from_value::<AwsAuthProps>(
190 serde_json::to_value(format_encode_options_with_secret).unwrap(),
191 )
192 .map_err(|e| anyhow::anyhow!(e))?,
193 ),
194 }
195 };
196 EncodingProperties::Avro(config)
197 }
198 (SourceFormat::Plain, SourceEncode::Protobuf)
199 | (SourceFormat::Upsert, SourceEncode::Protobuf) => {
200 if info.row_schema_location.is_empty() {
201 bail!("protobuf file location not provided");
202 }
203 let mut messages_as_jsonb = if let Some(messages_as_jsonb) =
204 format_encode_options_with_secret.get(PROTOBUF_MESSAGES_AS_JSONB)
205 {
206 messages_as_jsonb.split(',').map(|s| s.to_owned()).collect()
207 } else {
208 HashSet::new()
209 };
210 messages_as_jsonb.insert("google.protobuf.Any".to_owned());
211
212 let mut config = ProtobufProperties {
213 message_name: info.proto_message_name.clone(),
214 key_message_name: info.key_message_name.clone(),
215 messages_as_jsonb,
216 ..Default::default()
217 };
218 config.schema_location = if info.use_schema_registry {
219 SchemaLocation::Confluent {
220 urls: info.row_schema_location.clone(),
221 client_config: SchemaRegistryConfig::from(
222 &format_encode_options_with_secret,
223 ),
224 name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy)
225 .unwrap(),
226 topic: get_kafka_topic(&options_with_secret)?.clone(),
227 }
228 } else {
229 SchemaLocation::File {
230 url: info.row_schema_location,
231 aws_auth_props: Some(
232 serde_json::from_value::<AwsAuthProps>(
233 serde_json::to_value(format_encode_options_with_secret).unwrap(),
234 )
235 .map_err(|e| anyhow::anyhow!(e))?,
236 ),
237 }
238 };
239 EncodingProperties::Protobuf(config)
240 }
241 (SourceFormat::Debezium, SourceEncode::Avro) => {
242 EncodingProperties::Avro(AvroProperties {
243 record_name: if info.proto_message_name.is_empty() {
244 None
245 } else {
246 Some(info.proto_message_name.clone())
247 },
248 key_record_name: info.key_message_name.clone(),
249 schema_location: SchemaLocation::Confluent {
250 urls: info.row_schema_location.clone(),
251 client_config: SchemaRegistryConfig::from(
252 &format_encode_options_with_secret,
253 ),
254 name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy)
255 .unwrap(),
256 topic: get_kafka_topic(&options_with_secret).unwrap().clone(),
257 },
258 ..Default::default()
259 })
260 }
261 (
262 SourceFormat::Plain
263 | SourceFormat::Debezium
264 | SourceFormat::Maxwell
265 | SourceFormat::Canal
266 | SourceFormat::Upsert,
267 SourceEncode::Json,
268 ) => EncodingProperties::Json(JsonProperties {
269 use_schema_registry: info.use_schema_registry,
270 timestamp_handling: None,
271 timestamptz_handling: TimestamptzHandling::from_options(
272 &format_encode_options_with_secret,
273 )?,
274 time_handling: None,
275 bigint_unsigned_handling: None,
276 handle_toast_columns: false,
277 }),
278 (SourceFormat::DebeziumMongo, SourceEncode::Json) => {
279 let props = MongoProperties::from(&format_encode_options_with_secret);
280 EncodingProperties::MongoJson(props)
281 }
282 (SourceFormat::Plain, SourceEncode::Bytes) => {
283 EncodingProperties::Bytes(BytesProperties { column_name: None })
284 }
285 (SourceFormat::Native, SourceEncode::Native) => EncodingProperties::Native,
286 (SourceFormat::None, SourceEncode::None) => EncodingProperties::None,
287 (format, encode) => {
288 bail!("Unsupported format {:?} encode {:?}", format, encode);
289 }
290 };
291 Ok(Self {
292 encoding_config,
293 protocol_config,
294 })
295 }
296}
297
298#[derive(Debug, Default, Clone)]
299pub struct AvroProperties {
300 pub schema_location: SchemaLocation,
301 pub record_name: Option<String>,
302 pub key_record_name: Option<String>,
303 pub map_handling: Option<MapHandling>,
304}
305
306#[derive(Debug, Clone)]
308pub enum SchemaLocation {
309 File {
311 url: String,
312 aws_auth_props: Option<AwsAuthProps>, },
314 Confluent {
316 urls: String,
317 client_config: SchemaRegistryConfig,
318 name_strategy: PbSchemaRegistryNameStrategy,
319 topic: String,
320 },
321 Glue {
323 schema_arn: String,
324 aws_auth_props: AwsAuthProps,
325 mock_config: Option<String>,
327 },
328}
329
330impl Default for SchemaLocation {
332 fn default() -> Self {
333 Self::File {
335 url: Default::default(),
336 aws_auth_props: None,
337 }
338 }
339}
340
341#[derive(Debug, Default, Clone)]
342pub struct ProtobufProperties {
343 pub schema_location: SchemaLocation,
344 pub message_name: String,
345 pub key_message_name: Option<String>,
346 pub messages_as_jsonb: HashSet<String>,
347}
348
/// Options for decoding CSV payloads.
#[derive(Clone, Copy, Debug, Default)]
pub struct CsvProperties {
    /// Field delimiter byte.
    pub delimiter: u8,
    /// Whether the first line is a header row.
    pub has_header: bool,
}
354
355#[derive(Debug, Default, Clone)]
356pub struct JsonProperties {
357 pub use_schema_registry: bool,
358 pub timestamp_handling: Option<TimestampHandling>,
359 pub timestamptz_handling: Option<TimestamptzHandling>,
360 pub time_handling: Option<TimeHandling>,
361 pub bigint_unsigned_handling: Option<BigintUnsignedHandlingMode>,
362 pub handle_toast_columns: bool,
363}
364
/// Options for sources that carry raw bytes.
#[derive(Clone, Debug, Default)]
pub struct BytesProperties {
    /// Name of the column receiving the bytes, if set.
    pub column_name: Option<String>,
}
369
/// Options for decoding MongoDB JSON payloads.
#[derive(Clone, Debug, Default)]
pub struct MongoProperties {
    /// Whether strong-schema mode is enabled.
    pub strong_schema: bool,
}

impl MongoProperties {
    /// Creates properties with the given strong-schema flag.
    pub fn new(strong_schema: bool) -> Self {
        Self { strong_schema }
    }
}
380impl From<&BTreeMap<String, String>> for MongoProperties {
381 fn from(config: &BTreeMap<String, String>) -> Self {
382 let strong_schema = config
383 .get(CDC_MONGODB_STRONG_SCHEMA_KEY)
384 .is_some_and(|k| k.eq_ignore_ascii_case("true"));
385 Self { strong_schema }
386 }
387}