1use std::fmt;
19
20use itertools::Itertools as _;
21use winnow::ModalResult;
22
23use crate::ast::{
24 AstString, Encode, Format, FormatEncodeOptions, Ident, ObjectName, ParseTo, SqlOption, Value,
25 display_separated,
26};
27use crate::keywords::Keyword;
28use crate::parser::{Parser, StrError};
29use crate::{impl_fmt_display, impl_parse_to, parser_err};
30
31#[derive(Debug, Clone, PartialEq, Eq, Hash)]
32pub enum CompatibleFormatEncode {
33 RowFormat(LegacyRowFormat),
34 V2(FormatEncodeOptions),
35}
36
37impl fmt::Display for CompatibleFormatEncode {
38 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39 match self {
40 CompatibleFormatEncode::RowFormat(inner) => {
41 write!(f, "{}", inner)
42 }
43 CompatibleFormatEncode::V2(inner) => {
44 write!(f, "{}", inner)
45 }
46 }
47 }
48}
49
50impl CompatibleFormatEncode {
51 pub(crate) fn into_v2(self) -> FormatEncodeOptions {
52 match self {
53 CompatibleFormatEncode::RowFormat(inner) => inner.into_format_encode_v2(),
54 CompatibleFormatEncode::V2(inner) => inner,
55 }
56 }
57}
58
59impl From<FormatEncodeOptions> for CompatibleFormatEncode {
60 fn from(value: FormatEncodeOptions) -> Self {
61 Self::V2(value)
62 }
63}
64
65pub fn parse_format_encode(p: &mut Parser<'_>) -> ModalResult<CompatibleFormatEncode> {
66 if let Some(schema_v2) = p.parse_schema()? {
67 if schema_v2.key_encode.is_some() {
68 parser_err!("key encode clause is not supported in source schema");
69 }
70 Ok(CompatibleFormatEncode::V2(schema_v2))
71 } else if p.peek_nth_any_of_keywords(0, &[Keyword::ROW])
72 && p.peek_nth_any_of_keywords(1, &[Keyword::FORMAT])
73 {
74 p.expect_keyword(Keyword::ROW)?;
75 p.expect_keyword(Keyword::FORMAT)?;
76 let id = p.parse_identifier()?;
77 let value = id.real_value();
78 let schema = match &value[..] {
79 "json" => LegacyRowFormat::Json,
80 "upsert_json" => LegacyRowFormat::UpsertJson,
81 "protobuf" => {
82 impl_parse_to!(protobuf_schema: ProtobufSchema, p);
83 LegacyRowFormat::Protobuf(protobuf_schema)
84 }
85 "debezium_json" => LegacyRowFormat::DebeziumJson,
86 "debezium_mongo_json" => LegacyRowFormat::DebeziumMongoJson,
87 "avro" => {
88 impl_parse_to!(avro_schema: AvroSchema, p);
89 LegacyRowFormat::Avro(avro_schema)
90 }
91 "upsert_avro" => {
92 impl_parse_to!(avro_schema: AvroSchema, p);
93 LegacyRowFormat::UpsertAvro(avro_schema)
94 }
95 "maxwell" => LegacyRowFormat::Maxwell,
96 "canal_json" => LegacyRowFormat::CanalJson,
97 "csv" => {
98 impl_parse_to!(csv_info: CsvInfo, p);
99 LegacyRowFormat::Csv(csv_info)
100 }
101 "native" => LegacyRowFormat::Native, "debezium_avro" => {
103 impl_parse_to!(avro_schema: DebeziumAvroSchema, p);
104 LegacyRowFormat::DebeziumAvro(avro_schema)
105 }
106 "bytes" => LegacyRowFormat::Bytes,
107 _ => {
108 parser_err!(
109 "expected JSON | UPSERT_JSON | PROTOBUF | DEBEZIUM_JSON | DEBEZIUM_AVRO \
110 | AVRO | UPSERT_AVRO | MAXWELL | CANAL_JSON | BYTES | NATIVE after ROW FORMAT"
111 );
112 }
113 };
114 Ok(CompatibleFormatEncode::RowFormat(schema))
115 } else {
116 p.expected("description of the format")
117 }
118}
119
120#[derive(Debug, Clone, PartialEq, Eq, Hash)]
121pub enum LegacyRowFormat {
122 Protobuf(ProtobufSchema), Json, DebeziumJson, DebeziumMongoJson,
126 UpsertJson, Avro(AvroSchema), UpsertAvro(AvroSchema), Maxwell, CanalJson, Csv(CsvInfo), Native,
133 DebeziumAvro(DebeziumAvroSchema), Bytes,
135}
136
137impl LegacyRowFormat {
138 pub fn into_format_encode_v2(self) -> FormatEncodeOptions {
139 let (format, row_encode) = match self {
140 LegacyRowFormat::Protobuf(_) => (Format::Plain, Encode::Protobuf),
141 LegacyRowFormat::Json => (Format::Plain, Encode::Json),
142 LegacyRowFormat::DebeziumJson => (Format::Debezium, Encode::Json),
143 LegacyRowFormat::DebeziumMongoJson => (Format::DebeziumMongo, Encode::Json),
144 LegacyRowFormat::UpsertJson => (Format::Upsert, Encode::Json),
145 LegacyRowFormat::Avro(_) => (Format::Plain, Encode::Avro),
146 LegacyRowFormat::UpsertAvro(_) => (Format::Upsert, Encode::Avro),
147 LegacyRowFormat::Maxwell => (Format::Maxwell, Encode::Json),
148 LegacyRowFormat::CanalJson => (Format::Canal, Encode::Json),
149 LegacyRowFormat::Csv(_) => (Format::Plain, Encode::Csv),
150 LegacyRowFormat::DebeziumAvro(_) => (Format::Debezium, Encode::Avro),
151 LegacyRowFormat::Bytes => (Format::Plain, Encode::Bytes),
152 LegacyRowFormat::Native => (Format::Native, Encode::Native),
153 };
154
155 let row_options = match self {
156 LegacyRowFormat::Protobuf(schema) => {
157 let mut options = vec![SqlOption {
158 name: ObjectName(vec![Ident {
159 value: "message".into(),
160 quote_style: None,
161 }]),
162 value: Value::SingleQuotedString(schema.message_name.0).into(),
163 }];
164 if schema.use_schema_registry {
165 options.push(SqlOption {
166 name: ObjectName(vec![Ident {
167 value: "schema.registry".into(),
168 quote_style: None,
169 }]),
170 value: Value::SingleQuotedString(schema.row_schema_location.0).into(),
171 });
172 } else {
173 options.push(SqlOption {
174 name: ObjectName(vec![Ident {
175 value: "schema.location".into(),
176 quote_style: None,
177 }]),
178 value: Value::SingleQuotedString(schema.row_schema_location.0).into(),
179 })
180 }
181 options
182 }
183 LegacyRowFormat::Avro(schema) | LegacyRowFormat::UpsertAvro(schema) => {
184 if schema.use_schema_registry {
185 vec![SqlOption {
186 name: ObjectName(vec![Ident {
187 value: "schema.registry".into(),
188 quote_style: None,
189 }]),
190 value: Value::SingleQuotedString(schema.row_schema_location.0).into(),
191 }]
192 } else {
193 vec![SqlOption {
194 name: ObjectName(vec![Ident {
195 value: "schema.location".into(),
196 quote_style: None,
197 }]),
198 value: Value::SingleQuotedString(schema.row_schema_location.0).into(),
199 }]
200 }
201 }
202 LegacyRowFormat::DebeziumAvro(schema) => {
203 vec![SqlOption {
204 name: ObjectName(vec![Ident {
205 value: "schema.registry".into(),
206 quote_style: None,
207 }]),
208 value: Value::SingleQuotedString(schema.row_schema_location.0).into(),
209 }]
210 }
211 LegacyRowFormat::Csv(schema) => {
212 vec![
213 SqlOption {
214 name: ObjectName(vec![Ident {
215 value: "delimiter".into(),
216 quote_style: None,
217 }]),
218 value: Value::SingleQuotedString(
219 String::from_utf8_lossy(&[schema.delimiter]).into(),
220 )
221 .into(),
222 },
223 SqlOption {
224 name: ObjectName(vec![Ident {
225 value: "without_header".into(),
226 quote_style: None,
227 }]),
228 value: Value::SingleQuotedString(if schema.has_header {
229 "false".into()
230 } else {
231 "true".into()
232 })
233 .into(),
234 },
235 ]
236 }
237 _ => vec![],
238 };
239
240 FormatEncodeOptions {
241 format,
242 row_encode,
243 row_options,
244 key_encode: None,
245 }
246 }
247}
248
249impl fmt::Display for LegacyRowFormat {
250 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
251 write!(f, "ROW FORMAT ")?;
252 match self {
253 LegacyRowFormat::Protobuf(protobuf_schema) => {
254 write!(f, "PROTOBUF {}", protobuf_schema)
255 }
256 LegacyRowFormat::Json => write!(f, "JSON"),
257 LegacyRowFormat::UpsertJson => write!(f, "UPSERT_JSON"),
258 LegacyRowFormat::Maxwell => write!(f, "MAXWELL"),
259 LegacyRowFormat::DebeziumJson => write!(f, "DEBEZIUM_JSON"),
260 LegacyRowFormat::DebeziumMongoJson => write!(f, "DEBEZIUM_MONGO_JSON"),
261 LegacyRowFormat::Avro(avro_schema) => write!(f, "AVRO {}", avro_schema),
262 LegacyRowFormat::UpsertAvro(avro_schema) => write!(f, "UPSERT_AVRO {}", avro_schema),
263 LegacyRowFormat::CanalJson => write!(f, "CANAL_JSON"),
264 LegacyRowFormat::Csv(csv_info) => write!(f, "CSV {}", csv_info),
265 LegacyRowFormat::Native => write!(f, "NATIVE"),
266 LegacyRowFormat::DebeziumAvro(avro_schema) => {
267 write!(f, "DEBEZIUM_AVRO {}", avro_schema)
268 }
269 LegacyRowFormat::Bytes => write!(f, "BYTES"),
270 }
271 }
272}
273
274#[derive(Debug, Clone, PartialEq, Eq, Hash)]
281pub struct ProtobufSchema {
282 pub message_name: AstString,
283 pub row_schema_location: AstString,
284 pub use_schema_registry: bool,
285}
286
287impl ParseTo for ProtobufSchema {
288 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
289 impl_parse_to!([Keyword::MESSAGE], p);
290 impl_parse_to!(message_name: AstString, p);
291 impl_parse_to!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], p);
292 impl_parse_to!(use_schema_registry => [Keyword::CONFLUENT, Keyword::SCHEMA, Keyword::REGISTRY], p);
293 impl_parse_to!(row_schema_location: AstString, p);
294 Ok(Self {
295 message_name,
296 row_schema_location,
297 use_schema_registry,
298 })
299 }
300}
301
302impl fmt::Display for ProtobufSchema {
303 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
304 let mut v: Vec<String> = vec![];
305 impl_fmt_display!([Keyword::MESSAGE], v);
306 impl_fmt_display!(message_name, v, self);
307 impl_fmt_display!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], v);
308 impl_fmt_display!(use_schema_registry => [Keyword::CONFLUENT, Keyword::SCHEMA, Keyword::REGISTRY], v, self);
309 impl_fmt_display!(row_schema_location, v, self);
310 v.iter().join(" ").fmt(f)
311 }
312}
313
314#[derive(Debug, Clone, PartialEq, Eq, Hash)]
319pub struct AvroSchema {
320 pub row_schema_location: AstString,
321 pub use_schema_registry: bool,
322}
323
324impl ParseTo for AvroSchema {
325 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
326 impl_parse_to!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], p);
327 impl_parse_to!(use_schema_registry => [Keyword::CONFLUENT, Keyword::SCHEMA, Keyword::REGISTRY], p);
328 impl_parse_to!(row_schema_location: AstString, p);
329 Ok(Self {
330 row_schema_location,
331 use_schema_registry,
332 })
333 }
334}
335
336impl fmt::Display for AvroSchema {
337 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
338 let mut v: Vec<String> = vec![];
339 impl_fmt_display!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], v);
340 impl_fmt_display!(use_schema_registry => [Keyword::CONFLUENT, Keyword::SCHEMA, Keyword::REGISTRY], v, self);
341 impl_fmt_display!(row_schema_location, v, self);
342 v.iter().join(" ").fmt(f)
343 }
344}
345
346#[derive(Debug, Clone, PartialEq, Eq, Hash)]
347pub struct DebeziumAvroSchema {
348 pub row_schema_location: AstString,
349}
350
351impl fmt::Display for DebeziumAvroSchema {
352 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
353 let mut v: Vec<String> = vec![];
354 impl_fmt_display!(
355 [
356 Keyword::ROW,
357 Keyword::SCHEMA,
358 Keyword::LOCATION,
359 Keyword::CONFLUENT,
360 Keyword::SCHEMA,
361 Keyword::REGISTRY
362 ],
363 v
364 );
365 impl_fmt_display!(row_schema_location, v, self);
366 v.iter().join(" ").fmt(f)
367 }
368}
369
370impl ParseTo for DebeziumAvroSchema {
371 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
372 impl_parse_to!(
373 [
374 Keyword::ROW,
375 Keyword::SCHEMA,
376 Keyword::LOCATION,
377 Keyword::CONFLUENT,
378 Keyword::SCHEMA,
379 Keyword::REGISTRY
380 ],
381 p
382 );
383 impl_parse_to!(row_schema_location: AstString, p);
384 Ok(Self {
385 row_schema_location,
386 })
387 }
388}
389
/// Arguments of the legacy `ROW FORMAT CSV` clause:
/// `[WITHOUT HEADER] DELIMITED BY <delimiter>`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsvInfo {
    /// Field delimiter as a single byte (see [`get_delimiter`] for parsed values).
    pub delimiter: u8,
    /// `false` when `WITHOUT HEADER` was specified.
    pub has_header: bool,
}
395
396pub fn get_delimiter(chars: &str) -> Result<u8, StrError> {
397 match chars {
398 "," => Ok(b','), ";" => Ok(b';'), "\t" => Ok(b'\t'), other => Err(StrError(format!(
402 "The delimiter should be one of ',', ';', E'\\t', but got {other:?}",
403 ))),
404 }
405}
406
407impl ParseTo for CsvInfo {
408 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
409 impl_parse_to!(without_header => [Keyword::WITHOUT, Keyword::HEADER], p);
410 impl_parse_to!([Keyword::DELIMITED, Keyword::BY], p);
411 impl_parse_to!(delimiter: AstString, p);
412 let delimiter = get_delimiter(delimiter.0.as_str())?;
413 Ok(Self {
414 delimiter,
415 has_header: !without_header,
416 })
417 }
418}
419
420impl fmt::Display for CsvInfo {
421 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
422 if !self.has_header {
423 write!(f, "WITHOUT HEADER ")?;
424 }
425 write!(
426 f,
427 "DELIMITED BY {}",
428 AstString((self.delimiter as char).to_string())
429 )?;
430 Ok(())
431 }
432}