1use std::collections::{BTreeMap, HashSet};
16
17use risingwave_common::bail;
18use risingwave_common::secret::LocalSecretManager;
19use risingwave_connector_codec::decoder::avro::MapHandling;
20use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo};
21
22use super::utils::get_kafka_topic;
23use super::{DebeziumProps, TimestamptzHandling};
24use crate::WithOptionsSecResolved;
25use crate::connector_common::AwsAuthProps;
26use crate::error::ConnectorResult;
27use crate::parser::PROTOBUF_MESSAGES_AS_JSONB;
28use crate::schema::AWS_GLUE_SCHEMA_ARN_KEY;
29use crate::schema::schema_registry::SchemaRegistryConfig;
30use crate::source::cdc::CDC_MONGODB_STRONG_SCHEMA_KEY;
31use crate::source::{SourceColumnDesc, SourceEncode, SourceFormat, extract_source_struct};
32
/// Complete parser configuration: the part shared by every parser (`common`)
/// plus the part that depends on the chosen format/encode pair (`specific`).
#[derive(Debug, Clone, Default)]
pub struct ParserConfig {
    /// Settings that do not depend on the format or encoding.
    pub common: CommonParserConfig,
    /// Settings derived from the source's format and encoding.
    pub specific: SpecificParserConfig,
}
39
40impl ParserConfig {
41 pub fn get_config(self) -> (Vec<SourceColumnDesc>, SpecificParserConfig) {
42 (self.common.rw_columns, self.specific)
43 }
44}
45
/// Parser settings that are independent of the source's format and encoding.
#[derive(Debug, Clone, Default)]
pub struct CommonParserConfig {
    /// Column descriptors handed to the parser.
    pub rw_columns: Vec<SourceColumnDesc>,
}
51
/// Parser settings that depend on the source's format (protocol) and encoding.
///
/// Both fields default to their `Unspecified` variant.
#[derive(Debug, Clone, Default)]
pub struct SpecificParserConfig {
    /// Settings for the payload encoding (Avro, Protobuf, JSON, ...).
    pub encoding_config: EncodingProperties,
    /// Settings for the change-event protocol (Debezium, Upsert, Plain, ...).
    pub protocol_config: ProtocolProperties,
}
57
/// Encoding-specific parser settings, one variant per supported encoding.
#[derive(Debug, Default, Clone)]
pub enum EncodingProperties {
    /// Avro payloads; carries schema location and record names.
    Avro(AvroProperties),
    /// Protobuf payloads; carries schema location and message names.
    Protobuf(ProtobufProperties),
    /// CSV payloads; carries delimiter and header flag.
    Csv(CsvProperties),
    /// JSON payloads.
    Json(JsonProperties),
    /// JSON payloads of the Debezium-Mongo format.
    MongoJson(MongoProperties),
    /// Raw bytes payloads.
    Bytes(BytesProperties),
    /// Parquet payloads; no extra settings.
    Parquet,
    /// Selected for the `Native` format/encode pair; no extra settings.
    Native,
    /// Selected for the `None` format/encode pair; no extra settings.
    None,
    /// Placeholder used by `Default`; no encoding has been chosen.
    #[default]
    Unspecified,
}
73
/// Protocol-specific parser settings, one variant per supported source format.
#[derive(Debug, Default, Clone)]
pub enum ProtocolProperties {
    /// Debezium format, with the properties parsed from the format/encode options.
    Debezium(DebeziumProps),
    /// Debezium format for MongoDB.
    DebeziumMongo,
    /// Maxwell format.
    Maxwell,
    /// Canal format.
    Canal,
    /// Plain format.
    Plain,
    /// Upsert format.
    Upsert,
    /// Selected for `SourceFormat::Native`.
    Native,
    /// Selected for `SourceFormat::None`.
    None,
    /// Placeholder used by `Default`; no protocol has been chosen.
    #[default]
    Unspecified,
}
88
89impl SpecificParserConfig {
90 pub const DEFAULT_PLAIN_JSON: SpecificParserConfig = SpecificParserConfig {
92 encoding_config: EncodingProperties::Json(JsonProperties {
93 use_schema_registry: false,
94 timestamptz_handling: None,
95 }),
96 protocol_config: ProtocolProperties::Plain,
97 };
98
99 pub fn new(
101 info: &StreamSourceInfo,
102 with_properties: &WithOptionsSecResolved,
103 ) -> ConnectorResult<Self> {
104 let info = info.clone();
105 let source_struct = extract_source_struct(&info)?;
106 let format_encode_options_with_secret = LocalSecretManager::global()
107 .fill_secrets(info.format_encode_options, info.format_encode_secret_refs)?;
108 let (options, secret_refs) = with_properties.clone().into_parts();
109 #[expect(unused_variables)]
112 let with_properties = ();
113 let options_with_secret =
114 LocalSecretManager::global().fill_secrets(options, secret_refs)?;
115 let format = source_struct.format;
116 let encode = source_struct.encode;
117 let protocol_config = match format {
120 SourceFormat::Native => ProtocolProperties::Native,
121 SourceFormat::None => ProtocolProperties::None,
122 SourceFormat::Debezium => {
123 let debezium_props = DebeziumProps::from(&format_encode_options_with_secret);
124 ProtocolProperties::Debezium(debezium_props)
125 }
126 SourceFormat::DebeziumMongo => ProtocolProperties::DebeziumMongo,
127 SourceFormat::Maxwell => ProtocolProperties::Maxwell,
128 SourceFormat::Canal => ProtocolProperties::Canal,
129 SourceFormat::Upsert => ProtocolProperties::Upsert,
130 SourceFormat::Plain => ProtocolProperties::Plain,
131 _ => unreachable!(),
132 };
133
134 let encoding_config = match (format, encode) {
135 (SourceFormat::Plain, SourceEncode::Csv) => EncodingProperties::Csv(CsvProperties {
136 delimiter: info.csv_delimiter as u8,
137 has_header: info.csv_has_header,
138 }),
139 (SourceFormat::Plain, SourceEncode::Parquet) => EncodingProperties::Parquet,
140 (SourceFormat::Plain, SourceEncode::Avro)
141 | (SourceFormat::Upsert, SourceEncode::Avro) => {
142 let mut config = AvroProperties {
143 record_name: if info.proto_message_name.is_empty() {
144 None
145 } else {
146 Some(info.proto_message_name.clone())
147 },
148 key_record_name: info.key_message_name.clone(),
149 map_handling: MapHandling::from_options(&format_encode_options_with_secret)?,
150 ..Default::default()
151 };
152 config.schema_location = if let Some(schema_arn) =
153 format_encode_options_with_secret.get(AWS_GLUE_SCHEMA_ARN_KEY)
154 {
155 risingwave_common::license::Feature::GlueSchemaRegistry
156 .check_available()
157 .map_err(anyhow::Error::from)?;
158 SchemaLocation::Glue {
159 schema_arn: schema_arn.clone(),
160 aws_auth_props: serde_json::from_value::<AwsAuthProps>(
161 serde_json::to_value(format_encode_options_with_secret.clone())
162 .unwrap(),
163 )
164 .map_err(|e| anyhow::anyhow!(e))?,
165 mock_config: format_encode_options_with_secret
167 .get("aws.glue.mock_config")
168 .cloned(),
169 }
170 } else if info.use_schema_registry {
171 SchemaLocation::Confluent {
172 urls: info.row_schema_location.clone(),
173 client_config: SchemaRegistryConfig::from(
174 &format_encode_options_with_secret,
175 ),
176 name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy)
177 .unwrap(),
178 topic: get_kafka_topic(&options_with_secret)?.clone(),
179 }
180 } else {
181 SchemaLocation::File {
182 url: info.row_schema_location.clone(),
183 aws_auth_props: Some(
184 serde_json::from_value::<AwsAuthProps>(
185 serde_json::to_value(format_encode_options_with_secret.clone())
186 .unwrap(),
187 )
188 .map_err(|e| anyhow::anyhow!(e))?,
189 ),
190 }
191 };
192 EncodingProperties::Avro(config)
193 }
194 (SourceFormat::Plain, SourceEncode::Protobuf)
195 | (SourceFormat::Upsert, SourceEncode::Protobuf) => {
196 if info.row_schema_location.is_empty() {
197 bail!("protobuf file location not provided");
198 }
199 let mut messages_as_jsonb = if let Some(messages_as_jsonb) =
200 format_encode_options_with_secret.get(PROTOBUF_MESSAGES_AS_JSONB)
201 {
202 messages_as_jsonb.split(',').map(|s| s.to_owned()).collect()
203 } else {
204 HashSet::new()
205 };
206 messages_as_jsonb.insert("google.protobuf.Any".to_owned());
207
208 let mut config = ProtobufProperties {
209 message_name: info.proto_message_name.clone(),
210 key_message_name: info.key_message_name.clone(),
211 messages_as_jsonb,
212 ..Default::default()
213 };
214 config.schema_location = if info.use_schema_registry {
215 SchemaLocation::Confluent {
216 urls: info.row_schema_location.clone(),
217 client_config: SchemaRegistryConfig::from(
218 &format_encode_options_with_secret,
219 ),
220 name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy)
221 .unwrap(),
222 topic: get_kafka_topic(&options_with_secret)?.clone(),
223 }
224 } else {
225 SchemaLocation::File {
226 url: info.row_schema_location.clone(),
227 aws_auth_props: Some(
228 serde_json::from_value::<AwsAuthProps>(
229 serde_json::to_value(format_encode_options_with_secret.clone())
230 .unwrap(),
231 )
232 .map_err(|e| anyhow::anyhow!(e))?,
233 ),
234 }
235 };
236 EncodingProperties::Protobuf(config)
237 }
238 (SourceFormat::Debezium, SourceEncode::Avro) => {
239 EncodingProperties::Avro(AvroProperties {
240 record_name: if info.proto_message_name.is_empty() {
241 None
242 } else {
243 Some(info.proto_message_name.clone())
244 },
245 key_record_name: info.key_message_name.clone(),
246 schema_location: SchemaLocation::Confluent {
247 urls: info.row_schema_location.clone(),
248 client_config: SchemaRegistryConfig::from(
249 &format_encode_options_with_secret,
250 ),
251 name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy)
252 .unwrap(),
253 topic: get_kafka_topic(&options_with_secret).unwrap().clone(),
254 },
255 ..Default::default()
256 })
257 }
258 (
259 SourceFormat::Plain
260 | SourceFormat::Debezium
261 | SourceFormat::Maxwell
262 | SourceFormat::Canal
263 | SourceFormat::Upsert,
264 SourceEncode::Json,
265 ) => EncodingProperties::Json(JsonProperties {
266 use_schema_registry: info.use_schema_registry,
267 timestamptz_handling: TimestamptzHandling::from_options(
268 &format_encode_options_with_secret,
269 )?,
270 }),
271 (SourceFormat::DebeziumMongo, SourceEncode::Json) => {
272 let props = MongoProperties::from(&format_encode_options_with_secret);
273 EncodingProperties::MongoJson(props)
274 }
275 (SourceFormat::Plain, SourceEncode::Bytes) => {
276 EncodingProperties::Bytes(BytesProperties { column_name: None })
277 }
278 (SourceFormat::Native, SourceEncode::Native) => EncodingProperties::Native,
279 (SourceFormat::None, SourceEncode::None) => EncodingProperties::None,
280 (format, encode) => {
281 bail!("Unsupported format {:?} encode {:?}", format, encode);
282 }
283 };
284 Ok(Self {
285 encoding_config,
286 protocol_config,
287 })
288 }
289}
290
/// Settings for decoding Avro payloads.
#[derive(Debug, Default, Clone)]
pub struct AvroProperties {
    /// Where the Avro schema is loaded from.
    pub schema_location: SchemaLocation,
    /// Record name for the value; `None` when the source did not specify one.
    pub record_name: Option<String>,
    /// Record name for the key, if any.
    pub key_record_name: Option<String>,
    /// How Avro maps are handled, as parsed from the format/encode options.
    pub map_handling: Option<MapHandling>,
}
298
/// Describes where a schema (Avro or Protobuf) is fetched from.
#[derive(Debug, Clone)]
pub enum SchemaLocation {
    /// The schema is read from a single location.
    File {
        /// Location of the schema; populated from `row_schema_location`.
        url: String,
        /// AWS credentials parsed from the format/encode options, if any.
        // NOTE(review): presumably only needed when `url` points at S3 — confirm.
        aws_auth_props: Option<AwsAuthProps>,
    },
    /// The schema is resolved through a Confluent schema registry.
    Confluent {
        /// Registry URL(s); populated from `row_schema_location`.
        urls: String,
        /// Registry client settings parsed from the format/encode options.
        client_config: SchemaRegistryConfig,
        /// Strategy used to derive the subject name.
        name_strategy: PbSchemaRegistryNameStrategy,
        /// Kafka topic taken from the connector options.
        topic: String,
    },
    /// The schema is resolved through AWS Glue schema registry.
    Glue {
        /// ARN of the Glue schema.
        schema_arn: String,
        /// AWS credentials parsed from the format/encode options.
        aws_auth_props: AwsAuthProps,
        /// Value of the `aws.glue.mock_config` option, when present.
        // NOTE(review): looks like a testing hook — confirm before relying on it.
        mock_config: Option<String>,
    },
}
322
323impl Default for SchemaLocation {
325 fn default() -> Self {
326 Self::File {
328 url: Default::default(),
329 aws_auth_props: None,
330 }
331 }
332}
333
/// Settings for decoding Protobuf payloads.
#[derive(Debug, Default, Clone)]
pub struct ProtobufProperties {
    /// Where the Protobuf schema is loaded from.
    pub schema_location: SchemaLocation,
    /// Name of the message used for the value.
    pub message_name: String,
    /// Name of the message used for the key, if any.
    pub key_message_name: Option<String>,
    /// Names of messages that are to be treated as JSONB.
    pub messages_as_jsonb: HashSet<String>,
}
341
/// Settings for decoding CSV payloads.
#[derive(Debug, Default, Clone, Copy)]
pub struct CsvProperties {
    /// Field delimiter, as a single byte.
    pub delimiter: u8,
    /// Whether the input has a header row.
    pub has_header: bool,
}
347
/// Settings for decoding JSON payloads.
#[derive(Debug, Default, Clone)]
pub struct JsonProperties {
    /// Whether a schema registry is used for this source.
    pub use_schema_registry: bool,
    /// How timestamptz values are handled; `None` when not configured.
    pub timestamptz_handling: Option<TimestamptzHandling>,
}
353
/// Settings for the raw bytes encoding.
#[derive(Debug, Default, Clone)]
pub struct BytesProperties {
    /// Optional column name.
    // NOTE(review): presumably the column that receives the raw payload — confirm.
    pub column_name: Option<String>,
}
358
/// Settings for decoding Debezium-Mongo JSON payloads.
#[derive(Debug, Default, Clone)]
pub struct MongoProperties {
    /// Whether strong-schema mode is enabled; defaults to `false`.
    pub strong_schema: bool,
}
363
364impl MongoProperties {
365 pub fn new(strong_schema: bool) -> Self {
366 Self { strong_schema }
367 }
368}
369impl From<&BTreeMap<String, String>> for MongoProperties {
370 fn from(config: &BTreeMap<String, String>) -> Self {
371 let strong_schema = config
372 .get(CDC_MONGODB_STRONG_SCHEMA_KEY)
373 .is_some_and(|k| k.eq_ignore_ascii_case("true"));
374 Self { strong_schema }
375 }
376}