risingwave_sqlparser/ast/
legacy_source.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Everything in this file can be deleted once we stop supporting the v1 `CREATE SOURCE`
//! syntax (`ROW FORMAT ...`). New features must NOT touch this file.
17
18use std::fmt;
19
20use itertools::Itertools as _;
21#[cfg(feature = "serde")]
22use serde::{Deserialize, Serialize};
23use winnow::ModalResult;
24
25use crate::ast::{
26    AstString, Encode, Format, FormatEncodeOptions, Ident, ObjectName, ParseTo, SqlOption, Value,
27    display_separated,
28};
29use crate::keywords::Keyword;
30use crate::parser::{Parser, StrError};
31use crate::{impl_fmt_display, impl_parse_to, parser_err};
32
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
35pub enum CompatibleFormatEncode {
36    RowFormat(LegacyRowFormat),
37    V2(FormatEncodeOptions),
38}
39
40impl fmt::Display for CompatibleFormatEncode {
41    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42        match self {
43            CompatibleFormatEncode::RowFormat(inner) => {
44                write!(f, "{}", inner)
45            }
46            CompatibleFormatEncode::V2(inner) => {
47                write!(f, "{}", inner)
48            }
49        }
50    }
51}
52
53impl CompatibleFormatEncode {
54    pub(crate) fn into_v2(self) -> FormatEncodeOptions {
55        match self {
56            CompatibleFormatEncode::RowFormat(inner) => inner.into_format_encode_v2(),
57            CompatibleFormatEncode::V2(inner) => inner,
58        }
59    }
60}
61
62impl From<FormatEncodeOptions> for CompatibleFormatEncode {
63    fn from(value: FormatEncodeOptions) -> Self {
64        Self::V2(value)
65    }
66}
67
68pub fn parse_format_encode(p: &mut Parser<'_>) -> ModalResult<CompatibleFormatEncode> {
69    if let Some(schema_v2) = p.parse_schema()? {
70        if schema_v2.key_encode.is_some() {
71            parser_err!("key encode clause is not supported in source schema");
72        }
73        Ok(CompatibleFormatEncode::V2(schema_v2))
74    } else if p.peek_nth_any_of_keywords(0, &[Keyword::ROW])
75        && p.peek_nth_any_of_keywords(1, &[Keyword::FORMAT])
76    {
77        p.expect_keyword(Keyword::ROW)?;
78        p.expect_keyword(Keyword::FORMAT)?;
79        let id = p.parse_identifier()?;
80        let value = id.value.to_ascii_uppercase();
81        let schema = match &value[..] {
82            "JSON" => LegacyRowFormat::Json,
83            "UPSERT_JSON" => LegacyRowFormat::UpsertJson,
84            "PROTOBUF" => {
85                impl_parse_to!(protobuf_schema: ProtobufSchema, p);
86                LegacyRowFormat::Protobuf(protobuf_schema)
87            }
88            "DEBEZIUM_JSON" => LegacyRowFormat::DebeziumJson,
89            "DEBEZIUM_MONGO_JSON" => LegacyRowFormat::DebeziumMongoJson,
90            "AVRO" => {
91                impl_parse_to!(avro_schema: AvroSchema, p);
92                LegacyRowFormat::Avro(avro_schema)
93            }
94            "UPSERT_AVRO" => {
95                impl_parse_to!(avro_schema: AvroSchema, p);
96                LegacyRowFormat::UpsertAvro(avro_schema)
97            }
98            "MAXWELL" => LegacyRowFormat::Maxwell,
99            "CANAL_JSON" => LegacyRowFormat::CanalJson,
100            "CSV" => {
101                impl_parse_to!(csv_info: CsvInfo, p);
102                LegacyRowFormat::Csv(csv_info)
103            }
104            "NATIVE" => LegacyRowFormat::Native, // used internally by schema change
105            "DEBEZIUM_AVRO" => {
106                impl_parse_to!(avro_schema: DebeziumAvroSchema, p);
107                LegacyRowFormat::DebeziumAvro(avro_schema)
108            }
109            "BYTES" => LegacyRowFormat::Bytes,
110            _ => {
111                parser_err!(
112                    "expected JSON | UPSERT_JSON | PROTOBUF | DEBEZIUM_JSON | DEBEZIUM_AVRO \
113                    | AVRO | UPSERT_AVRO | MAXWELL | CANAL_JSON | BYTES | NATIVE after ROW FORMAT"
114                );
115            }
116        };
117        Ok(CompatibleFormatEncode::RowFormat(schema))
118    } else {
119        p.expected("description of the format")
120    }
121}
122
123#[derive(Debug, Clone, PartialEq, Eq, Hash)]
124#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
125pub enum LegacyRowFormat {
126    Protobuf(ProtobufSchema), // Keyword::PROTOBUF ProtobufSchema
127    Json,                     // Keyword::JSON
128    DebeziumJson,             // Keyword::DEBEZIUM_JSON
129    DebeziumMongoJson,
130    UpsertJson,             // Keyword::UPSERT_JSON
131    Avro(AvroSchema),       // Keyword::AVRO
132    UpsertAvro(AvroSchema), // Keyword::UpsertAVRO
133    Maxwell,                // Keyword::MAXWELL
134    CanalJson,              // Keyword::CANAL_JSON
135    Csv(CsvInfo),           // Keyword::CSV
136    Native,
137    DebeziumAvro(DebeziumAvroSchema), // Keyword::DEBEZIUM_AVRO
138    Bytes,
139}
140
141impl LegacyRowFormat {
142    pub fn into_format_encode_v2(self) -> FormatEncodeOptions {
143        let (format, row_encode) = match self {
144            LegacyRowFormat::Protobuf(_) => (Format::Plain, Encode::Protobuf),
145            LegacyRowFormat::Json => (Format::Plain, Encode::Json),
146            LegacyRowFormat::DebeziumJson => (Format::Debezium, Encode::Json),
147            LegacyRowFormat::DebeziumMongoJson => (Format::DebeziumMongo, Encode::Json),
148            LegacyRowFormat::UpsertJson => (Format::Upsert, Encode::Json),
149            LegacyRowFormat::Avro(_) => (Format::Plain, Encode::Avro),
150            LegacyRowFormat::UpsertAvro(_) => (Format::Upsert, Encode::Avro),
151            LegacyRowFormat::Maxwell => (Format::Maxwell, Encode::Json),
152            LegacyRowFormat::CanalJson => (Format::Canal, Encode::Json),
153            LegacyRowFormat::Csv(_) => (Format::Plain, Encode::Csv),
154            LegacyRowFormat::DebeziumAvro(_) => (Format::Debezium, Encode::Avro),
155            LegacyRowFormat::Bytes => (Format::Plain, Encode::Bytes),
156            LegacyRowFormat::Native => (Format::Native, Encode::Native),
157        };
158
159        let row_options = match self {
160            LegacyRowFormat::Protobuf(schema) => {
161                let mut options = vec![SqlOption {
162                    name: ObjectName(vec![Ident {
163                        value: "message".into(),
164                        quote_style: None,
165                    }]),
166                    value: Value::SingleQuotedString(schema.message_name.0).into(),
167                }];
168                if schema.use_schema_registry {
169                    options.push(SqlOption {
170                        name: ObjectName(vec![Ident {
171                            value: "schema.registry".into(),
172                            quote_style: None,
173                        }]),
174                        value: Value::SingleQuotedString(schema.row_schema_location.0).into(),
175                    });
176                } else {
177                    options.push(SqlOption {
178                        name: ObjectName(vec![Ident {
179                            value: "schema.location".into(),
180                            quote_style: None,
181                        }]),
182                        value: Value::SingleQuotedString(schema.row_schema_location.0).into(),
183                    })
184                }
185                options
186            }
187            LegacyRowFormat::Avro(schema) | LegacyRowFormat::UpsertAvro(schema) => {
188                if schema.use_schema_registry {
189                    vec![SqlOption {
190                        name: ObjectName(vec![Ident {
191                            value: "schema.registry".into(),
192                            quote_style: None,
193                        }]),
194                        value: Value::SingleQuotedString(schema.row_schema_location.0).into(),
195                    }]
196                } else {
197                    vec![SqlOption {
198                        name: ObjectName(vec![Ident {
199                            value: "schema.location".into(),
200                            quote_style: None,
201                        }]),
202                        value: Value::SingleQuotedString(schema.row_schema_location.0).into(),
203                    }]
204                }
205            }
206            LegacyRowFormat::DebeziumAvro(schema) => {
207                vec![SqlOption {
208                    name: ObjectName(vec![Ident {
209                        value: "schema.registry".into(),
210                        quote_style: None,
211                    }]),
212                    value: Value::SingleQuotedString(schema.row_schema_location.0).into(),
213                }]
214            }
215            LegacyRowFormat::Csv(schema) => {
216                vec![
217                    SqlOption {
218                        name: ObjectName(vec![Ident {
219                            value: "delimiter".into(),
220                            quote_style: None,
221                        }]),
222                        value: Value::SingleQuotedString(
223                            String::from_utf8_lossy(&[schema.delimiter]).into(),
224                        )
225                        .into(),
226                    },
227                    SqlOption {
228                        name: ObjectName(vec![Ident {
229                            value: "without_header".into(),
230                            quote_style: None,
231                        }]),
232                        value: Value::SingleQuotedString(if schema.has_header {
233                            "false".into()
234                        } else {
235                            "true".into()
236                        })
237                        .into(),
238                    },
239                ]
240            }
241            _ => vec![],
242        };
243
244        FormatEncodeOptions {
245            format,
246            row_encode,
247            row_options,
248            key_encode: None,
249        }
250    }
251}
252
253impl fmt::Display for LegacyRowFormat {
254    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
255        write!(f, "ROW FORMAT ")?;
256        match self {
257            LegacyRowFormat::Protobuf(protobuf_schema) => {
258                write!(f, "PROTOBUF {}", protobuf_schema)
259            }
260            LegacyRowFormat::Json => write!(f, "JSON"),
261            LegacyRowFormat::UpsertJson => write!(f, "UPSERT_JSON"),
262            LegacyRowFormat::Maxwell => write!(f, "MAXWELL"),
263            LegacyRowFormat::DebeziumJson => write!(f, "DEBEZIUM_JSON"),
264            LegacyRowFormat::DebeziumMongoJson => write!(f, "DEBEZIUM_MONGO_JSON"),
265            LegacyRowFormat::Avro(avro_schema) => write!(f, "AVRO {}", avro_schema),
266            LegacyRowFormat::UpsertAvro(avro_schema) => write!(f, "UPSERT_AVRO {}", avro_schema),
267            LegacyRowFormat::CanalJson => write!(f, "CANAL_JSON"),
268            LegacyRowFormat::Csv(csv_info) => write!(f, "CSV {}", csv_info),
269            LegacyRowFormat::Native => write!(f, "NATIVE"),
270            LegacyRowFormat::DebeziumAvro(avro_schema) => {
271                write!(f, "DEBEZIUM_AVRO {}", avro_schema)
272            }
273            LegacyRowFormat::Bytes => write!(f, "BYTES"),
274        }
275    }
276}
277
278// sql_grammar!(ProtobufSchema {
279//     [Keyword::MESSAGE],
280//     message_name: AstString,
281//     [Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION],
282//     row_schema_location: AstString,
283// });
284#[derive(Debug, Clone, PartialEq, Eq, Hash)]
285#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
286pub struct ProtobufSchema {
287    pub message_name: AstString,
288    pub row_schema_location: AstString,
289    pub use_schema_registry: bool,
290}
291
292impl ParseTo for ProtobufSchema {
293    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
294        impl_parse_to!([Keyword::MESSAGE], p);
295        impl_parse_to!(message_name: AstString, p);
296        impl_parse_to!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], p);
297        impl_parse_to!(use_schema_registry => [Keyword::CONFLUENT, Keyword::SCHEMA, Keyword::REGISTRY], p);
298        impl_parse_to!(row_schema_location: AstString, p);
299        Ok(Self {
300            message_name,
301            row_schema_location,
302            use_schema_registry,
303        })
304    }
305}
306
307impl fmt::Display for ProtobufSchema {
308    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
309        let mut v: Vec<String> = vec![];
310        impl_fmt_display!([Keyword::MESSAGE], v);
311        impl_fmt_display!(message_name, v, self);
312        impl_fmt_display!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], v);
313        impl_fmt_display!(use_schema_registry => [Keyword::CONFLUENT, Keyword::SCHEMA, Keyword::REGISTRY], v, self);
314        impl_fmt_display!(row_schema_location, v, self);
315        v.iter().join(" ").fmt(f)
316    }
317}
318
319// sql_grammar!(AvroSchema {
320//     [Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION, [Keyword::CONFLUENT, Keyword::SCHEMA,
321// Keyword::REGISTRY]],     row_schema_location: AstString,
322// });
323#[derive(Debug, Clone, PartialEq, Eq, Hash)]
324#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
325pub struct AvroSchema {
326    pub row_schema_location: AstString,
327    pub use_schema_registry: bool,
328}
329
330impl ParseTo for AvroSchema {
331    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
332        impl_parse_to!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], p);
333        impl_parse_to!(use_schema_registry => [Keyword::CONFLUENT, Keyword::SCHEMA, Keyword::REGISTRY], p);
334        impl_parse_to!(row_schema_location: AstString, p);
335        Ok(Self {
336            row_schema_location,
337            use_schema_registry,
338        })
339    }
340}
341
342impl fmt::Display for AvroSchema {
343    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
344        let mut v: Vec<String> = vec![];
345        impl_fmt_display!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], v);
346        impl_fmt_display!(use_schema_registry => [Keyword::CONFLUENT, Keyword::SCHEMA, Keyword::REGISTRY], v, self);
347        impl_fmt_display!(row_schema_location, v, self);
348        v.iter().join(" ").fmt(f)
349    }
350}
351
352#[derive(Debug, Clone, PartialEq, Eq, Hash)]
353#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
354pub struct DebeziumAvroSchema {
355    pub row_schema_location: AstString,
356}
357
358impl fmt::Display for DebeziumAvroSchema {
359    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
360        let mut v: Vec<String> = vec![];
361        impl_fmt_display!(
362            [
363                Keyword::ROW,
364                Keyword::SCHEMA,
365                Keyword::LOCATION,
366                Keyword::CONFLUENT,
367                Keyword::SCHEMA,
368                Keyword::REGISTRY
369            ],
370            v
371        );
372        impl_fmt_display!(row_schema_location, v, self);
373        v.iter().join(" ").fmt(f)
374    }
375}
376
377impl ParseTo for DebeziumAvroSchema {
378    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
379        impl_parse_to!(
380            [
381                Keyword::ROW,
382                Keyword::SCHEMA,
383                Keyword::LOCATION,
384                Keyword::CONFLUENT,
385                Keyword::SCHEMA,
386                Keyword::REGISTRY
387            ],
388            p
389        );
390        impl_parse_to!(row_schema_location: AstString, p);
391        Ok(Self {
392            row_schema_location,
393        })
394    }
395}
396
/// Options clause that follows `ROW FORMAT CSV`: `[WITHOUT HEADER] DELIMITED BY '<delimiter>'`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct CsvInfo {
    /// Single-byte field delimiter. When parsed, it is validated by `get_delimiter`.
    pub delimiter: u8,
    /// `false` when `WITHOUT HEADER` was given.
    pub has_header: bool,
}
403
404pub fn get_delimiter(chars: &str) -> Result<u8, StrError> {
405    match chars {
406        "," => Ok(b','),   // comma
407        ";" => Ok(b';'),   // semicolon
408        "\t" => Ok(b'\t'), // tab
409        other => Err(StrError(format!(
410            "The delimiter should be one of ',', ';', E'\\t', but got {other:?}",
411        ))),
412    }
413}
414
415impl ParseTo for CsvInfo {
416    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
417        impl_parse_to!(without_header => [Keyword::WITHOUT, Keyword::HEADER], p);
418        impl_parse_to!([Keyword::DELIMITED, Keyword::BY], p);
419        impl_parse_to!(delimiter: AstString, p);
420        let delimiter = get_delimiter(delimiter.0.as_str())?;
421        Ok(Self {
422            delimiter,
423            has_header: !without_header,
424        })
425    }
426}
427
428impl fmt::Display for CsvInfo {
429    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
430        if !self.has_header {
431            write!(f, "WITHOUT HEADER ")?;
432        }
433        write!(
434            f,
435            "DELIMITED BY {}",
436            AstString((self.delimiter as char).to_string())
437        )?;
438        Ok(())
439    }
440}