1use std::fmt;
19
20use itertools::Itertools as _;
21#[cfg(feature = "serde")]
22use serde::{Deserialize, Serialize};
23use winnow::ModalResult;
24
25use crate::ast::{
26 AstString, Encode, Format, FormatEncodeOptions, Ident, ObjectName, ParseTo, SqlOption, Value,
27 display_separated,
28};
29use crate::keywords::Keyword;
30use crate::parser::{Parser, StrError};
31use crate::{impl_fmt_display, impl_parse_to, parser_err};
32
/// Format/encode description of a source in either of the two syntaxes
/// accepted by [`parse_format_encode`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum CompatibleFormatEncode {
    /// The legacy `ROW FORMAT ...` clause.
    RowFormat(LegacyRowFormat),
    /// The current representation; a `RowFormat` value converts into it
    /// through `into_v2`.
    V2(FormatEncodeOptions),
}
39
40impl fmt::Display for CompatibleFormatEncode {
41 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42 match self {
43 CompatibleFormatEncode::RowFormat(inner) => {
44 write!(f, "{}", inner)
45 }
46 CompatibleFormatEncode::V2(inner) => {
47 write!(f, "{}", inner)
48 }
49 }
50 }
51}
52
53impl CompatibleFormatEncode {
54 pub(crate) fn into_v2(self) -> FormatEncodeOptions {
55 match self {
56 CompatibleFormatEncode::RowFormat(inner) => inner.into_format_encode_v2(),
57 CompatibleFormatEncode::V2(inner) => inner,
58 }
59 }
60}
61
62impl From<FormatEncodeOptions> for CompatibleFormatEncode {
63 fn from(value: FormatEncodeOptions) -> Self {
64 Self::V2(value)
65 }
66}
67
68pub fn parse_format_encode(p: &mut Parser<'_>) -> ModalResult<CompatibleFormatEncode> {
69 if let Some(schema_v2) = p.parse_schema()? {
70 if schema_v2.key_encode.is_some() {
71 parser_err!("key encode clause is not supported in source schema");
72 }
73 Ok(CompatibleFormatEncode::V2(schema_v2))
74 } else if p.peek_nth_any_of_keywords(0, &[Keyword::ROW])
75 && p.peek_nth_any_of_keywords(1, &[Keyword::FORMAT])
76 {
77 p.expect_keyword(Keyword::ROW)?;
78 p.expect_keyword(Keyword::FORMAT)?;
79 let id = p.parse_identifier()?;
80 let value = id.value.to_ascii_uppercase();
81 let schema = match &value[..] {
82 "JSON" => LegacyRowFormat::Json,
83 "UPSERT_JSON" => LegacyRowFormat::UpsertJson,
84 "PROTOBUF" => {
85 impl_parse_to!(protobuf_schema: ProtobufSchema, p);
86 LegacyRowFormat::Protobuf(protobuf_schema)
87 }
88 "DEBEZIUM_JSON" => LegacyRowFormat::DebeziumJson,
89 "DEBEZIUM_MONGO_JSON" => LegacyRowFormat::DebeziumMongoJson,
90 "AVRO" => {
91 impl_parse_to!(avro_schema: AvroSchema, p);
92 LegacyRowFormat::Avro(avro_schema)
93 }
94 "UPSERT_AVRO" => {
95 impl_parse_to!(avro_schema: AvroSchema, p);
96 LegacyRowFormat::UpsertAvro(avro_schema)
97 }
98 "MAXWELL" => LegacyRowFormat::Maxwell,
99 "CANAL_JSON" => LegacyRowFormat::CanalJson,
100 "CSV" => {
101 impl_parse_to!(csv_info: CsvInfo, p);
102 LegacyRowFormat::Csv(csv_info)
103 }
104 "NATIVE" => LegacyRowFormat::Native, "DEBEZIUM_AVRO" => {
106 impl_parse_to!(avro_schema: DebeziumAvroSchema, p);
107 LegacyRowFormat::DebeziumAvro(avro_schema)
108 }
109 "BYTES" => LegacyRowFormat::Bytes,
110 _ => {
111 parser_err!(
112 "expected JSON | UPSERT_JSON | PROTOBUF | DEBEZIUM_JSON | DEBEZIUM_AVRO \
113 | AVRO | UPSERT_AVRO | MAXWELL | CANAL_JSON | BYTES | NATIVE after ROW FORMAT"
114 );
115 }
116 };
117 Ok(CompatibleFormatEncode::RowFormat(schema))
118 } else {
119 p.expected("description of the format")
120 }
121}
122
/// The format named by a legacy `ROW FORMAT <name> ...` clause, together with
/// the extra schema information some formats carry.
///
/// The keyword shown for each variant is the one printed by the `Display`
/// impl and accepted by [`parse_format_encode`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum LegacyRowFormat {
    /// `PROTOBUF`, followed by a [`ProtobufSchema`].
    Protobuf(ProtobufSchema),
    /// `JSON`
    Json,
    /// `DEBEZIUM_JSON`
    DebeziumJson,
    /// `DEBEZIUM_MONGO_JSON`
    DebeziumMongoJson,
    /// `UPSERT_JSON`
    UpsertJson,
    /// `AVRO`, followed by an [`AvroSchema`].
    Avro(AvroSchema),
    /// `UPSERT_AVRO`, followed by an [`AvroSchema`].
    UpsertAvro(AvroSchema),
    /// `MAXWELL`
    Maxwell,
    /// `CANAL_JSON`
    CanalJson,
    /// `CSV`, followed by a [`CsvInfo`].
    Csv(CsvInfo),
    /// `NATIVE`
    Native,
    /// `DEBEZIUM_AVRO`, followed by a [`DebeziumAvroSchema`].
    DebeziumAvro(DebeziumAvroSchema),
    /// `BYTES`
    Bytes,
}
140
141impl LegacyRowFormat {
142 pub fn into_format_encode_v2(self) -> FormatEncodeOptions {
143 let (format, row_encode) = match self {
144 LegacyRowFormat::Protobuf(_) => (Format::Plain, Encode::Protobuf),
145 LegacyRowFormat::Json => (Format::Plain, Encode::Json),
146 LegacyRowFormat::DebeziumJson => (Format::Debezium, Encode::Json),
147 LegacyRowFormat::DebeziumMongoJson => (Format::DebeziumMongo, Encode::Json),
148 LegacyRowFormat::UpsertJson => (Format::Upsert, Encode::Json),
149 LegacyRowFormat::Avro(_) => (Format::Plain, Encode::Avro),
150 LegacyRowFormat::UpsertAvro(_) => (Format::Upsert, Encode::Avro),
151 LegacyRowFormat::Maxwell => (Format::Maxwell, Encode::Json),
152 LegacyRowFormat::CanalJson => (Format::Canal, Encode::Json),
153 LegacyRowFormat::Csv(_) => (Format::Plain, Encode::Csv),
154 LegacyRowFormat::DebeziumAvro(_) => (Format::Debezium, Encode::Avro),
155 LegacyRowFormat::Bytes => (Format::Plain, Encode::Bytes),
156 LegacyRowFormat::Native => (Format::Native, Encode::Native),
157 };
158
159 let row_options = match self {
160 LegacyRowFormat::Protobuf(schema) => {
161 let mut options = vec![SqlOption {
162 name: ObjectName(vec![Ident {
163 value: "message".into(),
164 quote_style: None,
165 }]),
166 value: Value::SingleQuotedString(schema.message_name.0).into(),
167 }];
168 if schema.use_schema_registry {
169 options.push(SqlOption {
170 name: ObjectName(vec![Ident {
171 value: "schema.registry".into(),
172 quote_style: None,
173 }]),
174 value: Value::SingleQuotedString(schema.row_schema_location.0).into(),
175 });
176 } else {
177 options.push(SqlOption {
178 name: ObjectName(vec![Ident {
179 value: "schema.location".into(),
180 quote_style: None,
181 }]),
182 value: Value::SingleQuotedString(schema.row_schema_location.0).into(),
183 })
184 }
185 options
186 }
187 LegacyRowFormat::Avro(schema) | LegacyRowFormat::UpsertAvro(schema) => {
188 if schema.use_schema_registry {
189 vec![SqlOption {
190 name: ObjectName(vec![Ident {
191 value: "schema.registry".into(),
192 quote_style: None,
193 }]),
194 value: Value::SingleQuotedString(schema.row_schema_location.0).into(),
195 }]
196 } else {
197 vec![SqlOption {
198 name: ObjectName(vec![Ident {
199 value: "schema.location".into(),
200 quote_style: None,
201 }]),
202 value: Value::SingleQuotedString(schema.row_schema_location.0).into(),
203 }]
204 }
205 }
206 LegacyRowFormat::DebeziumAvro(schema) => {
207 vec![SqlOption {
208 name: ObjectName(vec![Ident {
209 value: "schema.registry".into(),
210 quote_style: None,
211 }]),
212 value: Value::SingleQuotedString(schema.row_schema_location.0).into(),
213 }]
214 }
215 LegacyRowFormat::Csv(schema) => {
216 vec![
217 SqlOption {
218 name: ObjectName(vec![Ident {
219 value: "delimiter".into(),
220 quote_style: None,
221 }]),
222 value: Value::SingleQuotedString(
223 String::from_utf8_lossy(&[schema.delimiter]).into(),
224 )
225 .into(),
226 },
227 SqlOption {
228 name: ObjectName(vec![Ident {
229 value: "without_header".into(),
230 quote_style: None,
231 }]),
232 value: Value::SingleQuotedString(if schema.has_header {
233 "false".into()
234 } else {
235 "true".into()
236 })
237 .into(),
238 },
239 ]
240 }
241 _ => vec![],
242 };
243
244 FormatEncodeOptions {
245 format,
246 row_encode,
247 row_options,
248 key_encode: None,
249 }
250 }
251}
252
253impl fmt::Display for LegacyRowFormat {
254 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
255 write!(f, "ROW FORMAT ")?;
256 match self {
257 LegacyRowFormat::Protobuf(protobuf_schema) => {
258 write!(f, "PROTOBUF {}", protobuf_schema)
259 }
260 LegacyRowFormat::Json => write!(f, "JSON"),
261 LegacyRowFormat::UpsertJson => write!(f, "UPSERT_JSON"),
262 LegacyRowFormat::Maxwell => write!(f, "MAXWELL"),
263 LegacyRowFormat::DebeziumJson => write!(f, "DEBEZIUM_JSON"),
264 LegacyRowFormat::DebeziumMongoJson => write!(f, "DEBEZIUM_MONGO_JSON"),
265 LegacyRowFormat::Avro(avro_schema) => write!(f, "AVRO {}", avro_schema),
266 LegacyRowFormat::UpsertAvro(avro_schema) => write!(f, "UPSERT_AVRO {}", avro_schema),
267 LegacyRowFormat::CanalJson => write!(f, "CANAL_JSON"),
268 LegacyRowFormat::Csv(csv_info) => write!(f, "CSV {}", csv_info),
269 LegacyRowFormat::Native => write!(f, "NATIVE"),
270 LegacyRowFormat::DebeziumAvro(avro_schema) => {
271 write!(f, "DEBEZIUM_AVRO {}", avro_schema)
272 }
273 LegacyRowFormat::Bytes => write!(f, "BYTES"),
274 }
275 }
276}
277
/// Schema information attached to `ROW FORMAT PROTOBUF`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct ProtobufSchema {
    /// Value given after the `MESSAGE` keyword; becomes the `message` row
    /// option when converted to the V2 representation.
    pub message_name: AstString,
    /// Value given after `ROW SCHEMA LOCATION`.
    pub row_schema_location: AstString,
    /// When set, `row_schema_location` is emitted as the `schema.registry`
    /// option instead of `schema.location` in the V2 representation.
    pub use_schema_registry: bool,
}
291
/// Parses the clause that follows `ROW FORMAT PROTOBUF`.
///
/// Judging by the matching `Display` impl, the expected shape is
/// `MESSAGE <message_name> ROW SCHEMA LOCATION [CONFLUENT SCHEMA REGISTRY] <location>`.
impl ParseTo for ProtobufSchema {
    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
        // NOTE(review): `impl_parse_to!` is defined elsewhere; the per-line
        // notes below describe the apparent intent of each form. Confirm
        // against the macro definition.

        // Leading `MESSAGE` keyword.
        impl_parse_to!([Keyword::MESSAGE], p);
        // Binds `message_name` to the value that follows.
        impl_parse_to!(message_name: AstString, p);
        // `ROW SCHEMA LOCATION` keyword sequence.
        impl_parse_to!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], p);
        // Binds `use_schema_registry`, presumably true when the optional
        // `CONFLUENT SCHEMA REGISTRY` keywords are present.
        impl_parse_to!(use_schema_registry => [Keyword::CONFLUENT, Keyword::SCHEMA, Keyword::REGISTRY], p);
        // Binds `row_schema_location` to the trailing value.
        impl_parse_to!(row_schema_location: AstString, p);
        Ok(Self {
            message_name,
            row_schema_location,
            use_schema_registry,
        })
    }
}
306
307impl fmt::Display for ProtobufSchema {
308 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
309 let mut v: Vec<String> = vec![];
310 impl_fmt_display!([Keyword::MESSAGE], v);
311 impl_fmt_display!(message_name, v, self);
312 impl_fmt_display!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], v);
313 impl_fmt_display!(use_schema_registry => [Keyword::CONFLUENT, Keyword::SCHEMA, Keyword::REGISTRY], v, self);
314 impl_fmt_display!(row_schema_location, v, self);
315 v.iter().join(" ").fmt(f)
316 }
317}
318
/// Schema information attached to `ROW FORMAT AVRO` and
/// `ROW FORMAT UPSERT_AVRO`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct AvroSchema {
    /// Value given after `ROW SCHEMA LOCATION`.
    pub row_schema_location: AstString,
    /// When set, `row_schema_location` is emitted as the `schema.registry`
    /// option instead of `schema.location` in the V2 representation.
    pub use_schema_registry: bool,
}
329
/// Parses the clause that follows `ROW FORMAT AVRO` / `ROW FORMAT UPSERT_AVRO`.
///
/// Judging by the matching `Display` impl, the expected shape is
/// `ROW SCHEMA LOCATION [CONFLUENT SCHEMA REGISTRY] <location>`.
impl ParseTo for AvroSchema {
    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
        // NOTE(review): `impl_parse_to!` is defined elsewhere; the notes
        // below describe the apparent intent. Confirm against the macro.

        // `ROW SCHEMA LOCATION` keyword sequence.
        impl_parse_to!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], p);
        // Binds `use_schema_registry`, presumably true when the optional
        // `CONFLUENT SCHEMA REGISTRY` keywords are present.
        impl_parse_to!(use_schema_registry => [Keyword::CONFLUENT, Keyword::SCHEMA, Keyword::REGISTRY], p);
        // Binds `row_schema_location` to the trailing value.
        impl_parse_to!(row_schema_location: AstString, p);
        Ok(Self {
            row_schema_location,
            use_schema_registry,
        })
    }
}
341
342impl fmt::Display for AvroSchema {
343 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
344 let mut v: Vec<String> = vec![];
345 impl_fmt_display!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], v);
346 impl_fmt_display!(use_schema_registry => [Keyword::CONFLUENT, Keyword::SCHEMA, Keyword::REGISTRY], v, self);
347 impl_fmt_display!(row_schema_location, v, self);
348 v.iter().join(" ").fmt(f)
349 }
350}
351
/// Schema information attached to `ROW FORMAT DEBEZIUM_AVRO`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct DebeziumAvroSchema {
    /// Value given after `ROW SCHEMA LOCATION CONFLUENT SCHEMA REGISTRY`;
    /// emitted as the `schema.registry` option in the V2 representation.
    pub row_schema_location: AstString,
}
357
358impl fmt::Display for DebeziumAvroSchema {
359 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
360 let mut v: Vec<String> = vec![];
361 impl_fmt_display!(
362 [
363 Keyword::ROW,
364 Keyword::SCHEMA,
365 Keyword::LOCATION,
366 Keyword::CONFLUENT,
367 Keyword::SCHEMA,
368 Keyword::REGISTRY
369 ],
370 v
371 );
372 impl_fmt_display!(row_schema_location, v, self);
373 v.iter().join(" ").fmt(f)
374 }
375}
376
/// Parses the clause that follows `ROW FORMAT DEBEZIUM_AVRO`.
///
/// Unlike [`AvroSchema`], the `CONFLUENT SCHEMA REGISTRY` keywords are part
/// of the fixed keyword sequence here rather than an optional flag.
impl ParseTo for DebeziumAvroSchema {
    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
        // NOTE(review): `impl_parse_to!` is defined elsewhere; the notes
        // below describe the apparent intent. Confirm against the macro.

        // Full `ROW SCHEMA LOCATION CONFLUENT SCHEMA REGISTRY` sequence.
        impl_parse_to!(
            [
                Keyword::ROW,
                Keyword::SCHEMA,
                Keyword::LOCATION,
                Keyword::CONFLUENT,
                Keyword::SCHEMA,
                Keyword::REGISTRY
            ],
            p
        );
        // Binds `row_schema_location` to the trailing value.
        impl_parse_to!(row_schema_location: AstString, p);
        Ok(Self {
            row_schema_location,
        })
    }
}
396
/// Options attached to `ROW FORMAT CSV`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct CsvInfo {
    /// Field delimiter as a single byte (see [`get_delimiter`] for the
    /// values the parser accepts).
    pub delimiter: u8,
    /// `false` when the clause was written with `WITHOUT HEADER`.
    pub has_header: bool,
}
403
404pub fn get_delimiter(chars: &str) -> Result<u8, StrError> {
405 match chars {
406 "," => Ok(b','), ";" => Ok(b';'), "\t" => Ok(b'\t'), other => Err(StrError(format!(
410 "The delimiter should be one of ',', ';', E'\\t', but got {other:?}",
411 ))),
412 }
413}
414
415impl ParseTo for CsvInfo {
416 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
417 impl_parse_to!(without_header => [Keyword::WITHOUT, Keyword::HEADER], p);
418 impl_parse_to!([Keyword::DELIMITED, Keyword::BY], p);
419 impl_parse_to!(delimiter: AstString, p);
420 let delimiter = get_delimiter(delimiter.0.as_str())?;
421 Ok(Self {
422 delimiter,
423 has_header: !without_header,
424 })
425 }
426}
427
428impl fmt::Display for CsvInfo {
429 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
430 if !self.has_header {
431 write!(f, "WITHOUT HEADER ")?;
432 }
433 write!(
434 f,
435 "DELIMITED BY {}",
436 AstString((self.delimiter as char).to_string())
437 )?;
438 Ok(())
439 }
440}