risingwave_sqlparser/ast/
legacy_source.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! The contents of this file can be deleted once we stop supporting the v1 `CREATE SOURCE`
//! syntax (the `ROW FORMAT ...` clause). New features must NOT touch this file.
17
18use std::fmt;
19
20use itertools::Itertools as _;
21use winnow::ModalResult;
22
23use crate::ast::{
24    AstString, Encode, Format, FormatEncodeOptions, Ident, ObjectName, ParseTo, SqlOption, Value,
25    display_separated,
26};
27use crate::keywords::Keyword;
28use crate::parser::{Parser, StrError};
29use crate::{impl_fmt_display, impl_parse_to, parser_err};
30
/// The format/encode clause of a `CREATE SOURCE` statement, in either supported syntax.
///
/// Produced by [`parse_format_encode`]; use `into_v2` to normalize to [`FormatEncodeOptions`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CompatibleFormatEncode {
    /// Legacy v1 `ROW FORMAT <name> ...` clause.
    RowFormat(LegacyRowFormat),
    /// v2 clause, as parsed by `Parser::parse_schema`.
    V2(FormatEncodeOptions),
}
36
37impl fmt::Display for CompatibleFormatEncode {
38    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39        match self {
40            CompatibleFormatEncode::RowFormat(inner) => {
41                write!(f, "{}", inner)
42            }
43            CompatibleFormatEncode::V2(inner) => {
44                write!(f, "{}", inner)
45            }
46        }
47    }
48}
49
50impl CompatibleFormatEncode {
51    pub(crate) fn into_v2(self) -> FormatEncodeOptions {
52        match self {
53            CompatibleFormatEncode::RowFormat(inner) => inner.into_format_encode_v2(),
54            CompatibleFormatEncode::V2(inner) => inner,
55        }
56    }
57}
58
59impl From<FormatEncodeOptions> for CompatibleFormatEncode {
60    fn from(value: FormatEncodeOptions) -> Self {
61        Self::V2(value)
62    }
63}
64
65pub fn parse_format_encode(p: &mut Parser<'_>) -> ModalResult<CompatibleFormatEncode> {
66    if let Some(schema_v2) = p.parse_schema()? {
67        if schema_v2.key_encode.is_some() {
68            parser_err!("key encode clause is not supported in source schema");
69        }
70        Ok(CompatibleFormatEncode::V2(schema_v2))
71    } else if p.peek_nth_any_of_keywords(0, &[Keyword::ROW])
72        && p.peek_nth_any_of_keywords(1, &[Keyword::FORMAT])
73    {
74        p.expect_keyword(Keyword::ROW)?;
75        p.expect_keyword(Keyword::FORMAT)?;
76        let id = p.parse_identifier()?;
77        let value = id.real_value();
78        let schema = match &value[..] {
79            "json" => LegacyRowFormat::Json,
80            "upsert_json" => LegacyRowFormat::UpsertJson,
81            "protobuf" => {
82                impl_parse_to!(protobuf_schema: ProtobufSchema, p);
83                LegacyRowFormat::Protobuf(protobuf_schema)
84            }
85            "debezium_json" => LegacyRowFormat::DebeziumJson,
86            "debezium_mongo_json" => LegacyRowFormat::DebeziumMongoJson,
87            "avro" => {
88                impl_parse_to!(avro_schema: AvroSchema, p);
89                LegacyRowFormat::Avro(avro_schema)
90            }
91            "upsert_avro" => {
92                impl_parse_to!(avro_schema: AvroSchema, p);
93                LegacyRowFormat::UpsertAvro(avro_schema)
94            }
95            "maxwell" => LegacyRowFormat::Maxwell,
96            "canal_json" => LegacyRowFormat::CanalJson,
97            "csv" => {
98                impl_parse_to!(csv_info: CsvInfo, p);
99                LegacyRowFormat::Csv(csv_info)
100            }
101            "native" => LegacyRowFormat::Native, // used internally by schema change
102            "debezium_avro" => {
103                impl_parse_to!(avro_schema: DebeziumAvroSchema, p);
104                LegacyRowFormat::DebeziumAvro(avro_schema)
105            }
106            "bytes" => LegacyRowFormat::Bytes,
107            _ => {
108                parser_err!(
109                    "expected JSON | UPSERT_JSON | PROTOBUF | DEBEZIUM_JSON | DEBEZIUM_AVRO \
110                    | AVRO | UPSERT_AVRO | MAXWELL | CANAL_JSON | BYTES | NATIVE after ROW FORMAT"
111                );
112            }
113        };
114        Ok(CompatibleFormatEncode::RowFormat(schema))
115    } else {
116        p.expected("description of the format")
117    }
118}
119
/// A legacy v1 `ROW FORMAT <name> ...` clause.
///
/// Each variant corresponds to one `<name>` accepted by [`parse_format_encode`]. The name is
/// matched on the identifier's `real_value()` string, not on a keyword. Use
/// [`LegacyRowFormat::into_format_encode_v2`] to obtain the v2 equivalent.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum LegacyRowFormat {
    /// `PROTOBUF` followed by a [`ProtobufSchema`]; maps to `Format::Plain` / `Encode::Protobuf`.
    Protobuf(ProtobufSchema),
    /// `JSON`; maps to `Format::Plain` / `Encode::Json`.
    Json,
    /// `DEBEZIUM_JSON`; maps to `Format::Debezium` / `Encode::Json`.
    DebeziumJson,
    /// `DEBEZIUM_MONGO_JSON`; maps to `Format::DebeziumMongo` / `Encode::Json`.
    DebeziumMongoJson,
    /// `UPSERT_JSON`; maps to `Format::Upsert` / `Encode::Json`.
    UpsertJson,
    /// `AVRO` followed by an [`AvroSchema`]; maps to `Format::Plain` / `Encode::Avro`.
    Avro(AvroSchema),
    /// `UPSERT_AVRO` followed by an [`AvroSchema`]; maps to `Format::Upsert` / `Encode::Avro`.
    UpsertAvro(AvroSchema),
    /// `MAXWELL`; maps to `Format::Maxwell` / `Encode::Json`.
    Maxwell,
    /// `CANAL_JSON`; maps to `Format::Canal` / `Encode::Json`.
    CanalJson,
    /// `CSV` followed by a [`CsvInfo`]; maps to `Format::Plain` / `Encode::Csv`.
    Csv(CsvInfo),
    /// `NATIVE`; per the parser, used internally by schema change.
    Native,
    /// `DEBEZIUM_AVRO` followed by a [`DebeziumAvroSchema`]; maps to `Format::Debezium` /
    /// `Encode::Avro`.
    DebeziumAvro(DebeziumAvroSchema),
    /// `BYTES`; maps to `Format::Plain` / `Encode::Bytes`.
    Bytes,
}
136
137impl LegacyRowFormat {
138    pub fn into_format_encode_v2(self) -> FormatEncodeOptions {
139        let (format, row_encode) = match self {
140            LegacyRowFormat::Protobuf(_) => (Format::Plain, Encode::Protobuf),
141            LegacyRowFormat::Json => (Format::Plain, Encode::Json),
142            LegacyRowFormat::DebeziumJson => (Format::Debezium, Encode::Json),
143            LegacyRowFormat::DebeziumMongoJson => (Format::DebeziumMongo, Encode::Json),
144            LegacyRowFormat::UpsertJson => (Format::Upsert, Encode::Json),
145            LegacyRowFormat::Avro(_) => (Format::Plain, Encode::Avro),
146            LegacyRowFormat::UpsertAvro(_) => (Format::Upsert, Encode::Avro),
147            LegacyRowFormat::Maxwell => (Format::Maxwell, Encode::Json),
148            LegacyRowFormat::CanalJson => (Format::Canal, Encode::Json),
149            LegacyRowFormat::Csv(_) => (Format::Plain, Encode::Csv),
150            LegacyRowFormat::DebeziumAvro(_) => (Format::Debezium, Encode::Avro),
151            LegacyRowFormat::Bytes => (Format::Plain, Encode::Bytes),
152            LegacyRowFormat::Native => (Format::Native, Encode::Native),
153        };
154
155        let row_options = match self {
156            LegacyRowFormat::Protobuf(schema) => {
157                let mut options = vec![SqlOption {
158                    name: ObjectName(vec![Ident {
159                        value: "message".into(),
160                        quote_style: None,
161                    }]),
162                    value: Value::SingleQuotedString(schema.message_name.0).into(),
163                }];
164                if schema.use_schema_registry {
165                    options.push(SqlOption {
166                        name: ObjectName(vec![Ident {
167                            value: "schema.registry".into(),
168                            quote_style: None,
169                        }]),
170                        value: Value::SingleQuotedString(schema.row_schema_location.0).into(),
171                    });
172                } else {
173                    options.push(SqlOption {
174                        name: ObjectName(vec![Ident {
175                            value: "schema.location".into(),
176                            quote_style: None,
177                        }]),
178                        value: Value::SingleQuotedString(schema.row_schema_location.0).into(),
179                    })
180                }
181                options
182            }
183            LegacyRowFormat::Avro(schema) | LegacyRowFormat::UpsertAvro(schema) => {
184                if schema.use_schema_registry {
185                    vec![SqlOption {
186                        name: ObjectName(vec![Ident {
187                            value: "schema.registry".into(),
188                            quote_style: None,
189                        }]),
190                        value: Value::SingleQuotedString(schema.row_schema_location.0).into(),
191                    }]
192                } else {
193                    vec![SqlOption {
194                        name: ObjectName(vec![Ident {
195                            value: "schema.location".into(),
196                            quote_style: None,
197                        }]),
198                        value: Value::SingleQuotedString(schema.row_schema_location.0).into(),
199                    }]
200                }
201            }
202            LegacyRowFormat::DebeziumAvro(schema) => {
203                vec![SqlOption {
204                    name: ObjectName(vec![Ident {
205                        value: "schema.registry".into(),
206                        quote_style: None,
207                    }]),
208                    value: Value::SingleQuotedString(schema.row_schema_location.0).into(),
209                }]
210            }
211            LegacyRowFormat::Csv(schema) => {
212                vec![
213                    SqlOption {
214                        name: ObjectName(vec![Ident {
215                            value: "delimiter".into(),
216                            quote_style: None,
217                        }]),
218                        value: Value::SingleQuotedString(
219                            String::from_utf8_lossy(&[schema.delimiter]).into(),
220                        )
221                        .into(),
222                    },
223                    SqlOption {
224                        name: ObjectName(vec![Ident {
225                            value: "without_header".into(),
226                            quote_style: None,
227                        }]),
228                        value: Value::SingleQuotedString(if schema.has_header {
229                            "false".into()
230                        } else {
231                            "true".into()
232                        })
233                        .into(),
234                    },
235                ]
236            }
237            _ => vec![],
238        };
239
240        FormatEncodeOptions {
241            format,
242            row_encode,
243            row_options,
244            key_encode: None,
245        }
246    }
247}
248
249impl fmt::Display for LegacyRowFormat {
250    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
251        write!(f, "ROW FORMAT ")?;
252        match self {
253            LegacyRowFormat::Protobuf(protobuf_schema) => {
254                write!(f, "PROTOBUF {}", protobuf_schema)
255            }
256            LegacyRowFormat::Json => write!(f, "JSON"),
257            LegacyRowFormat::UpsertJson => write!(f, "UPSERT_JSON"),
258            LegacyRowFormat::Maxwell => write!(f, "MAXWELL"),
259            LegacyRowFormat::DebeziumJson => write!(f, "DEBEZIUM_JSON"),
260            LegacyRowFormat::DebeziumMongoJson => write!(f, "DEBEZIUM_MONGO_JSON"),
261            LegacyRowFormat::Avro(avro_schema) => write!(f, "AVRO {}", avro_schema),
262            LegacyRowFormat::UpsertAvro(avro_schema) => write!(f, "UPSERT_AVRO {}", avro_schema),
263            LegacyRowFormat::CanalJson => write!(f, "CANAL_JSON"),
264            LegacyRowFormat::Csv(csv_info) => write!(f, "CSV {}", csv_info),
265            LegacyRowFormat::Native => write!(f, "NATIVE"),
266            LegacyRowFormat::DebeziumAvro(avro_schema) => {
267                write!(f, "DEBEZIUM_AVRO {}", avro_schema)
268            }
269            LegacyRowFormat::Bytes => write!(f, "BYTES"),
270        }
271    }
272}
273
/// Sub-clause following `ROW FORMAT PROTOBUF`:
///
/// ```text
/// MESSAGE <message_name> ROW SCHEMA LOCATION [CONFLUENT SCHEMA REGISTRY] <row_schema_location>
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ProtobufSchema {
    /// Protobuf message name; becomes the v2 `message` row option.
    pub message_name: AstString,
    /// Schema location. It becomes the v2 `schema.registry` option when
    /// `use_schema_registry` is set, and `schema.location` otherwise.
    pub row_schema_location: AstString,
    /// Set from the optional `CONFLUENT SCHEMA REGISTRY` keywords (see `parse_to`).
    pub use_schema_registry: bool,
}
286
impl ParseTo for ProtobufSchema {
    /// Parses `MESSAGE <message_name> ROW SCHEMA LOCATION [CONFLUENT SCHEMA REGISTRY]
    /// <row_schema_location>`, one step per statement in the order written.
    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
        impl_parse_to!([Keyword::MESSAGE], p);
        impl_parse_to!(message_name: AstString, p);
        impl_parse_to!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], p);
        // The `name => [...]` form binds a `bool`. Presumably it is `true` only when the keyword
        // sequence is present. NOTE(review): confirm against `impl_parse_to!`.
        impl_parse_to!(use_schema_registry => [Keyword::CONFLUENT, Keyword::SCHEMA, Keyword::REGISTRY], p);
        impl_parse_to!(row_schema_location: AstString, p);
        Ok(Self {
            message_name,
            row_schema_location,
            use_schema_registry,
        })
    }
}
301
impl fmt::Display for ProtobufSchema {
    /// Renders the sub-clause in the same keyword order that `parse_to` accepts.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Each clause part is collected as a string, then all parts are joined by single spaces.
        let mut v: Vec<String> = vec![];
        impl_fmt_display!([Keyword::MESSAGE], v);
        impl_fmt_display!(message_name, v, self);
        impl_fmt_display!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], v);
        // Presumably emits the keywords only when `self.use_schema_registry` is set.
        // NOTE(review): confirm against `impl_fmt_display!`.
        impl_fmt_display!(use_schema_registry => [Keyword::CONFLUENT, Keyword::SCHEMA, Keyword::REGISTRY], v, self);
        impl_fmt_display!(row_schema_location, v, self);
        v.iter().join(" ").fmt(f)
    }
}
313
/// Sub-clause following `ROW FORMAT AVRO` and `ROW FORMAT UPSERT_AVRO`:
///
/// ```text
/// ROW SCHEMA LOCATION [CONFLUENT SCHEMA REGISTRY] <row_schema_location>
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct AvroSchema {
    /// Schema location. It becomes the v2 `schema.registry` option when
    /// `use_schema_registry` is set, and `schema.location` otherwise.
    pub row_schema_location: AstString,
    /// Set from the optional `CONFLUENT SCHEMA REGISTRY` keywords (see `parse_to`).
    pub use_schema_registry: bool,
}
323
impl ParseTo for AvroSchema {
    /// Parses `ROW SCHEMA LOCATION [CONFLUENT SCHEMA REGISTRY] <row_schema_location>`.
    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
        impl_parse_to!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], p);
        // Binds a `bool` for the optional registry keywords.
        // NOTE(review): presumably `true` only when they are present; confirm in the macro.
        impl_parse_to!(use_schema_registry => [Keyword::CONFLUENT, Keyword::SCHEMA, Keyword::REGISTRY], p);
        impl_parse_to!(row_schema_location: AstString, p);
        Ok(Self {
            row_schema_location,
            use_schema_registry,
        })
    }
}
335
impl fmt::Display for AvroSchema {
    /// Renders the sub-clause in the same keyword order that `parse_to` accepts.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Each clause part is collected as a string, then all parts are joined by single spaces.
        let mut v: Vec<String> = vec![];
        impl_fmt_display!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], v);
        // Presumably emits the keywords only when `self.use_schema_registry` is set.
        // NOTE(review): confirm against `impl_fmt_display!`.
        impl_fmt_display!(use_schema_registry => [Keyword::CONFLUENT, Keyword::SCHEMA, Keyword::REGISTRY], v, self);
        impl_fmt_display!(row_schema_location, v, self);
        v.iter().join(" ").fmt(f)
    }
}
345
/// Sub-clause following `ROW FORMAT DEBEZIUM_AVRO`:
///
/// ```text
/// ROW SCHEMA LOCATION CONFLUENT SCHEMA REGISTRY <row_schema_location>
/// ```
///
/// Unlike [`AvroSchema`], the `CONFLUENT SCHEMA REGISTRY` keywords are mandatory here.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DebeziumAvroSchema {
    /// Schema registry location; always becomes the v2 `schema.registry` option.
    pub row_schema_location: AstString,
}
350
impl fmt::Display for DebeziumAvroSchema {
    /// Renders `ROW SCHEMA LOCATION CONFLUENT SCHEMA REGISTRY <row_schema_location>`, the
    /// sequence `parse_to` accepts.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Each clause part is collected as a string, then all parts are joined by single spaces.
        let mut v: Vec<String> = vec![];
        impl_fmt_display!(
            [
                Keyword::ROW,
                Keyword::SCHEMA,
                Keyword::LOCATION,
                Keyword::CONFLUENT,
                Keyword::SCHEMA,
                Keyword::REGISTRY
            ],
            v
        );
        impl_fmt_display!(row_schema_location, v, self);
        v.iter().join(" ").fmt(f)
    }
}
369
impl ParseTo for DebeziumAvroSchema {
    /// Parses `ROW SCHEMA LOCATION CONFLUENT SCHEMA REGISTRY <row_schema_location>`.
    ///
    /// All six keywords are required, in this order.
    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
        impl_parse_to!(
            [
                Keyword::ROW,
                Keyword::SCHEMA,
                Keyword::LOCATION,
                Keyword::CONFLUENT,
                Keyword::SCHEMA,
                Keyword::REGISTRY
            ],
            p
        );
        impl_parse_to!(row_schema_location: AstString, p);
        Ok(Self {
            row_schema_location,
        })
    }
}
389
/// Sub-clause following `ROW FORMAT CSV`: `[WITHOUT HEADER] DELIMITED BY <delimiter>`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsvInfo {
    /// Delimiter byte. When produced by the parser it is `,`, `;` or tab (see [`get_delimiter`]).
    pub delimiter: u8,
    /// `false` when `WITHOUT HEADER` was given.
    pub has_header: bool,
}
395
396pub fn get_delimiter(chars: &str) -> Result<u8, StrError> {
397    match chars {
398        "," => Ok(b','),   // comma
399        ";" => Ok(b';'),   // semicolon
400        "\t" => Ok(b'\t'), // tab
401        other => Err(StrError(format!(
402            "The delimiter should be one of ',', ';', E'\\t', but got {other:?}",
403        ))),
404    }
405}
406
impl ParseTo for CsvInfo {
    /// Parses `[WITHOUT HEADER] DELIMITED BY <delimiter>`, where `<delimiter>` is a string
    /// validated by [`get_delimiter`].
    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
        // Binds a `bool` for the optional `WITHOUT HEADER` keywords.
        impl_parse_to!(without_header => [Keyword::WITHOUT, Keyword::HEADER], p);
        impl_parse_to!([Keyword::DELIMITED, Keyword::BY], p);
        impl_parse_to!(delimiter: AstString, p);
        // Replaces the parsed string with its byte. An unsupported delimiter's `StrError`
        // is propagated with `?`.
        let delimiter = get_delimiter(delimiter.0.as_str())?;
        Ok(Self {
            delimiter,
            has_header: !without_header,
        })
    }
}
419
420impl fmt::Display for CsvInfo {
421    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
422        if !self.has_header {
423            write!(f, "WITHOUT HEADER ")?;
424        }
425        write!(
426            f,
427            "DELIMITED BY {}",
428            AstString((self.delimiter as char).to_string())
429        )?;
430        Ok(())
431    }
432}