risingwave_sqlparser/ast/
statement.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt;
16use std::fmt::{Formatter, Write};
17
18use itertools::Itertools;
19use winnow::ModalResult;
20
21use super::ddl::SourceWatermark;
22use super::legacy_source::{CompatibleFormatEncode, parse_format_encode};
23use super::{EmitMode, Ident, ObjectType, Query, Value};
24use crate::ast::{
25    ColumnDef, ObjectName, SqlOption, TableConstraint, display_comma_separated, display_separated,
26};
27use crate::keywords::Keyword;
28use crate::parser::{IncludeOption, IsOptional, Parser};
29use crate::parser_err;
30use crate::parser_v2::literal_u32;
31use crate::tokenizer::Token;
32
/// Consumes tokens from the parser and turns them into an AST node.
///
/// Implementors read the tokens they need from `parser` and return the node
/// they built, or an error when the upcoming tokens do not form that node.
pub trait ParseTo: Sized {
    /// Parses a `Self` from the tokens available in `parser`.
    fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self>;
}
37
/// Expands to one parsing step of a statement grammar.
///
/// Supported forms:
/// - `field: Type, parser` binds `field` to `<Type>::parse_to(parser)?`.
/// - `field => [keywords..], parser` binds `field` to the value returned by
///   `parser.parse_keywords(&[keywords..])`.
/// - `[keywords..], parser` calls `parser.expect_keywords(&[keywords..])?`
///   and binds nothing.
///
/// The forms that use `?` must be expanded inside a function whose return
/// type accepts the propagated error.
#[macro_export]
macro_rules! impl_parse_to {
    // Nothing to parse.
    () => {};
    // Typed field: delegate to the type's `parse_to`.
    ($field:ident : $field_type:ty, $parser:ident) => {
        let $field = <$field_type>::parse_to($parser)?;
    };
    // Keyword-sequence field: store what `parse_keywords` returns.
    ($field:ident => [$($arr:tt)+], $parser:ident) => {
        let $field = $parser.parse_keywords(&[$($arr)+]);
    };
    // Keyword sequence that must be present.
    ([$($arr:tt)+], $parser:ident) => {
        $parser.expect_keywords(&[$($arr)+])?;
    };
}
51
/// Expands to one formatting step that appends a rendered fragment to a
/// vector of strings.
///
/// Supported forms:
/// - `field, v, self` formats `self.field` with `{}` and pushes the result
///   onto `v` unless it is the empty string.
/// - `field => [items..], v, self` pushes the space-separated rendering of
///   `items` onto `v` when `self.field` is `true`.
/// - `[items..], v` unconditionally pushes the space-separated rendering of
///   `items` onto `v`.
#[macro_export]
macro_rules! impl_fmt_display {
    // Nothing to render.
    () => {};
    // Field rendered through its `Display` impl; empty output is skipped.
    ($field:ident, $v:ident, $self:ident) => {{
        let s = format!("{}", $self.$field);
        if !s.is_empty() {
            $v.push(s);
        }
    }};
    // Items rendered only when the boolean field is set.
    ($field:ident => [$($arr:tt)+], $v:ident, $self:ident) => {
        if $self.$field {
            $v.push(format!("{}", display_separated(&[$($arr)+], " ")));
        }
    };
    // Items that are always rendered.
    ([$($arr:tt)+], $v:ident) => {
        $v.push(format!("{}", display_separated(&[$($arr)+], " ")));
    };
}
70
// sql_grammar!(CreateSourceStatement {
//     if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS],
//     source_name: Ident,
//     with_properties: AstOption<WithProperties>,
//     [Keyword::ROW, Keyword::FORMAT],
//     format_encode: SourceSchema,
//     [Keyword::WATERMARK, Keyword::FOR] column [Keyword::AS] <expr>
// });
/// AST node holding the pieces of a `CREATE SOURCE` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CreateSourceStatement {
    // NOTE(review): this flag is not rendered by the `Display` impl in this
    // file — confirm where it is emitted.
    pub temporary: bool,
    // Rendered as `IF NOT EXISTS` when set.
    pub if_not_exists: bool,
    // Column definitions rendered inside the parenthesized item list.
    pub columns: Vec<ColumnDef>,
    // The wildcard position in the columns defined in SQL. Only exists when using an external schema.
    pub wildcard_idx: Option<usize>,
    // Table constraints rendered after the columns in the item list.
    pub constraints: Vec<TableConstraint>,
    pub source_name: ObjectName,
    // The `WITH (...)` options; `Display` looks up the `connector` option here.
    pub with_properties: WithProperties,
    // The format/encode clause; `Display` skips it when the `connector`
    // option value contains "cdc".
    pub format_encode: CompatibleFormatEncode,
    // Watermark definitions rendered last in the item list.
    pub source_watermarks: Vec<SourceWatermark>,
    // Extra column options, each rendered as its own clause after the item list.
    pub include_column_options: IncludeOption,
}
93
/// FORMAT means how to get the operation (Insert/Delete) from the input.
///
/// Check `CONNECTORS_COMPATIBLE_FORMATS` for what `FORMAT ... ENCODE ...` combinations are allowed.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Format {
    /// The format is the same as RisingWave's internal representation.
    /// Used internally for schema change.
    Native,
    /// For self-explanatory sources like iceberg: they have their own format,
    /// which should not be specified by the user.
    None,
    // Keyword::DEBEZIUM
    Debezium,
    // Keyword::DEBEZIUM_MONGO
    DebeziumMongo,
    // Keyword::MAXWELL
    Maxwell,
    // Keyword::CANAL
    Canal,
    // Keyword::UPSERT
    Upsert,
    // Keyword::PLAIN
    Plain,
}
117
118// TODO: unify with `from_keyword`
119impl fmt::Display for Format {
120    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121        write!(
122            f,
123            "{}",
124            match self {
125                Format::Native => "NATIVE",
126                Format::Debezium => "DEBEZIUM",
127                Format::DebeziumMongo => "DEBEZIUM_MONGO",
128                Format::Maxwell => "MAXWELL",
129                Format::Canal => "CANAL",
130                Format::Upsert => "UPSERT",
131                Format::Plain => "PLAIN",
132                Format::None => "NONE",
133            }
134        )
135    }
136}
137
138impl Format {
139    pub fn from_keyword(s: &str) -> ModalResult<Self> {
140        Ok(match s {
141            "DEBEZIUM" => Format::Debezium,
142            "DEBEZIUM_MONGO" => Format::DebeziumMongo,
143            "MAXWELL" => Format::Maxwell,
144            "CANAL" => Format::Canal,
145            "PLAIN" => Format::Plain,
146            "UPSERT" => Format::Upsert,
147            "NATIVE" => Format::Native,
148            "NONE" => Format::None,
149            _ => parser_err!(
150                "expected CANAL | PROTOBUF | DEBEZIUM | MAXWELL | PLAIN | NATIVE | NONE after FORMAT"
151            ),
152        })
153    }
154}
155
/// The encoding named after `ENCODE` in a `FORMAT ... ENCODE ...` clause.
///
/// Check `CONNECTORS_COMPATIBLE_FORMATS` for what `FORMAT ... ENCODE ...` combinations are allowed.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Encode {
    Avro,     // Keyword::Avro
    Csv,      // Keyword::CSV
    Protobuf, // Keyword::PROTOBUF
    Json,     // Keyword::JSON
    Bytes,    // Keyword::BYTES
    /// For self-explanatory sources like iceberg: they have their own format,
    /// which should not be specified by the user.
    None,
    Text, // Keyword::TEXT
    /// The encode is the same as RisingWave's internal representation.
    /// Used internally for schema change.
    Native,
    Template,
    Parquet,
}
173
174// TODO: unify with `from_keyword`
175impl fmt::Display for Encode {
176    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
177        write!(
178            f,
179            "{}",
180            match self {
181                Encode::Avro => "AVRO",
182                Encode::Csv => "CSV",
183                Encode::Protobuf => "PROTOBUF",
184                Encode::Json => "JSON",
185                Encode::Bytes => "BYTES",
186                Encode::Native => "NATIVE",
187                Encode::Template => "TEMPLATE",
188                Encode::None => "NONE",
189                Encode::Parquet => "PARQUET",
190                Encode::Text => "TEXT",
191            }
192        )
193    }
194}
195
196impl Encode {
197    pub fn from_keyword(s: &str) -> ModalResult<Self> {
198        Ok(match s {
199            "AVRO" => Encode::Avro,
200            "TEXT" => Encode::Text,
201            "BYTES" => Encode::Bytes,
202            "CSV" => Encode::Csv,
203            "PROTOBUF" => Encode::Protobuf,
204            "JSON" => Encode::Json,
205            "TEMPLATE" => Encode::Template,
206            "PARQUET" => Encode::Parquet,
207            "NATIVE" => Encode::Native,
208            "NONE" => Encode::None,
209            _ => parser_err!(
210                "expected AVRO | BYTES | CSV | PROTOBUF | JSON | NATIVE | TEMPLATE | PARQUET | NONE after Encode"
211            ),
212        })
213    }
214}
215
/// `FORMAT ... ENCODE ... [(a=b, ...)] [KEY ENCODE ...]`
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FormatEncodeOptions {
    /// The keyword following `FORMAT`.
    pub format: Format,
    /// The keyword following `ENCODE`.
    pub row_encode: Encode,
    /// The options following the encode; rendered in parentheses when non-empty.
    pub row_options: Vec<SqlOption>,

    /// The encoding following `KEY ENCODE`, when that clause is present.
    pub key_encode: Option<Encode>,
}
225
226impl Parser<'_> {
227    /// Peek the next tokens to see if it is `FORMAT` or `ROW FORMAT` (for compatibility).
228    fn peek_format_encode_format(&mut self) -> bool {
229        (self.peek_nth_any_of_keywords(0, &[Keyword::ROW])
230            && self.peek_nth_any_of_keywords(1, &[Keyword::FORMAT])) // ROW FORMAT
231            || self.peek_nth_any_of_keywords(0, &[Keyword::FORMAT]) // FORMAT
232    }
233
234    /// Parse the source schema. The behavior depends on the `connector` type.
235    pub fn parse_format_encode_with_connector(
236        &mut self,
237        connector: &str,
238        cdc_source_job: bool,
239    ) -> ModalResult<CompatibleFormatEncode> {
240        // row format for cdc source must be debezium json
241        // row format for nexmark source must be native
242        // default row format for datagen source is native
243        // FIXME: parse input `connector` to enum type instead using string here
244        if connector.contains("-cdc") {
245            let expected = if cdc_source_job {
246                FormatEncodeOptions::plain_json()
247            } else if connector.contains("mongodb") {
248                FormatEncodeOptions::debezium_mongo_json()
249            } else {
250                FormatEncodeOptions::debezium_json()
251            };
252
253            if self.peek_format_encode_format() {
254                let schema = parse_format_encode(self)?.into_v2();
255                if schema != expected {
256                    parser_err!(
257                        "Row format for CDC connectors should be \
258                         either omitted or set to `{expected}`",
259                    );
260                }
261            }
262            Ok(expected.into())
263        } else if connector.contains("nexmark") {
264            let expected = FormatEncodeOptions::native();
265            if self.peek_format_encode_format() {
266                let schema = parse_format_encode(self)?.into_v2();
267                if schema != expected {
268                    parser_err!(
269                        "Row format for nexmark connectors should be \
270                         either omitted or set to `{expected}`",
271                    );
272                }
273            }
274            Ok(expected.into())
275        } else if connector.contains("datagen") {
276            Ok(if self.peek_format_encode_format() {
277                parse_format_encode(self)?
278            } else {
279                FormatEncodeOptions::native().into()
280            })
281        } else if connector.contains("iceberg") {
282            let expected = FormatEncodeOptions::none();
283            if self.peek_format_encode_format() {
284                let schema = parse_format_encode(self)?.into_v2();
285                if schema != expected {
286                    parser_err!(
287                        "Row format for iceberg connectors should be \
288                         either omitted or set to `{expected}`",
289                    );
290                }
291            }
292            Ok(expected.into())
293        } else if connector.contains("webhook") {
294            parser_err!(
295                "Source with webhook connector is not supported. \
296                 Please use the `CREATE TABLE ... WITH ...` statement instead.",
297            );
298        } else {
299            Ok(parse_format_encode(self)?)
300        }
301    }
302
303    /// Parse `FORMAT ... ENCODE ... (...)`.
304    pub fn parse_schema(&mut self) -> ModalResult<Option<FormatEncodeOptions>> {
305        if !self.parse_keyword(Keyword::FORMAT) {
306            return Ok(None);
307        }
308
309        let id = self.parse_identifier()?;
310        let s = id.value.to_ascii_uppercase();
311        let format = Format::from_keyword(&s)?;
312        self.expect_keyword(Keyword::ENCODE)?;
313        let id = self.parse_identifier()?;
314        let s = id.value.to_ascii_uppercase();
315        let row_encode = Encode::from_keyword(&s)?;
316        let row_options = self.parse_options()?;
317
318        let key_encode = if self.parse_keywords(&[Keyword::KEY, Keyword::ENCODE]) {
319            Some(Encode::from_keyword(
320                self.parse_identifier()?.value.to_ascii_uppercase().as_str(),
321            )?)
322        } else {
323            None
324        };
325
326        Ok(Some(FormatEncodeOptions {
327            format,
328            row_encode,
329            row_options,
330            key_encode,
331        }))
332    }
333}
334
335impl FormatEncodeOptions {
336    pub const fn plain_json() -> Self {
337        FormatEncodeOptions {
338            format: Format::Plain,
339            row_encode: Encode::Json,
340            row_options: Vec::new(),
341            key_encode: None,
342        }
343    }
344
345    /// Create a new source schema with `Debezium` format and `Json` encoding.
346    pub const fn debezium_json() -> Self {
347        FormatEncodeOptions {
348            format: Format::Debezium,
349            row_encode: Encode::Json,
350            row_options: Vec::new(),
351            key_encode: None,
352        }
353    }
354
355    pub const fn debezium_mongo_json() -> Self {
356        FormatEncodeOptions {
357            format: Format::DebeziumMongo,
358            row_encode: Encode::Json,
359            row_options: Vec::new(),
360            key_encode: None,
361        }
362    }
363
364    /// Create a new source schema with `Native` format and encoding.
365    pub const fn native() -> Self {
366        FormatEncodeOptions {
367            format: Format::Native,
368            row_encode: Encode::Native,
369            row_options: Vec::new(),
370            key_encode: None,
371        }
372    }
373
374    /// Create a new source schema with `None` format and encoding.
375    /// Used for self-explanatory source like iceberg.
376    pub const fn none() -> Self {
377        FormatEncodeOptions {
378            format: Format::None,
379            row_encode: Encode::None,
380            row_options: Vec::new(),
381            key_encode: None,
382        }
383    }
384
385    pub fn row_options(&self) -> &[SqlOption] {
386        self.row_options.as_ref()
387    }
388}
389
390impl fmt::Display for FormatEncodeOptions {
391    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
392        write!(f, "FORMAT {} ENCODE {}", self.format, self.row_encode)?;
393
394        if !self.row_options().is_empty() {
395            write!(f, " ({})", display_comma_separated(self.row_options()))?;
396        }
397
398        if let Some(key_encode) = &self.key_encode {
399            write!(f, " KEY ENCODE {}", key_encode)?;
400        }
401
402        Ok(())
403    }
404}
405
406pub(super) fn fmt_create_items(
407    columns: &[ColumnDef],
408    constraints: &[TableConstraint],
409    watermarks: &[SourceWatermark],
410    wildcard_idx: Option<usize>,
411) -> std::result::Result<String, fmt::Error> {
412    let mut items = String::new();
413    let has_items = !columns.is_empty()
414        || !constraints.is_empty()
415        || !watermarks.is_empty()
416        || wildcard_idx.is_some();
417    has_items.then(|| write!(&mut items, "("));
418
419    if let Some(wildcard_idx) = wildcard_idx {
420        let (columns_l, columns_r) = columns.split_at(wildcard_idx);
421        write!(&mut items, "{}", display_comma_separated(columns_l))?;
422        if !columns_l.is_empty() {
423            write!(&mut items, ", ")?;
424        }
425        write!(&mut items, "{}", Token::Mul)?;
426        if !columns_r.is_empty() {
427            write!(&mut items, ", ")?;
428        }
429        write!(&mut items, "{}", display_comma_separated(columns_r))?;
430    } else {
431        write!(&mut items, "{}", display_comma_separated(columns))?;
432    }
433    let mut leading_items = !columns.is_empty() || wildcard_idx.is_some();
434
435    if leading_items && !constraints.is_empty() {
436        write!(&mut items, ", ")?;
437    }
438    write!(&mut items, "{}", display_comma_separated(constraints))?;
439    leading_items |= !constraints.is_empty();
440
441    if leading_items && !watermarks.is_empty() {
442        write!(&mut items, ", ")?;
443    }
444    write!(&mut items, "{}", display_comma_separated(watermarks))?;
445    // uncomment this when adding more sections below
446    // leading_items |= !watermarks.is_empty();
447
448    has_items.then(|| write!(&mut items, ")"));
449    Ok(items)
450}
451
452impl fmt::Display for CreateSourceStatement {
453    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
454        let mut v: Vec<String> = vec![];
455        impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
456        impl_fmt_display!(source_name, v, self);
457
458        let items = fmt_create_items(
459            &self.columns,
460            &self.constraints,
461            &self.source_watermarks,
462            self.wildcard_idx,
463        )?;
464        if !items.is_empty() {
465            v.push(items);
466        }
467
468        for item in &self.include_column_options {
469            v.push(format!("{}", item));
470        }
471
472        // skip format_encode for cdc source
473        let is_cdc_source = self.with_properties.0.iter().any(|option| {
474            option.name.real_value().eq_ignore_ascii_case("connector")
475                && option.value.to_string().contains("cdc")
476        });
477
478        impl_fmt_display!(with_properties, v, self);
479        if !is_cdc_source {
480            impl_fmt_display!(format_encode, v, self);
481        }
482        v.iter().join(" ").fmt(f)
483    }
484}
485
/// Where a sink takes its data from.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CreateSink {
    /// `FROM <name>`: an existing object referenced by name.
    From(ObjectName),
    /// `AS <query>`: an inline query.
    AsQuery(Box<Query>),
}
491
492impl fmt::Display for CreateSink {
493    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
494        match self {
495            Self::From(mv) => write!(f, "FROM {}", mv),
496            Self::AsQuery(query) => write!(f, "AS {}", query),
497        }
498    }
499}
// sql_grammar!(CreateSinkStatement {
//     if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS],
//     sink_name: Ident,
//     [Keyword::FROM],
//     materialized_view: Ident,
//     with_properties: AstOption<WithProperties>,
// });
/// AST node holding the pieces of a `CREATE SINK` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CreateSinkStatement {
    // Set when `IF NOT EXISTS` was parsed.
    pub if_not_exists: bool,
    pub sink_name: ObjectName,
    // The `WITH (...)` options. The parser requires them to be non-empty
    // unless the sink is created `INTO` a table.
    pub with_properties: WithProperties,
    // The `FROM <name>` or `AS <query>` part.
    pub sink_from: CreateSink,

    // only used when creating sink into a table
    // insert to specific columns of the target table
    pub columns: Vec<Ident>,
    // Rendered as `EMIT <mode>` when present.
    pub emit_mode: Option<EmitMode>,
    // The optional `FORMAT ... ENCODE ...` clause that follows the properties.
    pub sink_schema: Option<FormatEncodeOptions>,
    // The table named after `INTO`, if any.
    pub into_table_name: Option<ObjectName>,
}
521
522impl ParseTo for CreateSinkStatement {
523    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
524        impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
525        impl_parse_to!(sink_name: ObjectName, p);
526
527        let mut target_spec_columns = Vec::new();
528        let into_table_name = if p.parse_keyword(Keyword::INTO) {
529            impl_parse_to!(into_table_name: ObjectName, p);
530
531            // we only allow specify columns when creating sink into a table
532            target_spec_columns = p.parse_parenthesized_column_list(IsOptional::Optional)?;
533            Some(into_table_name)
534        } else {
535            None
536        };
537
538        let sink_from = if p.parse_keyword(Keyword::FROM) {
539            impl_parse_to!(from_name: ObjectName, p);
540            CreateSink::From(from_name)
541        } else if p.parse_keyword(Keyword::AS) {
542            let query = Box::new(p.parse_query()?);
543            CreateSink::AsQuery(query)
544        } else {
545            p.expected("FROM or AS after CREATE SINK sink_name")?
546        };
547
548        let emit_mode: Option<EmitMode> = p.parse_emit_mode()?;
549
550        // This check cannot be put into the `WithProperties::parse_to`, since other
551        // statements may not need the with properties.
552        if !p.peek_nth_any_of_keywords(0, &[Keyword::WITH]) && into_table_name.is_none() {
553            p.expected("WITH")?
554        }
555        impl_parse_to!(with_properties: WithProperties, p);
556
557        if with_properties.0.is_empty() && into_table_name.is_none() {
558            parser_err!("sink properties not provided");
559        }
560
561        let sink_schema = p.parse_schema()?;
562
563        Ok(Self {
564            if_not_exists,
565            sink_name,
566            with_properties,
567            sink_from,
568            columns: target_spec_columns,
569            emit_mode,
570            sink_schema,
571            into_table_name,
572        })
573    }
574}
575
576impl fmt::Display for CreateSinkStatement {
577    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
578        let mut v: Vec<String> = vec![];
579        impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
580        impl_fmt_display!(sink_name, v, self);
581        if let Some(into_table) = &self.into_table_name {
582            impl_fmt_display!([Keyword::INTO], v);
583            impl_fmt_display!([into_table], v);
584            if !self.columns.is_empty() {
585                v.push(format!("({})", display_comma_separated(&self.columns)));
586            }
587        }
588        impl_fmt_display!(sink_from, v, self);
589        if let Some(ref emit_mode) = self.emit_mode {
590            v.push(format!("EMIT {}", emit_mode));
591        }
592        impl_fmt_display!(with_properties, v, self);
593        if let Some(schema) = &self.sink_schema {
594            v.push(format!("{}", schema));
595        }
596        v.iter().join(" ").fmt(f)
597    }
598}
599
// sql_grammar!(CreateSubscriptionStatement {
//     if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS],
//     subscription_name: Ident,
//     [Keyword::FROM],
//     materialized_view: Ident,
//     with_properties: AstOption<WithProperties>,
// });
/// AST node holding the pieces of a `CREATE SUBSCRIPTION` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CreateSubscriptionStatement {
    // Set when `IF NOT EXISTS` was parsed.
    pub if_not_exists: bool,
    pub subscription_name: ObjectName,
    // The `WITH (...)` options; the parser rejects an empty list.
    pub with_properties: WithProperties,
    // The object named after `FROM`.
    pub subscription_from: ObjectName,
    // pub emit_mode: Option<EmitMode>,
}
615
616impl ParseTo for CreateSubscriptionStatement {
617    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
618        impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
619        impl_parse_to!(subscription_name: ObjectName, p);
620
621        let subscription_from = if p.parse_keyword(Keyword::FROM) {
622            impl_parse_to!(from_name: ObjectName, p);
623            from_name
624        } else {
625            p.expected("FROM after CREATE SUBSCRIPTION subscription_name")?
626        };
627
628        // let emit_mode = p.parse_emit_mode()?;
629
630        // This check cannot be put into the `WithProperties::parse_to`, since other
631        // statements may not need the with properties.
632        if !p.peek_nth_any_of_keywords(0, &[Keyword::WITH]) {
633            p.expected("WITH")?
634        }
635        impl_parse_to!(with_properties: WithProperties, p);
636
637        if with_properties.0.is_empty() {
638            parser_err!("subscription properties not provided");
639        }
640
641        Ok(Self {
642            if_not_exists,
643            subscription_name,
644            with_properties,
645            subscription_from,
646        })
647    }
648}
649
650impl fmt::Display for CreateSubscriptionStatement {
651    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
652        let mut v: Vec<String> = vec![];
653        impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
654        impl_fmt_display!(subscription_name, v, self);
655        v.push(format!("FROM {}", self.subscription_from));
656        impl_fmt_display!(with_properties, v, self);
657        v.iter().join(" ").fmt(f)
658    }
659}
660
/// What a declared cursor iterates over.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum DeclareCursor {
    /// A cursor over the result of a query.
    Query(Box<Query>),
    /// A cursor over the named subscription, together with its `Since` clause.
    Subscription(ObjectName, Since),
}
666
667impl fmt::Display for DeclareCursor {
668    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
669        let mut v: Vec<String> = vec![];
670        match self {
671            DeclareCursor::Query(query) => v.push(format!("{}", query.as_ref())),
672            DeclareCursor::Subscription(name, since) => {
673                v.push(format!("{}", name));
674                v.push(format!("{:?}", since));
675            }
676        }
677        v.iter().join(" ").fmt(f)
678    }
679}
680
// sql_grammar!(DeclareCursorStatement {
//     cursor_name: Ident,
//     [Keyword::SUBSCRIPTION]
//     [Keyword::CURSOR],
//     [Keyword::FOR],
//     subscription: Ident or query: Query,
//     [Keyword::SINCE],
//     rw_timestamp: Ident,
// });
/// AST node holding the pieces of a `DECLARE ... CURSOR` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DeclareCursorStatement {
    // Name of the cursor being declared.
    pub cursor_name: Ident,
    // The query or subscription the cursor is declared for.
    pub declare_cursor: DeclareCursor,
}
695
696impl ParseTo for DeclareCursorStatement {
697    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
698        let cursor_name = p.parse_identifier_non_reserved()?;
699
700        let declare_cursor = if !p.parse_keyword(Keyword::SUBSCRIPTION) {
701            p.expect_keyword(Keyword::CURSOR)?;
702            p.expect_keyword(Keyword::FOR)?;
703            DeclareCursor::Query(Box::new(p.parse_query()?))
704        } else {
705            p.expect_keyword(Keyword::CURSOR)?;
706            p.expect_keyword(Keyword::FOR)?;
707            let cursor_for_name = p.parse_object_name()?;
708            let rw_timestamp = p.parse_since()?;
709            DeclareCursor::Subscription(cursor_for_name, rw_timestamp)
710        };
711
712        Ok(Self {
713            cursor_name,
714            declare_cursor,
715        })
716    }
717}
718
719impl fmt::Display for DeclareCursorStatement {
720    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
721        let mut v: Vec<String> = vec![];
722        impl_fmt_display!(cursor_name, v, self);
723        match &self.declare_cursor {
724            DeclareCursor::Query(_) => {
725                v.push("CURSOR FOR ".to_owned());
726            }
727            DeclareCursor::Subscription { .. } => {
728                v.push("SUBSCRIPTION CURSOR FOR ".to_owned());
729            }
730        }
731        impl_fmt_display!(declare_cursor, v, self);
732        v.iter().join(" ").fmt(f)
733    }
734}
735
// sql_grammar!(FetchCursorStatement {
//     cursor_name: Ident,
// });
/// AST node holding the pieces of a `FETCH` statement on a cursor.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FetchCursorStatement {
    // The cursor named after `FROM`.
    pub cursor_name: Ident,
    // Number of rows to fetch; the parser maps the `NEXT` keyword to 1.
    pub count: u32,
    // The `WITH (...)` options parsed after the cursor name.
    pub with_properties: WithProperties,
}
745
746impl ParseTo for FetchCursorStatement {
747    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
748        let count = if p.parse_keyword(Keyword::NEXT) {
749            1
750        } else {
751            literal_u32(p)?
752        };
753        p.expect_keyword(Keyword::FROM)?;
754        let cursor_name = p.parse_identifier_non_reserved()?;
755        impl_parse_to!(with_properties: WithProperties, p);
756
757        Ok(Self {
758            cursor_name,
759            count,
760            with_properties,
761        })
762    }
763}
764
765impl fmt::Display for FetchCursorStatement {
766    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
767        let mut v: Vec<String> = vec![];
768        if self.count == 1 {
769            v.push("NEXT ".to_owned());
770        } else {
771            impl_fmt_display!(count, v, self);
772        }
773        v.push("FROM ".to_owned());
774        impl_fmt_display!(cursor_name, v, self);
775        v.iter().join(" ").fmt(f)
776    }
777}
778
// sql_grammar!(CloseCursorStatement {
//     cursor_name: Ident,
// });
/// AST node holding the target of a `CLOSE` statement on cursors.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CloseCursorStatement {
    // The cursor to close; `None` stands for the `ALL` keyword.
    pub cursor_name: Option<Ident>,
}
786
787impl ParseTo for CloseCursorStatement {
788    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
789        let cursor_name = if p.parse_keyword(Keyword::ALL) {
790            None
791        } else {
792            Some(p.parse_identifier_non_reserved()?)
793        };
794
795        Ok(Self { cursor_name })
796    }
797}
798
799impl fmt::Display for CloseCursorStatement {
800    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
801        let mut v: Vec<String> = vec![];
802        if let Some(cursor_name) = &self.cursor_name {
803            v.push(format!("{}", cursor_name));
804        } else {
805            v.push("ALL".to_owned());
806        }
807        v.iter().join(" ").fmt(f)
808    }
809}
810
// sql_grammar!(CreateConnectionStatement {
//     if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS],
//     connection_name: Ident,
//     with_properties: AstOption<WithProperties>,
// });
/// AST node holding the pieces of a `CREATE CONNECTION` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CreateConnectionStatement {
    // Set when `IF NOT EXISTS` was parsed.
    pub if_not_exists: bool,
    pub connection_name: ObjectName,
    // The `WITH (...)` options; the parser rejects an empty list.
    pub with_properties: WithProperties,
}
822
823impl ParseTo for CreateConnectionStatement {
824    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
825        impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
826        impl_parse_to!(connection_name: ObjectName, p);
827        impl_parse_to!(with_properties: WithProperties, p);
828        if with_properties.0.is_empty() {
829            parser_err!("connection properties not provided");
830        }
831
832        Ok(Self {
833            if_not_exists,
834            connection_name,
835            with_properties,
836        })
837    }
838}
839
840impl fmt::Display for CreateConnectionStatement {
841    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
842        let mut v: Vec<String> = vec![];
843        impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
844        impl_fmt_display!(connection_name, v, self);
845        impl_fmt_display!(with_properties, v, self);
846        v.iter().join(" ").fmt(f)
847    }
848}
849
/// AST node holding the pieces of a `CREATE SECRET` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CreateSecretStatement {
    // Set when `IF NOT EXISTS` was parsed.
    pub if_not_exists: bool,
    pub secret_name: ObjectName,
    // The value following `AS`; `Value::Null` when there is no `AS` clause.
    pub credential: Value,
    // The `WITH (...)` options parsed after the secret name.
    pub with_properties: WithProperties,
}
857
858impl ParseTo for CreateSecretStatement {
859    fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
860        impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], parser);
861        impl_parse_to!(secret_name: ObjectName, parser);
862        impl_parse_to!(with_properties: WithProperties, parser);
863        let mut credential = Value::Null;
864        if parser.parse_keyword(Keyword::AS) {
865            credential = parser.ensure_parse_value()?;
866        }
867        Ok(Self {
868            if_not_exists,
869            secret_name,
870            credential,
871            with_properties,
872        })
873    }
874}
875
876impl fmt::Display for CreateSecretStatement {
877    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
878        let mut v: Vec<String> = vec![];
879        impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
880        impl_fmt_display!(secret_name, v, self);
881        impl_fmt_display!(with_properties, v, self);
882        if self.credential != Value::Null {
883            v.push("AS".to_owned());
884            impl_fmt_display!(credential, v, self);
885        }
886        v.iter().join(" ").fmt(f)
887    }
888}
889
/// A list of options rendered as `WITH (<comma-separated options>)`; an
/// empty list renders as nothing.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct WithProperties(pub Vec<SqlOption>);
892
893impl ParseTo for WithProperties {
894    fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
895        Ok(Self(
896            parser.parse_options_with_preceding_keyword(Keyword::WITH)?,
897        ))
898    }
899}
900
901impl fmt::Display for WithProperties {
902    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
903        if !self.0.is_empty() {
904            write!(f, "WITH ({})", display_comma_separated(self.0.as_slice()))
905        } else {
906            Ok(())
907        }
908    }
909}
910
/// Starting point selector; each variant is displayed with a leading space.
911#[derive(Debug, Clone, PartialEq, Eq, Hash)]
912pub enum Since {
    /// Displayed as ` SINCE <number>`.
    /// NOTE(review): presumably a timestamp in milliseconds, per the name — confirm.
913    TimestampMsNum(u64),
    /// Displayed as ` SINCE PROCTIME()`.
914    ProcessTime,
    /// Displayed as ` SINCE BEGIN()`.
915    Begin,
    /// Displayed as ` FULL`.
916    Full,
917}
918
919impl fmt::Display for Since {
920    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
921        use Since::*;
922        match self {
923            TimestampMsNum(ts) => write!(f, " SINCE {}", ts),
924            ProcessTime => write!(f, " SINCE PROCTIME()"),
925            Begin => write!(f, " SINCE BEGIN()"),
926            Full => write!(f, " FULL"),
927        }
928    }
929}
930
/// The clause `ROW SCHEMA LOCATION <string literal>`.
931#[derive(Debug, Clone, PartialEq, Eq, Hash)]
932pub struct RowSchemaLocation {
    /// String literal that follows the `ROW SCHEMA LOCATION` keywords.
933    pub value: AstString,
934}
935
936impl ParseTo for RowSchemaLocation {
937    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
938        impl_parse_to!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], p);
939        impl_parse_to!(value: AstString, p);
940        Ok(Self { value })
941    }
942}
943
944impl fmt::Display for RowSchemaLocation {
945    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
946        let mut v = vec![];
947        impl_fmt_display!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], v);
948        impl_fmt_display!(value, v, self);
949        v.iter().join(" ").fmt(f)
950    }
951}
952
953/// String literal. Unlike a plain `String`, its `Display` output is wrapped in
954/// single quotes.
955#[derive(Debug, Clone, PartialEq, Eq, Hash)]
956pub struct AstString(pub String);
957
958impl ParseTo for AstString {
959    fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
960        Ok(Self(parser.parse_literal_string()?))
961    }
962}
963
964impl fmt::Display for AstString {
965    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
966        write!(f, "'{}'", self.0)
967    }
968}
969
970/// This enum is used to replace `Option` because `fmt::Display` can not be implemented for
971/// `Option<T>`.
///
/// `None` is displayed as the empty string; `Some(t)` is displayed as `t`.
972#[derive(Debug, Clone, PartialEq, Eq, Hash)]
973pub enum AstOption<T> {
974    /// No value
975    None,
976    /// Some value `T`
977    Some(T),
978}
979
980impl<T: ParseTo> ParseTo for AstOption<T> {
981    fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
982        match T::parse_to(parser) {
983            Ok(t) => Ok(AstOption::Some(t)),
984            Err(_) => Ok(AstOption::None),
985        }
986    }
987}
988
989impl<T: fmt::Display> fmt::Display for AstOption<T> {
990    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
991        match &self {
992            AstOption::Some(t) => t.fmt(f),
993            AstOption::None => Ok(()),
994        }
995    }
996}
997
998impl<T> From<AstOption<T>> for Option<T> {
999    fn from(val: AstOption<T>) -> Self {
1000        match val {
1001            AstOption::Some(t) => Some(t),
1002            AstOption::None => None,
1003        }
1004    }
1005}
1006
/// A user name followed by its (possibly empty) list of user options.
1007#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1008pub struct CreateUserStatement {
    /// Name of the user.
1009    pub user_name: ObjectName,
    /// Options given for the user; displayed as `WITH ...` when non-empty.
1010    pub with_options: UserOptions,
1011}
1012
/// A user name followed by the alteration to apply to that user.
1013#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1014pub struct AlterUserStatement {
    /// Name of the user being altered.
1015    pub user_name: ObjectName,
    /// What to change: either the user's options or its name.
1016    pub mode: AlterUserMode,
1017}
1018
/// The two forms an `AlterUserStatement` can take.
1019#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1020pub enum AlterUserMode {
    /// Change the user's options.
1021    Options(UserOptions),
    /// Rename the user; displayed as `RENAME TO <new name>`.
1022    Rename(ObjectName),
1023}
1024
/// A single user option. Most variants correspond to one keyword; the last three
/// carry a payload.
1025#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1026pub enum UserOption {
    /// `SUPERUSER`
1027    SuperUser,
    /// `NOSUPERUSER`
1028    NoSuperUser,
    /// `CREATEDB`
1029    CreateDB,
    /// `NOCREATEDB`
1030    NoCreateDB,
    /// `CREATEUSER`
1031    CreateUser,
    /// `NOCREATEUSER`
1032    NoCreateUser,
    /// `LOGIN`
1033    Login,
    /// `NOLOGIN`
1034    NoLogin,
    /// `ADMIN`
1035    Admin,
    /// `NOADMIN`
1036    NoAdmin,
    /// `ENCRYPTED PASSWORD <string literal>`
1037    EncryptedPassword(AstString),
    /// `PASSWORD <string literal>`, or `PASSWORD NULL` when the payload is `None`.
1038    Password(Option<AstString>),
    /// Options parsed after the `oauth` keyword.
1039    OAuth(Vec<SqlOption>),
1040}
1041
1042impl fmt::Display for UserOption {
1043    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1044        match self {
1045            UserOption::SuperUser => write!(f, "SUPERUSER"),
1046            UserOption::NoSuperUser => write!(f, "NOSUPERUSER"),
1047            UserOption::CreateDB => write!(f, "CREATEDB"),
1048            UserOption::NoCreateDB => write!(f, "NOCREATEDB"),
1049            UserOption::CreateUser => write!(f, "CREATEUSER"),
1050            UserOption::NoCreateUser => write!(f, "NOCREATEUSER"),
1051            UserOption::Login => write!(f, "LOGIN"),
1052            UserOption::NoLogin => write!(f, "NOLOGIN"),
1053            UserOption::Admin => write!(f, "ADMIN"),
1054            UserOption::NoAdmin => write!(f, "NOADMIN"),
1055            UserOption::EncryptedPassword(p) => write!(f, "ENCRYPTED PASSWORD {}", p),
1056            UserOption::Password(None) => write!(f, "PASSWORD NULL"),
1057            UserOption::Password(Some(p)) => write!(f, "PASSWORD {}", p),
1058            UserOption::OAuth(options) => {
1059                write!(f, "({})", display_comma_separated(options.as_slice()))
1060            }
1061        }
1062    }
1063}
1064
/// List of user options; displayed as `WITH <options separated by spaces>`, or as
/// nothing when the list is empty.
1065#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1066pub struct UserOptions(pub Vec<UserOption>);
1067
/// Accumulates user options while parsing, with one slot per option category so
/// that a category can be set at most once.
1068#[derive(Default)]
1069struct UserOptionsBuilder {
    /// `SuperUser` or `NoSuperUser`.
1070    super_user: Option<UserOption>,
    /// `CreateDB` or `NoCreateDB`.
1071    create_db: Option<UserOption>,
    /// `CreateUser` or `NoCreateUser`.
1072    create_user: Option<UserOption>,
    /// `Login` or `NoLogin`.
1073    login: Option<UserOption>,
    /// `Admin` or `NoAdmin`.
1074    admin: Option<UserOption>,
    /// `Password`, `EncryptedPassword` or `OAuth`; the parser stores all three here.
1075    password: Option<UserOption>,
1076}
1077
1078impl UserOptionsBuilder {
1079    fn build(self) -> UserOptions {
1080        let mut options = vec![];
1081        if let Some(option) = self.super_user {
1082            options.push(option);
1083        }
1084        if let Some(option) = self.create_db {
1085            options.push(option);
1086        }
1087        if let Some(option) = self.create_user {
1088            options.push(option);
1089        }
1090        if let Some(option) = self.login {
1091            options.push(option);
1092        }
1093        if let Some(option) = self.admin {
1094            options.push(option);
1095        }
1096        if let Some(option) = self.password {
1097            options.push(option);
1098        }
1099        UserOptions(options)
1100    }
1101}
1102
/// Parses a list of user options up to the end of input or the next semicolon.
///
/// A leading `WITH` keyword is accepted but not required. Every option is read as
/// an identifier and matched against the known option names; an unknown name, or
/// two options that land in the same builder slot, is reported as a parser error.
/// The result lists the options in the builder's fixed slot order rather than in
/// the order they were written.
1103impl ParseTo for UserOptions {
1104    fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
1105        let mut builder = UserOptionsBuilder::default();
        // Stores `user_option` in its slot. A slot that was already filled means the
        // same category was given twice (e.g. `LOGIN NOLOGIN`), which is an error.
1106        let add_option = |item: &mut Option<UserOption>, user_option| {
1107            let old_value = item.replace(user_option);
1108            if old_value.is_some() {
1109                parser_err!("conflicting or redundant options");
1110            }
1111            Ok(())
1112        };
        // `WITH` is optional, so the result of the keyword probe is ignored.
1113        let _ = parser.parse_keyword(Keyword::WITH);
1114        loop {
            // Stop at the end of the input or at the statement terminator.
1115            let token = parser.peek_token();
1116            if token == Token::EOF || token == Token::SemiColon {
1117                break;
1118            }
1119
1120            let option = parser.parse_identifier()?;
            // NOTE(review): the lowercase match arms below suggest `real_value()`
            // yields a case-normalized name for unquoted identifiers — confirm.
1121            let s = option.real_value();
            // Pick the builder slot and the option value for this name.
1122            let (item_mut_ref, user_option) = match &s[..] {
1123                "superuser" => (&mut builder.super_user, UserOption::SuperUser),
1124                "nosuperuser" => (&mut builder.super_user, UserOption::NoSuperUser),
1125                "createdb" => (&mut builder.create_db, UserOption::CreateDB),
1126                "nocreatedb" => (&mut builder.create_db, UserOption::NoCreateDB),
1127                "createuser" => (&mut builder.create_user, UserOption::CreateUser),
1128                "nocreateuser" => (&mut builder.create_user, UserOption::NoCreateUser),
1129                "login" => (&mut builder.login, UserOption::Login),
1130                "nologin" => (&mut builder.login, UserOption::NoLogin),
1131                "admin" => (&mut builder.admin, UserOption::Admin),
1132                "noadmin" => (&mut builder.admin, UserOption::NoAdmin),
                // `PASSWORD NULL` or `PASSWORD <string literal>`.
1133                "password" => {
1134                    if parser.parse_keyword(Keyword::NULL) {
1135                        (&mut builder.password, UserOption::Password(None))
1136                    } else {
1137                        (
1138                            &mut builder.password,
1139                            UserOption::Password(Some(AstString::parse_to(parser)?)),
1140                        )
1141                    }
1142                }
                // `ENCRYPTED` must be followed by `PASSWORD <string literal>`.
1143                "encrypted" => {
1144                    let option = parser.parse_identifier()?;
1145                    let s = option.real_value();
1146                    if s != "password" {
1147                        parser_err!("expected PASSWORD after ENCRYPTED, found {}", option);
1148                    }
1149                    (
1150                        &mut builder.password,
1151                        UserOption::EncryptedPassword(AstString::parse_to(parser)?),
1152                    )
1153                }
                // OAuth options share the password slot, so they conflict with any
                // `PASSWORD` / `ENCRYPTED PASSWORD` option in the same statement.
1154                "oauth" => {
1155                    let options = parser.parse_options()?;
1156                    (&mut builder.password, UserOption::OAuth(options))
1157                }
1158                _ => {
1159                    parser_err!("unexpected user option: {}", option);
1160                }
1161            };
1162            add_option(item_mut_ref, user_option)?;
1163        }
1164        Ok(builder.build())
1165    }
1166}
1167
1168impl fmt::Display for UserOptions {
1169    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1170        if !self.0.is_empty() {
1171            write!(f, "WITH {}", display_separated(self.0.as_slice(), " "))
1172        } else {
1173            Ok(())
1174        }
1175    }
1176}
1177
1178impl ParseTo for CreateUserStatement {
1179    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1180        impl_parse_to!(user_name: ObjectName, p);
1181        impl_parse_to!(with_options: UserOptions, p);
1182
1183        Ok(CreateUserStatement {
1184            user_name,
1185            with_options,
1186        })
1187    }
1188}
1189
1190impl fmt::Display for CreateUserStatement {
1191    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1192        let mut v: Vec<String> = vec![];
1193        impl_fmt_display!(user_name, v, self);
1194        impl_fmt_display!(with_options, v, self);
1195        v.iter().join(" ").fmt(f)
1196    }
1197}
1198
1199impl fmt::Display for AlterUserMode {
1200    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1201        match self {
1202            AlterUserMode::Options(options) => {
1203                write!(f, "{}", options)
1204            }
1205            AlterUserMode::Rename(new_name) => {
1206                write!(f, "RENAME TO {}", new_name)
1207            }
1208        }
1209    }
1210}
1211
1212impl fmt::Display for AlterUserStatement {
1213    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1214        let mut v: Vec<String> = vec![];
1215        impl_fmt_display!(user_name, v, self);
1216        impl_fmt_display!(mode, v, self);
1217        v.iter().join(" ").fmt(f)
1218    }
1219}
1220
1221impl ParseTo for AlterUserStatement {
1222    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1223        impl_parse_to!(user_name: ObjectName, p);
1224        impl_parse_to!(mode: AlterUserMode, p);
1225
1226        Ok(AlterUserStatement { user_name, mode })
1227    }
1228}
1229
1230impl ParseTo for AlterUserMode {
1231    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1232        if p.parse_keyword(Keyword::RENAME) {
1233            p.expect_keyword(Keyword::TO)?;
1234            impl_parse_to!(new_name: ObjectName, p);
1235            Ok(AlterUserMode::Rename(new_name))
1236        } else {
1237            impl_parse_to!(with_options: UserOptions, p);
1238            Ok(AlterUserMode::Options(with_options))
1239        }
1240    }
1241}
1242
/// Parsed form of `<object_type> [IF EXISTS] <object_name> [CASCADE | RESTRICT]`.
1243#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1244pub struct DropStatement {
1245    /// The type of the object to drop: TABLE, VIEW, etc.
1246    pub object_type: ObjectType,
1247    /// An optional `IF EXISTS` clause. (Non-standard.)
1248    pub if_exists: bool,
1249    /// Object to drop.
1250    pub object_name: ObjectName,
1251    /// The drop behavior, `CASCADE` or `RESTRICT`. This is `AstOption::None` when
1252    /// no drop behavior at all was specified.
1253    pub drop_mode: AstOption<DropMode>,
1254}
1255
1256// sql_grammar!(DropStatement {
1257//     object_type: ObjectType,
1258//     if_exists => [Keyword::IF, Keyword::EXISTS],
1259//     name: ObjectName,
1260//     drop_mode: AstOption<DropMode>,
1261// });
1262impl ParseTo for DropStatement {
1263    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1264        impl_parse_to!(object_type: ObjectType, p);
1265        impl_parse_to!(if_exists => [Keyword::IF, Keyword::EXISTS], p);
1266        let object_name = p.parse_object_name()?;
1267        impl_parse_to!(drop_mode: AstOption<DropMode>, p);
1268        Ok(Self {
1269            object_type,
1270            if_exists,
1271            object_name,
1272            drop_mode,
1273        })
1274    }
1275}
1276
1277impl fmt::Display for DropStatement {
1278    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1279        let mut v: Vec<String> = vec![];
1280        impl_fmt_display!(object_type, v, self);
1281        impl_fmt_display!(if_exists => [Keyword::IF, Keyword::EXISTS], v, self);
1282        impl_fmt_display!(object_name, v, self);
1283        impl_fmt_display!(drop_mode, v, self);
1284        v.iter().join(" ").fmt(f)
1285    }
1286}
1287
/// Drop behavior keyword that may follow the object name.
1288#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1289pub enum DropMode {
    /// The `CASCADE` keyword.
1290    Cascade,
    /// The `RESTRICT` keyword.
1291    Restrict,
1292}
1293
1294impl ParseTo for DropMode {
1295    fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
1296        let drop_mode = if parser.parse_keyword(Keyword::CASCADE) {
1297            DropMode::Cascade
1298        } else if parser.parse_keyword(Keyword::RESTRICT) {
1299            DropMode::Restrict
1300        } else {
1301            return parser.expected("CASCADE | RESTRICT");
1302        };
1303        Ok(drop_mode)
1304    }
1305}
1306
1307impl fmt::Display for DropMode {
1308    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1309        f.write_str(match self {
1310            DropMode::Cascade => "CASCADE",
1311            DropMode::Restrict => "RESTRICT",
1312        })
1313    }
1314}