risingwave_sqlparser/ast/
statement.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt;
16use std::fmt::{Formatter, Write};
17
18use itertools::Itertools;
19use winnow::ModalResult;
20
21use super::ddl::SourceWatermark;
22use super::legacy_source::{CompatibleFormatEncode, parse_format_encode};
23use super::{EmitMode, Ident, ObjectType, Query, Value};
24use crate::ast::{
25    ColumnDef, ObjectName, SqlOption, TableConstraint, display_comma_separated, display_separated,
26};
27use crate::keywords::Keyword;
28use crate::parser::{IncludeOption, IsOptional, Parser};
29use crate::parser_err;
30use crate::parser_v2::literal_u32;
31use crate::tokenizer::Token;
32
33/// Consumes token from the parser into an AST node.
34pub trait ParseTo: Sized {
35    fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self>;
36}
37
38#[macro_export]
39macro_rules! impl_parse_to {
40    () => {};
41    ($field:ident : $field_type:ty, $parser:ident) => {
42        let $field = <$field_type>::parse_to($parser)?;
43    };
44    ($field:ident => [$($arr:tt)+], $parser:ident) => {
45        let $field = $parser.parse_keywords(&[$($arr)+]);
46    };
47    ([$($arr:tt)+], $parser:ident) => {
48        $parser.expect_keywords(&[$($arr)+])?;
49    };
50}
51
52#[macro_export]
53macro_rules! impl_fmt_display {
54    () => {};
55    ($field:ident, $v:ident, $self:ident) => {{
56        let s = format!("{}", $self.$field);
57        if !s.is_empty() {
58            $v.push(s);
59        }
60    }};
61    ($field:ident => [$($arr:tt)+], $v:ident, $self:ident) => {
62        if $self.$field {
63            $v.push(format!("{}", display_separated(&[$($arr)+], " ")));
64        }
65    };
66    ([$($arr:tt)+], $v:ident) => {
67        $v.push(format!("{}", display_separated(&[$($arr)+], " ")));
68    };
69}
70
71// sql_grammar!(CreateSourceStatement {
72//     if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS],
73//     source_name: Ident,
74//     with_properties: AstOption<WithProperties>,
75//     [Keyword::ROW, Keyword::FORMAT],
76//     format_encode: SourceSchema,
77//     [Keyword::WATERMARK, Keyword::FOR] column [Keyword::AS] <expr>
78// });
79#[derive(Debug, Clone, PartialEq, Eq, Hash)]
80pub struct CreateSourceStatement {
81    pub temporary: bool,
82    pub if_not_exists: bool,
83    pub columns: Vec<ColumnDef>,
84    // The wildchar position in columns defined in sql. Only exist when using external schema.
85    pub wildcard_idx: Option<usize>,
86    pub constraints: Vec<TableConstraint>,
87    pub source_name: ObjectName,
88    pub with_properties: WithProperties,
89    pub format_encode: CompatibleFormatEncode,
90    pub source_watermarks: Vec<SourceWatermark>,
91    pub include_column_options: IncludeOption,
92}
93
94/// FORMAT means how to get the operation(Insert/Delete) from the input.
95///
96/// Check `CONNECTORS_COMPATIBLE_FORMATS` for what `FORMAT ... ENCODE ...` combinations are allowed.
97#[derive(Debug, Clone, PartialEq, Eq, Hash)]
98pub enum Format {
99    /// The format is the same with RisingWave's internal representation.
100    /// Used internally for schema change
101    Native,
102    /// for self-explanatory sources like iceberg, they have their own format, and should not be specified by user.
103    None,
104    // Keyword::DEBEZIUM
105    Debezium,
106    // Keyword::DEBEZIUM_MONGO
107    DebeziumMongo,
108    // Keyword::MAXWELL
109    Maxwell,
110    // Keyword::CANAL
111    Canal,
112    // Keyword::UPSERT
113    Upsert,
114    // Keyword::PLAIN
115    Plain,
116}
117
118// TODO: unify with `from_keyword`
119impl fmt::Display for Format {
120    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121        write!(
122            f,
123            "{}",
124            match self {
125                Format::Native => "NATIVE",
126                Format::Debezium => "DEBEZIUM",
127                Format::DebeziumMongo => "DEBEZIUM_MONGO",
128                Format::Maxwell => "MAXWELL",
129                Format::Canal => "CANAL",
130                Format::Upsert => "UPSERT",
131                Format::Plain => "PLAIN",
132                Format::None => "NONE",
133            }
134        )
135    }
136}
137
138impl Format {
139    pub fn from_keyword(s: &str) -> ModalResult<Self> {
140        Ok(match s {
141            "DEBEZIUM" => Format::Debezium,
142            "DEBEZIUM_MONGO" => Format::DebeziumMongo,
143            "MAXWELL" => Format::Maxwell,
144            "CANAL" => Format::Canal,
145            "PLAIN" => Format::Plain,
146            "UPSERT" => Format::Upsert,
147            "NATIVE" => Format::Native,
148            "NONE" => Format::None,
149            _ => parser_err!("expected PROTOBUF | DEBEZIUM | PLAIN | NATIVE | NONE after FORMAT"),
150        })
151    }
152}
153
154/// Check `CONNECTORS_COMPATIBLE_FORMATS` for what `FORMAT ... ENCODE ...` combinations are allowed.
155#[derive(Debug, Clone, PartialEq, Eq, Hash)]
156pub enum Encode {
157    Avro,     // Keyword::Avro
158    Csv,      // Keyword::CSV
159    Protobuf, // Keyword::PROTOBUF
160    Json,     // Keyword::JSON
161    Bytes,    // Keyword::BYTES
162    /// for self-explanatory sources like iceberg, they have their own format, and should not be specified by user.
163    None,
164    Text, // Keyword::TEXT
165    /// The encode is the same with RisingWave's internal representation.
166    /// Used internally for schema change
167    Native,
168    Template,
169    Parquet,
170}
171
172// TODO: unify with `from_keyword`
173impl fmt::Display for Encode {
174    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
175        write!(
176            f,
177            "{}",
178            match self {
179                Encode::Avro => "AVRO",
180                Encode::Csv => "CSV",
181                Encode::Protobuf => "PROTOBUF",
182                Encode::Json => "JSON",
183                Encode::Bytes => "BYTES",
184                Encode::Native => "NATIVE",
185                Encode::Template => "TEMPLATE",
186                Encode::None => "NONE",
187                Encode::Parquet => "PARQUET",
188                Encode::Text => "TEXT",
189            }
190        )
191    }
192}
193
194impl Encode {
195    pub fn from_keyword(s: &str) -> ModalResult<Self> {
196        Ok(match s {
197            "AVRO" => Encode::Avro,
198            "TEXT" => Encode::Text,
199            "BYTES" => Encode::Bytes,
200            "CSV" => Encode::Csv,
201            "PROTOBUF" => Encode::Protobuf,
202            "JSON" => Encode::Json,
203            "TEMPLATE" => Encode::Template,
204            "PARQUET" => Encode::Parquet,
205            "NATIVE" => Encode::Native,
206            "NONE" => Encode::None,
207            _ => parser_err!(
208                "expected AVRO | BYTES | CSV | PROTOBUF | JSON | NATIVE | TEMPLATE | PARQUET | NONE after Encode"
209            ),
210        })
211    }
212}
213
214/// `FORMAT ... ENCODE ... [(a=b, ...)] [KEY ENCODE ...]`
215#[derive(Debug, Clone, PartialEq, Eq, Hash)]
216pub struct FormatEncodeOptions {
217    pub format: Format,
218    pub row_encode: Encode,
219    pub row_options: Vec<SqlOption>,
220
221    pub key_encode: Option<Encode>,
222}
223
224impl Parser<'_> {
225    /// Peek the next tokens to see if it is `FORMAT` or `ROW FORMAT` (for compatibility).
226    fn peek_format_encode_format(&mut self) -> bool {
227        (self.peek_nth_any_of_keywords(0, &[Keyword::ROW])
228            && self.peek_nth_any_of_keywords(1, &[Keyword::FORMAT])) // ROW FORMAT
229            || self.peek_nth_any_of_keywords(0, &[Keyword::FORMAT]) // FORMAT
230    }
231
232    /// Parse the source schema. The behavior depends on the `connector` type.
233    pub fn parse_format_encode_with_connector(
234        &mut self,
235        connector: &str,
236        cdc_source_job: bool,
237    ) -> ModalResult<CompatibleFormatEncode> {
238        // row format for cdc source must be debezium json
239        // row format for nexmark source must be native
240        // default row format for datagen source is native
241        // FIXME: parse input `connector` to enum type instead using string here
242        if connector.contains("-cdc") {
243            let expected = if cdc_source_job {
244                FormatEncodeOptions::plain_json()
245            } else if connector.contains("mongodb") {
246                FormatEncodeOptions::debezium_mongo_json()
247            } else {
248                FormatEncodeOptions::debezium_json()
249            };
250
251            if self.peek_format_encode_format() {
252                let schema = parse_format_encode(self)?.into_v2();
253                if schema != expected {
254                    parser_err!(
255                        "Row format for CDC connectors should be \
256                         either omitted or set to `{expected}`",
257                    );
258                }
259            }
260            Ok(expected.into())
261        } else if connector.contains("nexmark") {
262            let expected = FormatEncodeOptions::native();
263            if self.peek_format_encode_format() {
264                let schema = parse_format_encode(self)?.into_v2();
265                if schema != expected {
266                    parser_err!(
267                        "Row format for nexmark connectors should be \
268                         either omitted or set to `{expected}`",
269                    );
270                }
271            }
272            Ok(expected.into())
273        } else if connector.contains("datagen") {
274            Ok(if self.peek_format_encode_format() {
275                parse_format_encode(self)?
276            } else {
277                FormatEncodeOptions::native().into()
278            })
279        } else if connector.contains("iceberg") || connector.contains("adbc_snowflake") {
280            let expected = FormatEncodeOptions::none();
281            if self.peek_format_encode_format() {
282                let schema = parse_format_encode(self)?.into_v2();
283                if schema != expected {
284                    parser_err!(
285                        "Row format for iceberg connectors should be \
286                         either omitted or set to `{expected}`",
287                    );
288                }
289            }
290            Ok(expected.into())
291        } else if connector.contains("webhook") {
292            parser_err!(
293                "Source with webhook connector is not supported. \
294                 Please use the `CREATE TABLE ... WITH ...` statement instead.",
295            );
296        } else {
297            Ok(parse_format_encode(self)?)
298        }
299    }
300
301    /// Parse `FORMAT ... ENCODE ... (...)`.
302    pub fn parse_schema(&mut self) -> ModalResult<Option<FormatEncodeOptions>> {
303        if !self.parse_keyword(Keyword::FORMAT) {
304            return Ok(None);
305        }
306
307        let id = self.parse_identifier()?;
308        let s = id.value.to_ascii_uppercase();
309        let format = Format::from_keyword(&s)?;
310        self.expect_keyword(Keyword::ENCODE)?;
311        let id = self.parse_identifier()?;
312        let s = id.value.to_ascii_uppercase();
313        let row_encode = Encode::from_keyword(&s)?;
314        let row_options = self.parse_options()?;
315
316        let key_encode = if self.parse_keywords(&[Keyword::KEY, Keyword::ENCODE]) {
317            Some(Encode::from_keyword(
318                self.parse_identifier()?.value.to_ascii_uppercase().as_str(),
319            )?)
320        } else {
321            None
322        };
323
324        Ok(Some(FormatEncodeOptions {
325            format,
326            row_encode,
327            row_options,
328            key_encode,
329        }))
330    }
331}
332
333impl FormatEncodeOptions {
334    pub const fn plain_json() -> Self {
335        FormatEncodeOptions {
336            format: Format::Plain,
337            row_encode: Encode::Json,
338            row_options: Vec::new(),
339            key_encode: None,
340        }
341    }
342
343    /// Create a new source schema with `Debezium` format and `Json` encoding.
344    pub const fn debezium_json() -> Self {
345        FormatEncodeOptions {
346            format: Format::Debezium,
347            row_encode: Encode::Json,
348            row_options: Vec::new(),
349            key_encode: None,
350        }
351    }
352
353    pub const fn debezium_mongo_json() -> Self {
354        FormatEncodeOptions {
355            format: Format::DebeziumMongo,
356            row_encode: Encode::Json,
357            row_options: Vec::new(),
358            key_encode: None,
359        }
360    }
361
362    /// Create a new source schema with `Native` format and encoding.
363    pub const fn native() -> Self {
364        FormatEncodeOptions {
365            format: Format::Native,
366            row_encode: Encode::Native,
367            row_options: Vec::new(),
368            key_encode: None,
369        }
370    }
371
372    /// Create a new source schema with `None` format and encoding.
373    /// Used for self-explanatory source like iceberg.
374    pub const fn none() -> Self {
375        FormatEncodeOptions {
376            format: Format::None,
377            row_encode: Encode::None,
378            row_options: Vec::new(),
379            key_encode: None,
380        }
381    }
382
383    pub fn row_options(&self) -> &[SqlOption] {
384        self.row_options.as_ref()
385    }
386}
387
388impl fmt::Display for FormatEncodeOptions {
389    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
390        write!(f, "FORMAT {} ENCODE {}", self.format, self.row_encode)?;
391
392        if !self.row_options().is_empty() {
393            write!(f, " ({})", display_comma_separated(self.row_options()))?;
394        }
395
396        if let Some(key_encode) = &self.key_encode {
397            write!(f, " KEY ENCODE {}", key_encode)?;
398        }
399
400        Ok(())
401    }
402}
403
404pub(super) fn fmt_create_items(
405    columns: &[ColumnDef],
406    constraints: &[TableConstraint],
407    watermarks: &[SourceWatermark],
408    wildcard_idx: Option<usize>,
409) -> std::result::Result<String, fmt::Error> {
410    let mut items = String::new();
411    let has_items = !columns.is_empty()
412        || !constraints.is_empty()
413        || !watermarks.is_empty()
414        || wildcard_idx.is_some();
415    has_items.then(|| write!(&mut items, "("));
416
417    if let Some(wildcard_idx) = wildcard_idx {
418        let (columns_l, columns_r) = columns.split_at(wildcard_idx);
419        write!(&mut items, "{}", display_comma_separated(columns_l))?;
420        if !columns_l.is_empty() {
421            write!(&mut items, ", ")?;
422        }
423        write!(&mut items, "{}", Token::Mul)?;
424        if !columns_r.is_empty() {
425            write!(&mut items, ", ")?;
426        }
427        write!(&mut items, "{}", display_comma_separated(columns_r))?;
428    } else {
429        write!(&mut items, "{}", display_comma_separated(columns))?;
430    }
431    let mut leading_items = !columns.is_empty() || wildcard_idx.is_some();
432
433    if leading_items && !constraints.is_empty() {
434        write!(&mut items, ", ")?;
435    }
436    write!(&mut items, "{}", display_comma_separated(constraints))?;
437    leading_items |= !constraints.is_empty();
438
439    if leading_items && !watermarks.is_empty() {
440        write!(&mut items, ", ")?;
441    }
442    write!(&mut items, "{}", display_comma_separated(watermarks))?;
443    // uncomment this when adding more sections below
444    // leading_items |= !watermarks.is_empty();
445
446    has_items.then(|| write!(&mut items, ")"));
447    Ok(items)
448}
449
450impl fmt::Display for CreateSourceStatement {
451    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
452        let mut v: Vec<String> = vec![];
453        impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
454        impl_fmt_display!(source_name, v, self);
455
456        let items = fmt_create_items(
457            &self.columns,
458            &self.constraints,
459            &self.source_watermarks,
460            self.wildcard_idx,
461        )?;
462        if !items.is_empty() {
463            v.push(items);
464        }
465
466        for item in &self.include_column_options {
467            v.push(format!("{}", item));
468        }
469
470        // skip format_encode for cdc source
471        let is_cdc_source = self.with_properties.0.iter().any(|option| {
472            option.name.real_value().eq_ignore_ascii_case("connector")
473                && option.value.to_string().contains("cdc")
474        });
475
476        impl_fmt_display!(with_properties, v, self);
477        if !is_cdc_source {
478            impl_fmt_display!(format_encode, v, self);
479        }
480        v.iter().join(" ").fmt(f)
481    }
482}
483
484#[derive(Debug, Clone, PartialEq, Eq, Hash)]
485pub enum CreateSink {
486    From(ObjectName),
487    AsQuery(Box<Query>),
488}
489
490impl fmt::Display for CreateSink {
491    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
492        match self {
493            Self::From(mv) => write!(f, "FROM {}", mv),
494            Self::AsQuery(query) => write!(f, "AS {}", query),
495        }
496    }
497}
498// sql_grammar!(CreateSinkStatement {
499//     if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS],
500//     sink_name: Ident,
501//     [Keyword::FROM],
502//     materialized_view: Ident,
503//     with_properties: AstOption<WithProperties>,
504// });
505#[derive(Debug, Clone, PartialEq, Eq, Hash)]
506pub struct CreateSinkStatement {
507    pub if_not_exists: bool,
508    pub sink_name: ObjectName,
509    pub with_properties: WithProperties,
510    pub sink_from: CreateSink,
511
512    // only used when creating sink into a table
513    // insert to specific columns of the target table
514    pub columns: Vec<Ident>,
515    pub emit_mode: Option<EmitMode>,
516    pub sink_schema: Option<FormatEncodeOptions>,
517    pub into_table_name: Option<ObjectName>,
518}
519
520impl ParseTo for CreateSinkStatement {
521    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
522        impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
523        impl_parse_to!(sink_name: ObjectName, p);
524
525        let mut target_spec_columns = Vec::new();
526        let into_table_name = if p.parse_keyword(Keyword::INTO) {
527            impl_parse_to!(into_table_name: ObjectName, p);
528
529            // we only allow specify columns when creating sink into a table
530            target_spec_columns = p.parse_parenthesized_column_list(IsOptional::Optional)?;
531            Some(into_table_name)
532        } else {
533            None
534        };
535
536        let sink_from = if p.parse_keyword(Keyword::FROM) {
537            impl_parse_to!(from_name: ObjectName, p);
538            CreateSink::From(from_name)
539        } else if p.parse_keyword(Keyword::AS) {
540            let query = Box::new(p.parse_query()?);
541            CreateSink::AsQuery(query)
542        } else {
543            p.expected("FROM or AS after CREATE SINK sink_name")?
544        };
545
546        let emit_mode: Option<EmitMode> = p.parse_emit_mode()?;
547
548        // This check cannot be put into the `WithProperties::parse_to`, since other
549        // statements may not need the with properties.
550        if !p.peek_nth_any_of_keywords(0, &[Keyword::WITH]) && into_table_name.is_none() {
551            p.expected("WITH")?
552        }
553        impl_parse_to!(with_properties: WithProperties, p);
554
555        if with_properties.0.is_empty() && into_table_name.is_none() {
556            parser_err!("sink properties not provided");
557        }
558
559        let sink_schema = p.parse_schema()?;
560
561        Ok(Self {
562            if_not_exists,
563            sink_name,
564            with_properties,
565            sink_from,
566            columns: target_spec_columns,
567            emit_mode,
568            sink_schema,
569            into_table_name,
570        })
571    }
572}
573
574impl fmt::Display for CreateSinkStatement {
575    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
576        let mut v: Vec<String> = vec![];
577        impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
578        impl_fmt_display!(sink_name, v, self);
579        if let Some(into_table) = &self.into_table_name {
580            impl_fmt_display!([Keyword::INTO], v);
581            impl_fmt_display!([into_table], v);
582            if !self.columns.is_empty() {
583                v.push(format!("({})", display_comma_separated(&self.columns)));
584            }
585        }
586        impl_fmt_display!(sink_from, v, self);
587        if let Some(ref emit_mode) = self.emit_mode {
588            v.push(format!("EMIT {}", emit_mode));
589        }
590        impl_fmt_display!(with_properties, v, self);
591        if let Some(schema) = &self.sink_schema {
592            v.push(format!("{}", schema));
593        }
594        v.iter().join(" ").fmt(f)
595    }
596}
597
598// sql_grammar!(CreateSubscriptionStatement {
599//     if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS],
600//     subscription_name: Ident,
601//     [Keyword::FROM],
602//     materialized_view: Ident,
603//     with_properties: AstOption<WithProperties>,
604// });
605#[derive(Debug, Clone, PartialEq, Eq, Hash)]
606pub struct CreateSubscriptionStatement {
607    pub if_not_exists: bool,
608    pub subscription_name: ObjectName,
609    pub with_properties: WithProperties,
610    pub subscription_from: ObjectName,
611    // pub emit_mode: Option<EmitMode>,
612}
613
614impl ParseTo for CreateSubscriptionStatement {
615    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
616        impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
617        impl_parse_to!(subscription_name: ObjectName, p);
618
619        let subscription_from = if p.parse_keyword(Keyword::FROM) {
620            impl_parse_to!(from_name: ObjectName, p);
621            from_name
622        } else {
623            p.expected("FROM after CREATE SUBSCRIPTION subscription_name")?
624        };
625
626        // let emit_mode = p.parse_emit_mode()?;
627
628        // This check cannot be put into the `WithProperties::parse_to`, since other
629        // statements may not need the with properties.
630        if !p.peek_nth_any_of_keywords(0, &[Keyword::WITH]) {
631            p.expected("WITH")?
632        }
633        impl_parse_to!(with_properties: WithProperties, p);
634
635        if with_properties.0.is_empty() {
636            parser_err!("subscription properties not provided");
637        }
638
639        Ok(Self {
640            if_not_exists,
641            subscription_name,
642            with_properties,
643            subscription_from,
644        })
645    }
646}
647
648impl fmt::Display for CreateSubscriptionStatement {
649    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
650        let mut v: Vec<String> = vec![];
651        impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
652        impl_fmt_display!(subscription_name, v, self);
653        v.push(format!("FROM {}", self.subscription_from));
654        impl_fmt_display!(with_properties, v, self);
655        v.iter().join(" ").fmt(f)
656    }
657}
658
659#[derive(Debug, Clone, PartialEq, Eq, Hash)]
660pub enum DeclareCursor {
661    Query(Box<Query>),
662    Subscription(ObjectName, Since),
663}
664
665impl fmt::Display for DeclareCursor {
666    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
667        let mut v: Vec<String> = vec![];
668        match self {
669            DeclareCursor::Query(query) => v.push(format!("{}", query.as_ref())),
670            DeclareCursor::Subscription(name, since) => {
671                v.push(format!("{}", name));
672                v.push(format!("{:?}", since));
673            }
674        }
675        v.iter().join(" ").fmt(f)
676    }
677}
678
679// sql_grammar!(DeclareCursorStatement {
680//     cursor_name: Ident,
681//     [Keyword::SUBSCRIPTION]
682//     [Keyword::CURSOR],
683//     [Keyword::FOR],
684//     subscription: Ident or query: Query,
685//     [Keyword::SINCE],
686//     rw_timestamp: Ident,
687// });
688#[derive(Debug, Clone, PartialEq, Eq, Hash)]
689pub struct DeclareCursorStatement {
690    pub cursor_name: Ident,
691    pub declare_cursor: DeclareCursor,
692}
693
694impl ParseTo for DeclareCursorStatement {
695    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
696        let cursor_name = p.parse_identifier_non_reserved()?;
697
698        let declare_cursor = if !p.parse_keyword(Keyword::SUBSCRIPTION) {
699            p.expect_keyword(Keyword::CURSOR)?;
700            p.expect_keyword(Keyword::FOR)?;
701            DeclareCursor::Query(Box::new(p.parse_query()?))
702        } else {
703            p.expect_keyword(Keyword::CURSOR)?;
704            p.expect_keyword(Keyword::FOR)?;
705            let cursor_for_name = p.parse_object_name()?;
706            let rw_timestamp = p.parse_since()?;
707            DeclareCursor::Subscription(cursor_for_name, rw_timestamp)
708        };
709
710        Ok(Self {
711            cursor_name,
712            declare_cursor,
713        })
714    }
715}
716
717impl fmt::Display for DeclareCursorStatement {
718    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
719        let mut v: Vec<String> = vec![];
720        impl_fmt_display!(cursor_name, v, self);
721        match &self.declare_cursor {
722            DeclareCursor::Query(_) => {
723                v.push("CURSOR FOR ".to_owned());
724            }
725            DeclareCursor::Subscription { .. } => {
726                v.push("SUBSCRIPTION CURSOR FOR ".to_owned());
727            }
728        }
729        impl_fmt_display!(declare_cursor, v, self);
730        v.iter().join(" ").fmt(f)
731    }
732}
733
734// sql_grammar!(FetchCursorStatement {
735//     cursor_name: Ident,
736// });
737#[derive(Debug, Clone, PartialEq, Eq, Hash)]
738pub struct FetchCursorStatement {
739    pub cursor_name: Ident,
740    pub count: u32,
741    pub with_properties: WithProperties,
742}
743
744impl ParseTo for FetchCursorStatement {
745    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
746        let count = if p.parse_keyword(Keyword::NEXT) {
747            1
748        } else {
749            literal_u32(p)?
750        };
751        p.expect_keyword(Keyword::FROM)?;
752        let cursor_name = p.parse_identifier_non_reserved()?;
753        impl_parse_to!(with_properties: WithProperties, p);
754
755        Ok(Self {
756            cursor_name,
757            count,
758            with_properties,
759        })
760    }
761}
762
763impl fmt::Display for FetchCursorStatement {
764    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
765        let mut v: Vec<String> = vec![];
766        if self.count == 1 {
767            v.push("NEXT ".to_owned());
768        } else {
769            impl_fmt_display!(count, v, self);
770        }
771        v.push("FROM ".to_owned());
772        impl_fmt_display!(cursor_name, v, self);
773        v.iter().join(" ").fmt(f)
774    }
775}
776
777// sql_grammar!(CloseCursorStatement {
778//     cursor_name: Ident,
779// });
780#[derive(Debug, Clone, PartialEq, Eq, Hash)]
781pub struct CloseCursorStatement {
782    pub cursor_name: Option<Ident>,
783}
784
785impl ParseTo for CloseCursorStatement {
786    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
787        let cursor_name = if p.parse_keyword(Keyword::ALL) {
788            None
789        } else {
790            Some(p.parse_identifier_non_reserved()?)
791        };
792
793        Ok(Self { cursor_name })
794    }
795}
796
797impl fmt::Display for CloseCursorStatement {
798    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
799        let mut v: Vec<String> = vec![];
800        if let Some(cursor_name) = &self.cursor_name {
801            v.push(format!("{}", cursor_name));
802        } else {
803            v.push("ALL".to_owned());
804        }
805        v.iter().join(" ").fmt(f)
806    }
807}
808
809// sql_grammar!(CreateConnectionStatement {
810//     if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS],
811//     connection_name: Ident,
812//     with_properties: AstOption<WithProperties>,
813// });
814#[derive(Debug, Clone, PartialEq, Eq, Hash)]
815pub struct CreateConnectionStatement {
816    pub if_not_exists: bool,
817    pub connection_name: ObjectName,
818    pub with_properties: WithProperties,
819}
820
821impl ParseTo for CreateConnectionStatement {
822    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
823        impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
824        impl_parse_to!(connection_name: ObjectName, p);
825        impl_parse_to!(with_properties: WithProperties, p);
826        if with_properties.0.is_empty() {
827            parser_err!("connection properties not provided");
828        }
829
830        Ok(Self {
831            if_not_exists,
832            connection_name,
833            with_properties,
834        })
835    }
836}
837
838impl fmt::Display for CreateConnectionStatement {
839    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
840        let mut v: Vec<String> = vec![];
841        impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
842        impl_fmt_display!(connection_name, v, self);
843        impl_fmt_display!(with_properties, v, self);
844        v.iter().join(" ").fmt(f)
845    }
846}
847
848#[derive(Debug, Clone, PartialEq, Eq, Hash)]
849pub struct CreateSecretStatement {
850    pub if_not_exists: bool,
851    pub secret_name: ObjectName,
852    pub credential: Value,
853    pub with_properties: WithProperties,
854}
855
856impl ParseTo for CreateSecretStatement {
857    fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
858        impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], parser);
859        impl_parse_to!(secret_name: ObjectName, parser);
860        impl_parse_to!(with_properties: WithProperties, parser);
861        let mut credential = Value::Null;
862        if parser.parse_keyword(Keyword::AS) {
863            credential = parser.ensure_parse_value()?;
864        }
865        Ok(Self {
866            if_not_exists,
867            secret_name,
868            credential,
869            with_properties,
870        })
871    }
872}
873
874impl fmt::Display for CreateSecretStatement {
875    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
876        let mut v: Vec<String> = vec![];
877        impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
878        impl_fmt_display!(secret_name, v, self);
879        impl_fmt_display!(with_properties, v, self);
880        if self.credential != Value::Null {
881            v.push("AS".to_owned());
882            impl_fmt_display!(credential, v, self);
883        }
884        v.iter().join(" ").fmt(f)
885    }
886}
887
888#[derive(Debug, Clone, PartialEq, Eq, Hash)]
889pub struct WithProperties(pub Vec<SqlOption>);
890
891impl ParseTo for WithProperties {
892    fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
893        Ok(Self(
894            parser.parse_options_with_preceding_keyword(Keyword::WITH)?,
895        ))
896    }
897}
898
899impl fmt::Display for WithProperties {
900    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
901        if !self.0.is_empty() {
902            write!(f, "WITH ({})", display_comma_separated(self.0.as_slice()))
903        } else {
904            Ok(())
905        }
906    }
907}
908
909#[derive(Debug, Clone, PartialEq, Eq, Hash)]
910pub enum Since {
911    TimestampMsNum(u64),
912    ProcessTime,
913    Begin,
914    Full,
915}
916
917impl fmt::Display for Since {
918    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
919        use Since::*;
920        match self {
921            TimestampMsNum(ts) => write!(f, " SINCE {}", ts),
922            ProcessTime => write!(f, " SINCE PROCTIME()"),
923            Begin => write!(f, " SINCE BEGIN()"),
924            Full => write!(f, " FULL"),
925        }
926    }
927}
928
929#[derive(Debug, Clone, PartialEq, Eq, Hash)]
930pub struct RowSchemaLocation {
931    pub value: AstString,
932}
933
934impl ParseTo for RowSchemaLocation {
935    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
936        impl_parse_to!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], p);
937        impl_parse_to!(value: AstString, p);
938        Ok(Self { value })
939    }
940}
941
942impl fmt::Display for RowSchemaLocation {
943    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
944        let mut v = vec![];
945        impl_fmt_display!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], v);
946        impl_fmt_display!(value, v, self);
947        v.iter().join(" ").fmt(f)
948    }
949}
950
951/// String literal. The difference with String is that it is displayed with
952/// single-quotes.
953#[derive(Debug, Clone, PartialEq, Eq, Hash)]
954pub struct AstString(pub String);
955
956impl ParseTo for AstString {
957    fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
958        Ok(Self(parser.parse_literal_string()?))
959    }
960}
961
962impl fmt::Display for AstString {
963    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
964        write!(f, "'{}'", self.0)
965    }
966}
967
968/// This trait is used to replace `Option` because `fmt::Display` can not be implemented for
969/// `Option<T>`.
970#[derive(Debug, Clone, PartialEq, Eq, Hash)]
971pub enum AstOption<T> {
972    /// No value
973    None,
974    /// Some value `T`
975    Some(T),
976}
977
978impl<T: ParseTo> ParseTo for AstOption<T> {
979    fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
980        match T::parse_to(parser) {
981            Ok(t) => Ok(AstOption::Some(t)),
982            Err(_) => Ok(AstOption::None),
983        }
984    }
985}
986
987impl<T: fmt::Display> fmt::Display for AstOption<T> {
988    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
989        match &self {
990            AstOption::Some(t) => t.fmt(f),
991            AstOption::None => Ok(()),
992        }
993    }
994}
995
996impl<T> From<AstOption<T>> for Option<T> {
997    fn from(val: AstOption<T>) -> Self {
998        match val {
999            AstOption::Some(t) => Some(t),
1000            AstOption::None => None,
1001        }
1002    }
1003}
1004
1005#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1006pub struct CreateUserStatement {
1007    pub user_name: ObjectName,
1008    pub with_options: UserOptions,
1009}
1010
1011#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1012pub struct AlterUserStatement {
1013    pub user_name: ObjectName,
1014    pub mode: AlterUserMode,
1015}
1016
1017#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1018pub enum AlterUserMode {
1019    Options(UserOptions),
1020    Rename(ObjectName),
1021}
1022
1023#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1024pub enum UserOption {
1025    SuperUser,
1026    NoSuperUser,
1027    CreateDB,
1028    NoCreateDB,
1029    CreateUser,
1030    NoCreateUser,
1031    Login,
1032    NoLogin,
1033    Admin,
1034    NoAdmin,
1035    EncryptedPassword(AstString),
1036    Password(Option<AstString>),
1037    OAuth(Vec<SqlOption>),
1038}
1039
1040impl fmt::Display for UserOption {
1041    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1042        match self {
1043            UserOption::SuperUser => write!(f, "SUPERUSER"),
1044            UserOption::NoSuperUser => write!(f, "NOSUPERUSER"),
1045            UserOption::CreateDB => write!(f, "CREATEDB"),
1046            UserOption::NoCreateDB => write!(f, "NOCREATEDB"),
1047            UserOption::CreateUser => write!(f, "CREATEUSER"),
1048            UserOption::NoCreateUser => write!(f, "NOCREATEUSER"),
1049            UserOption::Login => write!(f, "LOGIN"),
1050            UserOption::NoLogin => write!(f, "NOLOGIN"),
1051            UserOption::Admin => write!(f, "ADMIN"),
1052            UserOption::NoAdmin => write!(f, "NOADMIN"),
1053            UserOption::EncryptedPassword(p) => write!(f, "ENCRYPTED PASSWORD {}", p),
1054            UserOption::Password(None) => write!(f, "PASSWORD NULL"),
1055            UserOption::Password(Some(p)) => write!(f, "PASSWORD {}", p),
1056            UserOption::OAuth(options) => {
1057                write!(f, "({})", display_comma_separated(options.as_slice()))
1058            }
1059        }
1060    }
1061}
1062
1063#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1064pub struct UserOptions(pub Vec<UserOption>);
1065
1066#[derive(Default)]
1067struct UserOptionsBuilder {
1068    super_user: Option<UserOption>,
1069    create_db: Option<UserOption>,
1070    create_user: Option<UserOption>,
1071    login: Option<UserOption>,
1072    admin: Option<UserOption>,
1073    password: Option<UserOption>,
1074}
1075
1076impl UserOptionsBuilder {
1077    fn build(self) -> UserOptions {
1078        let mut options = vec![];
1079        if let Some(option) = self.super_user {
1080            options.push(option);
1081        }
1082        if let Some(option) = self.create_db {
1083            options.push(option);
1084        }
1085        if let Some(option) = self.create_user {
1086            options.push(option);
1087        }
1088        if let Some(option) = self.login {
1089            options.push(option);
1090        }
1091        if let Some(option) = self.admin {
1092            options.push(option);
1093        }
1094        if let Some(option) = self.password {
1095            options.push(option);
1096        }
1097        UserOptions(options)
1098    }
1099}
1100
1101impl ParseTo for UserOptions {
1102    fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
1103        let mut builder = UserOptionsBuilder::default();
1104        let add_option = |item: &mut Option<UserOption>, user_option| {
1105            let old_value = item.replace(user_option);
1106            if old_value.is_some() {
1107                parser_err!("conflicting or redundant options");
1108            }
1109            Ok(())
1110        };
1111        let _ = parser.parse_keyword(Keyword::WITH);
1112        loop {
1113            let token = parser.peek_token();
1114            if token == Token::EOF || token == Token::SemiColon {
1115                break;
1116            }
1117
1118            let option = parser.parse_identifier()?;
1119            let s = option.real_value();
1120            let (item_mut_ref, user_option) = match &s[..] {
1121                "superuser" => (&mut builder.super_user, UserOption::SuperUser),
1122                "nosuperuser" => (&mut builder.super_user, UserOption::NoSuperUser),
1123                "createdb" => (&mut builder.create_db, UserOption::CreateDB),
1124                "nocreatedb" => (&mut builder.create_db, UserOption::NoCreateDB),
1125                "createuser" => (&mut builder.create_user, UserOption::CreateUser),
1126                "nocreateuser" => (&mut builder.create_user, UserOption::NoCreateUser),
1127                "login" => (&mut builder.login, UserOption::Login),
1128                "nologin" => (&mut builder.login, UserOption::NoLogin),
1129                "admin" => (&mut builder.admin, UserOption::Admin),
1130                "noadmin" => (&mut builder.admin, UserOption::NoAdmin),
1131                "password" => {
1132                    if parser.parse_keyword(Keyword::NULL) {
1133                        (&mut builder.password, UserOption::Password(None))
1134                    } else {
1135                        (
1136                            &mut builder.password,
1137                            UserOption::Password(Some(AstString::parse_to(parser)?)),
1138                        )
1139                    }
1140                }
1141                "encrypted" => {
1142                    let option = parser.parse_identifier()?;
1143                    let s = option.real_value();
1144                    if s != "password" {
1145                        parser_err!("expected PASSWORD after ENCRYPTED, found {}", option);
1146                    }
1147                    (
1148                        &mut builder.password,
1149                        UserOption::EncryptedPassword(AstString::parse_to(parser)?),
1150                    )
1151                }
1152                "oauth" => {
1153                    let options = parser.parse_options()?;
1154                    (&mut builder.password, UserOption::OAuth(options))
1155                }
1156                _ => {
1157                    parser_err!("unexpected user option: {}", option);
1158                }
1159            };
1160            add_option(item_mut_ref, user_option)?;
1161        }
1162        Ok(builder.build())
1163    }
1164}
1165
1166impl fmt::Display for UserOptions {
1167    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1168        if !self.0.is_empty() {
1169            write!(f, "WITH {}", display_separated(self.0.as_slice(), " "))
1170        } else {
1171            Ok(())
1172        }
1173    }
1174}
1175
1176impl ParseTo for CreateUserStatement {
1177    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1178        impl_parse_to!(user_name: ObjectName, p);
1179        impl_parse_to!(with_options: UserOptions, p);
1180
1181        Ok(CreateUserStatement {
1182            user_name,
1183            with_options,
1184        })
1185    }
1186}
1187
1188impl fmt::Display for CreateUserStatement {
1189    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1190        let mut v: Vec<String> = vec![];
1191        impl_fmt_display!(user_name, v, self);
1192        impl_fmt_display!(with_options, v, self);
1193        v.iter().join(" ").fmt(f)
1194    }
1195}
1196
1197impl fmt::Display for AlterUserMode {
1198    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1199        match self {
1200            AlterUserMode::Options(options) => {
1201                write!(f, "{}", options)
1202            }
1203            AlterUserMode::Rename(new_name) => {
1204                write!(f, "RENAME TO {}", new_name)
1205            }
1206        }
1207    }
1208}
1209
1210impl fmt::Display for AlterUserStatement {
1211    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1212        let mut v: Vec<String> = vec![];
1213        impl_fmt_display!(user_name, v, self);
1214        impl_fmt_display!(mode, v, self);
1215        v.iter().join(" ").fmt(f)
1216    }
1217}
1218
1219impl ParseTo for AlterUserStatement {
1220    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1221        impl_parse_to!(user_name: ObjectName, p);
1222        impl_parse_to!(mode: AlterUserMode, p);
1223
1224        Ok(AlterUserStatement { user_name, mode })
1225    }
1226}
1227
1228impl ParseTo for AlterUserMode {
1229    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1230        if p.parse_keyword(Keyword::RENAME) {
1231            p.expect_keyword(Keyword::TO)?;
1232            impl_parse_to!(new_name: ObjectName, p);
1233            Ok(AlterUserMode::Rename(new_name))
1234        } else {
1235            impl_parse_to!(with_options: UserOptions, p);
1236            Ok(AlterUserMode::Options(with_options))
1237        }
1238    }
1239}
1240
1241#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1242pub struct DropStatement {
1243    /// The type of the object to drop: TABLE, VIEW, etc.
1244    pub object_type: ObjectType,
1245    /// An optional `IF EXISTS` clause. (Non-standard.)
1246    pub if_exists: bool,
1247    /// Object to drop.
1248    pub object_name: ObjectName,
1249    /// Whether `CASCADE` was specified. This will be `false` when
1250    /// `RESTRICT` or no drop behavior at all was specified.
1251    pub drop_mode: AstOption<DropMode>,
1252}
1253
1254// sql_grammar!(DropStatement {
1255//     object_type: ObjectType,
1256//     if_exists => [Keyword::IF, Keyword::EXISTS],
1257//     name: ObjectName,
1258//     drop_mode: AstOption<DropMode>,
1259// });
1260impl ParseTo for DropStatement {
1261    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1262        impl_parse_to!(object_type: ObjectType, p);
1263        impl_parse_to!(if_exists => [Keyword::IF, Keyword::EXISTS], p);
1264        let object_name = p.parse_object_name()?;
1265        impl_parse_to!(drop_mode: AstOption<DropMode>, p);
1266        Ok(Self {
1267            object_type,
1268            if_exists,
1269            object_name,
1270            drop_mode,
1271        })
1272    }
1273}
1274
1275impl fmt::Display for DropStatement {
1276    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1277        let mut v: Vec<String> = vec![];
1278        impl_fmt_display!(object_type, v, self);
1279        impl_fmt_display!(if_exists => [Keyword::IF, Keyword::EXISTS], v, self);
1280        impl_fmt_display!(object_name, v, self);
1281        impl_fmt_display!(drop_mode, v, self);
1282        v.iter().join(" ").fmt(f)
1283    }
1284}
1285
1286#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1287pub enum DropMode {
1288    Cascade,
1289    Restrict,
1290}
1291
1292impl ParseTo for DropMode {
1293    fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
1294        let drop_mode = if parser.parse_keyword(Keyword::CASCADE) {
1295            DropMode::Cascade
1296        } else if parser.parse_keyword(Keyword::RESTRICT) {
1297            DropMode::Restrict
1298        } else {
1299            return parser.expected("CASCADE | RESTRICT");
1300        };
1301        Ok(drop_mode)
1302    }
1303}
1304
1305impl fmt::Display for DropMode {
1306    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1307        f.write_str(match self {
1308            DropMode::Cascade => "CASCADE",
1309            DropMode::Restrict => "RESTRICT",
1310        })
1311    }
1312}