1use std::collections::{BTreeMap, HashSet};
16
17use risingwave_common::bail;
18use risingwave_common::secret::LocalSecretManager;
19use risingwave_connector_codec::decoder::avro::MapHandling;
20use risingwave_pb::catalog::{PbSchemaRegistryNameStrategy, StreamSourceInfo};
21
22use super::utils::get_kafka_topic;
23use super::{DebeziumProps, TimeHandling, TimestampHandling, TimestamptzHandling};
24use crate::WithOptionsSecResolved;
25use crate::connector_common::AwsAuthProps;
26use crate::error::ConnectorResult;
27use crate::parser::PROTOBUF_MESSAGES_AS_JSONB;
28use crate::schema::AWS_GLUE_SCHEMA_ARN_KEY;
29use crate::schema::schema_registry::SchemaRegistryConfig;
30use crate::source::cdc::CDC_MONGODB_STRONG_SCHEMA_KEY;
31use crate::source::{SourceColumnDesc, SourceEncode, SourceFormat, extract_source_struct};
32
33#[derive(Debug, Clone, Default)]
35pub struct ParserConfig {
36 pub common: CommonParserConfig,
37 pub specific: SpecificParserConfig,
38}
39
40impl ParserConfig {
41 pub fn get_config(self) -> (Vec<SourceColumnDesc>, SpecificParserConfig) {
42 (self.common.rw_columns, self.specific)
43 }
44}
45
46#[derive(Debug, Clone, Default)]
47pub struct CommonParserConfig {
48 pub rw_columns: Vec<SourceColumnDesc>,
50}
51
52#[derive(Debug, Clone, Default)]
53pub struct SpecificParserConfig {
54 pub encoding_config: EncodingProperties,
55 pub protocol_config: ProtocolProperties,
56}
57
58#[derive(Debug, Default, Clone)]
59pub enum EncodingProperties {
60 Avro(AvroProperties),
61 Protobuf(ProtobufProperties),
62 Csv(CsvProperties),
63 Json(JsonProperties),
64 MongoJson(MongoProperties),
65 Bytes(BytesProperties),
66 Parquet,
67 Native,
68 None,
70 #[default]
71 Unspecified,
72}
73
74#[derive(Debug, Default, Clone)]
75pub enum ProtocolProperties {
76 Debezium(DebeziumProps),
77 DebeziumMongo,
78 Maxwell,
79 Canal,
80 Plain,
81 Upsert,
82 Native,
83 None,
85 #[default]
86 Unspecified,
87}
88
89impl SpecificParserConfig {
90 pub const DEFAULT_PLAIN_JSON: SpecificParserConfig = SpecificParserConfig {
92 encoding_config: EncodingProperties::Json(JsonProperties {
93 use_schema_registry: false,
94 timestamp_handling: None,
95 timestamptz_handling: None,
96 time_handling: None,
97 handle_toast_columns: false,
98 }),
99 protocol_config: ProtocolProperties::Plain,
100 };
101
102 pub fn new(
104 info: &StreamSourceInfo,
105 with_properties: &WithOptionsSecResolved,
106 ) -> ConnectorResult<Self> {
107 let info = info.clone();
108 let source_struct = extract_source_struct(&info)?;
109 let format_encode_options_with_secret = LocalSecretManager::global()
110 .fill_secrets(info.format_encode_options, info.format_encode_secret_refs)?;
111 let (options, secret_refs) = with_properties.clone().into_parts();
112 #[expect(unused_variables)]
115 let with_properties = ();
116 let options_with_secret =
117 LocalSecretManager::global().fill_secrets(options, secret_refs)?;
118 let format = source_struct.format;
119 let encode = source_struct.encode;
120 let protocol_config = match format {
123 SourceFormat::Native => ProtocolProperties::Native,
124 SourceFormat::None => ProtocolProperties::None,
125 SourceFormat::Debezium => {
126 let debezium_props = DebeziumProps::from(&format_encode_options_with_secret);
127 ProtocolProperties::Debezium(debezium_props)
128 }
129 SourceFormat::DebeziumMongo => ProtocolProperties::DebeziumMongo,
130 SourceFormat::Maxwell => ProtocolProperties::Maxwell,
131 SourceFormat::Canal => ProtocolProperties::Canal,
132 SourceFormat::Upsert => ProtocolProperties::Upsert,
133 SourceFormat::Plain => ProtocolProperties::Plain,
134 _ => unreachable!(),
135 };
136
137 let encoding_config = match (format, encode) {
138 (SourceFormat::Plain, SourceEncode::Csv) => EncodingProperties::Csv(CsvProperties {
139 delimiter: info.csv_delimiter as u8,
140 has_header: info.csv_has_header,
141 }),
142 (SourceFormat::Plain, SourceEncode::Parquet) => EncodingProperties::Parquet,
143 (SourceFormat::Plain, SourceEncode::Avro)
144 | (SourceFormat::Upsert, SourceEncode::Avro) => {
145 let mut config = AvroProperties {
146 record_name: if info.proto_message_name.is_empty() {
147 None
148 } else {
149 Some(info.proto_message_name.clone())
150 },
151 key_record_name: info.key_message_name.clone(),
152 map_handling: MapHandling::from_options(&format_encode_options_with_secret)?,
153 ..Default::default()
154 };
155 config.schema_location = if let Some(schema_arn) =
156 format_encode_options_with_secret.get(AWS_GLUE_SCHEMA_ARN_KEY)
157 {
158 risingwave_common::license::Feature::GlueSchemaRegistry
159 .check_available()
160 .map_err(anyhow::Error::from)?;
161 SchemaLocation::Glue {
162 schema_arn: schema_arn.clone(),
163 aws_auth_props: serde_json::from_value::<AwsAuthProps>(
164 serde_json::to_value(format_encode_options_with_secret.clone())
165 .unwrap(),
166 )
167 .map_err(|e| anyhow::anyhow!(e))?,
168 mock_config: format_encode_options_with_secret
170 .get("aws.glue.mock_config")
171 .cloned(),
172 }
173 } else if info.use_schema_registry {
174 SchemaLocation::Confluent {
175 urls: info.row_schema_location.clone(),
176 client_config: SchemaRegistryConfig::from(
177 &format_encode_options_with_secret,
178 ),
179 name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy)
180 .unwrap(),
181 topic: get_kafka_topic(&options_with_secret)?.clone(),
182 }
183 } else {
184 SchemaLocation::File {
185 url: info.row_schema_location.clone(),
186 aws_auth_props: Some(
187 serde_json::from_value::<AwsAuthProps>(
188 serde_json::to_value(format_encode_options_with_secret.clone())
189 .unwrap(),
190 )
191 .map_err(|e| anyhow::anyhow!(e))?,
192 ),
193 }
194 };
195 EncodingProperties::Avro(config)
196 }
197 (SourceFormat::Plain, SourceEncode::Protobuf)
198 | (SourceFormat::Upsert, SourceEncode::Protobuf) => {
199 if info.row_schema_location.is_empty() {
200 bail!("protobuf file location not provided");
201 }
202 let mut messages_as_jsonb = if let Some(messages_as_jsonb) =
203 format_encode_options_with_secret.get(PROTOBUF_MESSAGES_AS_JSONB)
204 {
205 messages_as_jsonb.split(',').map(|s| s.to_owned()).collect()
206 } else {
207 HashSet::new()
208 };
209 messages_as_jsonb.insert("google.protobuf.Any".to_owned());
210
211 let mut config = ProtobufProperties {
212 message_name: info.proto_message_name.clone(),
213 key_message_name: info.key_message_name.clone(),
214 messages_as_jsonb,
215 ..Default::default()
216 };
217 config.schema_location = if info.use_schema_registry {
218 SchemaLocation::Confluent {
219 urls: info.row_schema_location.clone(),
220 client_config: SchemaRegistryConfig::from(
221 &format_encode_options_with_secret,
222 ),
223 name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy)
224 .unwrap(),
225 topic: get_kafka_topic(&options_with_secret)?.clone(),
226 }
227 } else {
228 SchemaLocation::File {
229 url: info.row_schema_location.clone(),
230 aws_auth_props: Some(
231 serde_json::from_value::<AwsAuthProps>(
232 serde_json::to_value(format_encode_options_with_secret.clone())
233 .unwrap(),
234 )
235 .map_err(|e| anyhow::anyhow!(e))?,
236 ),
237 }
238 };
239 EncodingProperties::Protobuf(config)
240 }
241 (SourceFormat::Debezium, SourceEncode::Avro) => {
242 EncodingProperties::Avro(AvroProperties {
243 record_name: if info.proto_message_name.is_empty() {
244 None
245 } else {
246 Some(info.proto_message_name.clone())
247 },
248 key_record_name: info.key_message_name.clone(),
249 schema_location: SchemaLocation::Confluent {
250 urls: info.row_schema_location.clone(),
251 client_config: SchemaRegistryConfig::from(
252 &format_encode_options_with_secret,
253 ),
254 name_strategy: PbSchemaRegistryNameStrategy::try_from(info.name_strategy)
255 .unwrap(),
256 topic: get_kafka_topic(&options_with_secret).unwrap().clone(),
257 },
258 ..Default::default()
259 })
260 }
261 (
262 SourceFormat::Plain
263 | SourceFormat::Debezium
264 | SourceFormat::Maxwell
265 | SourceFormat::Canal
266 | SourceFormat::Upsert,
267 SourceEncode::Json,
268 ) => EncodingProperties::Json(JsonProperties {
269 use_schema_registry: info.use_schema_registry,
270 timestamp_handling: None,
271 timestamptz_handling: TimestamptzHandling::from_options(
272 &format_encode_options_with_secret,
273 )?,
274 time_handling: None,
275 handle_toast_columns: false,
276 }),
277 (SourceFormat::DebeziumMongo, SourceEncode::Json) => {
278 let props = MongoProperties::from(&format_encode_options_with_secret);
279 EncodingProperties::MongoJson(props)
280 }
281 (SourceFormat::Plain, SourceEncode::Bytes) => {
282 EncodingProperties::Bytes(BytesProperties { column_name: None })
283 }
284 (SourceFormat::Native, SourceEncode::Native) => EncodingProperties::Native,
285 (SourceFormat::None, SourceEncode::None) => EncodingProperties::None,
286 (format, encode) => {
287 bail!("Unsupported format {:?} encode {:?}", format, encode);
288 }
289 };
290 Ok(Self {
291 encoding_config,
292 protocol_config,
293 })
294 }
295}
296
297#[derive(Debug, Default, Clone)]
298pub struct AvroProperties {
299 pub schema_location: SchemaLocation,
300 pub record_name: Option<String>,
301 pub key_record_name: Option<String>,
302 pub map_handling: Option<MapHandling>,
303}
304
305#[derive(Debug, Clone)]
307pub enum SchemaLocation {
308 File {
310 url: String,
311 aws_auth_props: Option<AwsAuthProps>, },
313 Confluent {
315 urls: String,
316 client_config: SchemaRegistryConfig,
317 name_strategy: PbSchemaRegistryNameStrategy,
318 topic: String,
319 },
320 Glue {
322 schema_arn: String,
323 aws_auth_props: AwsAuthProps,
324 mock_config: Option<String>,
326 },
327}
328
329impl Default for SchemaLocation {
331 fn default() -> Self {
332 Self::File {
334 url: Default::default(),
335 aws_auth_props: None,
336 }
337 }
338}
339
340#[derive(Debug, Default, Clone)]
341pub struct ProtobufProperties {
342 pub schema_location: SchemaLocation,
343 pub message_name: String,
344 pub key_message_name: Option<String>,
345 pub messages_as_jsonb: HashSet<String>,
346}
347
/// Settings for CSV-encoded sources.
#[derive(Clone, Copy, Debug, Default)]
pub struct CsvProperties {
    /// Field delimiter, as a single byte.
    pub delimiter: u8,
    /// Whether the input begins with a header row.
    pub has_header: bool,
}
353
354#[derive(Debug, Default, Clone)]
355pub struct JsonProperties {
356 pub use_schema_registry: bool,
357 pub timestamp_handling: Option<TimestampHandling>,
358 pub timestamptz_handling: Option<TimestamptzHandling>,
359 pub time_handling: Option<TimeHandling>,
360 pub handle_toast_columns: bool,
361}
362
/// Settings for sources that ingest raw bytes.
#[derive(Clone, Debug, Default)]
pub struct BytesProperties {
    /// Target column name, if set.
    pub column_name: Option<String>,
}
367
/// Settings for JSON sources in the Debezium-Mongo format.
#[derive(Clone, Debug, Default)]
pub struct MongoProperties {
    /// Whether strong-schema mode is enabled.
    pub strong_schema: bool,
}
372
373impl MongoProperties {
374 pub fn new(strong_schema: bool) -> Self {
375 Self { strong_schema }
376 }
377}
378impl From<&BTreeMap<String, String>> for MongoProperties {
379 fn from(config: &BTreeMap<String, String>) -> Self {
380 let strong_schema = config
381 .get(CDC_MONGODB_STRONG_SCHEMA_KEY)
382 .is_some_and(|k| k.eq_ignore_ascii_case("true"));
383 Self { strong_schema }
384 }
385}