1use std::collections::{BTreeMap, HashSet};
16
17use risingwave_common::bail;
18use risingwave_common::secret::LocalSecretManager;
19use risingwave_connector_codec::decoder::avro::MapHandling;
20use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo};
21
22use super::utils::get_kafka_topic;
23use super::{DebeziumProps, TimeHandling, TimestampHandling, TimestamptzHandling};
24use crate::WithOptionsSecResolved;
25use crate::connector_common::AwsAuthProps;
26use crate::error::ConnectorResult;
27use crate::parser::PROTOBUF_MESSAGES_AS_JSONB;
28use crate::schema::AWS_GLUE_SCHEMA_ARN_KEY;
29use crate::schema::schema_registry::SchemaRegistryConfig;
30use crate::source::cdc::CDC_MONGODB_STRONG_SCHEMA_KEY;
31use crate::source::{SourceColumnDesc, SourceEncode, SourceFormat, extract_source_struct};
32
33#[derive(Debug, Clone, Default)]
35pub struct ParserConfig {
36 pub common: CommonParserConfig,
37 pub specific: SpecificParserConfig,
38}
39
40impl ParserConfig {
41 pub fn get_config(self) -> (Vec<SourceColumnDesc>, SpecificParserConfig) {
42 (self.common.rw_columns, self.specific)
43 }
44}
45
46#[derive(Debug, Clone, Default)]
47pub struct CommonParserConfig {
48 pub rw_columns: Vec<SourceColumnDesc>,
50}
51
52#[derive(Debug, Clone, Default)]
53pub struct SpecificParserConfig {
54 pub encoding_config: EncodingProperties,
55 pub protocol_config: ProtocolProperties,
56}
57
58#[derive(Debug, Default, Clone)]
59pub enum EncodingProperties {
60 Avro(AvroProperties),
61 Protobuf(ProtobufProperties),
62 Csv(CsvProperties),
63 Json(JsonProperties),
64 MongoJson(MongoProperties),
65 Bytes(BytesProperties),
66 Parquet,
67 Native,
68 None,
70 #[default]
71 Unspecified,
72}
73
74#[derive(Debug, Default, Clone)]
75pub enum ProtocolProperties {
76 Debezium(DebeziumProps),
77 DebeziumMongo,
78 Maxwell,
79 Canal,
80 Plain,
81 Upsert,
82 Native,
83 None,
85 #[default]
86 Unspecified,
87}
88
89impl SpecificParserConfig {
90 pub const DEFAULT_PLAIN_JSON: SpecificParserConfig = SpecificParserConfig {
92 encoding_config: EncodingProperties::Json(JsonProperties {
93 use_schema_registry: false,
94 timestamp_handling: None,
95 timestamptz_handling: None,
96 time_handling: None,
97 }),
98 protocol_config: ProtocolProperties::Plain,
99 };
100
101 pub fn new(
103 info: &StreamSourceInfo,
104 with_properties: &WithOptionsSecResolved,
105 ) -> ConnectorResult<Self> {
106 let info = info.clone();
107 let source_struct = extract_source_struct(&info)?;
108 let format_encode_options_with_secret = LocalSecretManager::global()
109 .fill_secrets(info.format_encode_options, info.format_encode_secret_refs)?;
110 let (options, secret_refs) = with_properties.clone().into_parts();
111 #[expect(unused_variables)]
114 let with_properties = ();
115 let options_with_secret =
116 LocalSecretManager::global().fill_secrets(options, secret_refs)?;
117 let format = source_struct.format;
118 let encode = source_struct.encode;
119 let protocol_config = match format {
122 SourceFormat::Native => ProtocolProperties::Native,
123 SourceFormat::None => ProtocolProperties::None,
124 SourceFormat::Debezium => {
125 let debezium_props = DebeziumProps::from(&format_encode_options_with_secret);
126 ProtocolProperties::Debezium(debezium_props)
127 }
128 SourceFormat::DebeziumMongo => ProtocolProperties::DebeziumMongo,
129 SourceFormat::Maxwell => ProtocolProperties::Maxwell,
130 SourceFormat::Canal => ProtocolProperties::Canal,
131 SourceFormat::Upsert => ProtocolProperties::Upsert,
132 SourceFormat::Plain => ProtocolProperties::Plain,
133 _ => unreachable!(),
134 };
135
136 let encoding_config = match (format, encode) {
137 (SourceFormat::Plain, SourceEncode::Csv) => EncodingProperties::Csv(CsvProperties {
138 delimiter: info.csv_delimiter as u8,
139 has_header: info.csv_has_header,
140 }),
141 (SourceFormat::Plain, SourceEncode::Parquet) => EncodingProperties::Parquet,
142 (SourceFormat::Plain, SourceEncode::Avro)
143 | (SourceFormat::Upsert, SourceEncode::Avro) => {
144 let mut config = AvroProperties {
145 record_name: if info.proto_message_name.is_empty() {
146 None
147 } else {
148 Some(info.proto_message_name.clone())
149 },
150 key_record_name: info.key_message_name.clone(),
151 map_handling: MapHandling::from_options(&format_encode_options_with_secret)?,
152 ..Default::default()
153 };
154 config.schema_location = if let Some(schema_arn) =
155 format_encode_options_with_secret.get(AWS_GLUE_SCHEMA_ARN_KEY)
156 {
157 risingwave_common::license::Feature::GlueSchemaRegistry
158 .check_available()
159 .map_err(anyhow::Error::from)?;
160 SchemaLocation::Glue {
161 schema_arn: schema_arn.clone(),
162 aws_auth_props: serde_json::from_value::<AwsAuthProps>(
163 serde_json::to_value(format_encode_options_with_secret.clone())
164 .unwrap(),
165 )
166 .map_err(|e| anyhow::anyhow!(e))?,
167 mock_config: format_encode_options_with_secret
169 .get("aws.glue.mock_config")
170 .cloned(),
171 }
172 } else if info.use_schema_registry {
173 SchemaLocation::Confluent {
174 urls: info.row_schema_location.clone(),
175 client_config: SchemaRegistryConfig::from(
176 &format_encode_options_with_secret,
177 ),
178 name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy)
179 .unwrap(),
180 topic: get_kafka_topic(&options_with_secret)?.clone(),
181 }
182 } else {
183 SchemaLocation::File {
184 url: info.row_schema_location.clone(),
185 aws_auth_props: Some(
186 serde_json::from_value::<AwsAuthProps>(
187 serde_json::to_value(format_encode_options_with_secret.clone())
188 .unwrap(),
189 )
190 .map_err(|e| anyhow::anyhow!(e))?,
191 ),
192 }
193 };
194 EncodingProperties::Avro(config)
195 }
196 (SourceFormat::Plain, SourceEncode::Protobuf)
197 | (SourceFormat::Upsert, SourceEncode::Protobuf) => {
198 if info.row_schema_location.is_empty() {
199 bail!("protobuf file location not provided");
200 }
201 let mut messages_as_jsonb = if let Some(messages_as_jsonb) =
202 format_encode_options_with_secret.get(PROTOBUF_MESSAGES_AS_JSONB)
203 {
204 messages_as_jsonb.split(',').map(|s| s.to_owned()).collect()
205 } else {
206 HashSet::new()
207 };
208 messages_as_jsonb.insert("google.protobuf.Any".to_owned());
209
210 let mut config = ProtobufProperties {
211 message_name: info.proto_message_name.clone(),
212 key_message_name: info.key_message_name.clone(),
213 messages_as_jsonb,
214 ..Default::default()
215 };
216 config.schema_location = if info.use_schema_registry {
217 SchemaLocation::Confluent {
218 urls: info.row_schema_location.clone(),
219 client_config: SchemaRegistryConfig::from(
220 &format_encode_options_with_secret,
221 ),
222 name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy)
223 .unwrap(),
224 topic: get_kafka_topic(&options_with_secret)?.clone(),
225 }
226 } else {
227 SchemaLocation::File {
228 url: info.row_schema_location.clone(),
229 aws_auth_props: Some(
230 serde_json::from_value::<AwsAuthProps>(
231 serde_json::to_value(format_encode_options_with_secret.clone())
232 .unwrap(),
233 )
234 .map_err(|e| anyhow::anyhow!(e))?,
235 ),
236 }
237 };
238 EncodingProperties::Protobuf(config)
239 }
240 (SourceFormat::Debezium, SourceEncode::Avro) => {
241 EncodingProperties::Avro(AvroProperties {
242 record_name: if info.proto_message_name.is_empty() {
243 None
244 } else {
245 Some(info.proto_message_name.clone())
246 },
247 key_record_name: info.key_message_name.clone(),
248 schema_location: SchemaLocation::Confluent {
249 urls: info.row_schema_location.clone(),
250 client_config: SchemaRegistryConfig::from(
251 &format_encode_options_with_secret,
252 ),
253 name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy)
254 .unwrap(),
255 topic: get_kafka_topic(&options_with_secret).unwrap().clone(),
256 },
257 ..Default::default()
258 })
259 }
260 (
261 SourceFormat::Plain
262 | SourceFormat::Debezium
263 | SourceFormat::Maxwell
264 | SourceFormat::Canal
265 | SourceFormat::Upsert,
266 SourceEncode::Json,
267 ) => EncodingProperties::Json(JsonProperties {
268 use_schema_registry: info.use_schema_registry,
269 timestamp_handling: None,
270 timestamptz_handling: TimestamptzHandling::from_options(
271 &format_encode_options_with_secret,
272 )?,
273 time_handling: None,
274 }),
275 (SourceFormat::DebeziumMongo, SourceEncode::Json) => {
276 let props = MongoProperties::from(&format_encode_options_with_secret);
277 EncodingProperties::MongoJson(props)
278 }
279 (SourceFormat::Plain, SourceEncode::Bytes) => {
280 EncodingProperties::Bytes(BytesProperties { column_name: None })
281 }
282 (SourceFormat::Native, SourceEncode::Native) => EncodingProperties::Native,
283 (SourceFormat::None, SourceEncode::None) => EncodingProperties::None,
284 (format, encode) => {
285 bail!("Unsupported format {:?} encode {:?}", format, encode);
286 }
287 };
288 Ok(Self {
289 encoding_config,
290 protocol_config,
291 })
292 }
293}
294
295#[derive(Debug, Default, Clone)]
296pub struct AvroProperties {
297 pub schema_location: SchemaLocation,
298 pub record_name: Option<String>,
299 pub key_record_name: Option<String>,
300 pub map_handling: Option<MapHandling>,
301}
302
303#[derive(Debug, Clone)]
305pub enum SchemaLocation {
306 File {
308 url: String,
309 aws_auth_props: Option<AwsAuthProps>, },
311 Confluent {
313 urls: String,
314 client_config: SchemaRegistryConfig,
315 name_strategy: PbSchemaRegistryNameStrategy,
316 topic: String,
317 },
318 Glue {
320 schema_arn: String,
321 aws_auth_props: AwsAuthProps,
322 mock_config: Option<String>,
324 },
325}
326
327impl Default for SchemaLocation {
329 fn default() -> Self {
330 Self::File {
332 url: Default::default(),
333 aws_auth_props: None,
334 }
335 }
336}
337
338#[derive(Debug, Default, Clone)]
339pub struct ProtobufProperties {
340 pub schema_location: SchemaLocation,
341 pub message_name: String,
342 pub key_message_name: Option<String>,
343 pub messages_as_jsonb: HashSet<String>,
344}
345
/// Settings for `ENCODE CSV`.
#[derive(Clone, Copy, Debug, Default)]
pub struct CsvProperties {
    /// Field delimiter byte.
    pub delimiter: u8,
    /// Whether the first row is a header.
    pub has_header: bool,
}
351
352#[derive(Debug, Default, Clone)]
353pub struct JsonProperties {
354 pub use_schema_registry: bool,
355 pub timestamp_handling: Option<TimestampHandling>,
356 pub timestamptz_handling: Option<TimestamptzHandling>,
357 pub time_handling: Option<TimeHandling>,
358}
359
/// Settings for `ENCODE BYTES`.
#[derive(Clone, Debug, Default)]
pub struct BytesProperties {
    /// Optional target column name.
    pub column_name: Option<String>,
}
364
/// Settings for `FORMAT DEBEZIUM_MONGO ENCODE JSON`.
#[derive(Clone, Debug, Default)]
pub struct MongoProperties {
    /// Whether strong-schema mode is enabled.
    pub strong_schema: bool,
}
369
370impl MongoProperties {
371 pub fn new(strong_schema: bool) -> Self {
372 Self { strong_schema }
373 }
374}
375impl From<&BTreeMap<String, String>> for MongoProperties {
376 fn from(config: &BTreeMap<String, String>) -> Self {
377 let strong_schema = config
378 .get(CDC_MONGODB_STRONG_SCHEMA_KEY)
379 .is_some_and(|k| k.eq_ignore_ascii_case("true"));
380 Self { strong_schema }
381 }
382}