risingwave_sqlparser/ast/
statement.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt;
16use std::fmt::{Formatter, Write};
17
18use itertools::Itertools;
19use winnow::ModalResult;
20
21use super::ddl::SourceWatermark;
22use super::legacy_source::{CompatibleFormatEncode, parse_format_encode};
23use super::{EmitMode, Ident, ObjectType, Query, Value};
24use crate::ast::{
25    ColumnDef, ObjectName, REDACT_SQL_OPTION_KEYWORDS, SqlOption, TableConstraint,
26    display_comma_separated, display_separated,
27};
28use crate::keywords::Keyword;
29use crate::parser::{IncludeOption, IsOptional, Parser};
30use crate::parser_err;
31use crate::parser_v2::literal_u32;
32use crate::tokenizer::Token;
33
/// Consumes tokens from the parser and builds an AST node of type `Self`.
pub trait ParseTo: Sized {
    /// Parses a `Self` starting at the parser's current position.
    ///
    /// Returns an error when the upcoming tokens do not form a valid `Self`.
    fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self>;
}
38
/// Expands to a single parsing step inside a `ParseTo::parse_to` body.
///
/// Supported forms:
/// - `field: Type, parser` binds `field` to `<Type>::parse_to(parser)?`.
/// - `field => [keywords...], parser` binds `field` to the result of
///   `parser.parse_keywords(..)`, i.e. whether the keyword sequence was present.
/// - `[keywords...], parser` requires the keyword sequence via
///   `parser.expect_keywords(..)?`.
///
/// Two of the forms use `?`, so the macro must be invoked in a function whose
/// return type can carry the parser's error.
#[macro_export]
macro_rules! impl_parse_to {
    // No-op form.
    () => {};
    // Mandatory sub-node parsed through `ParseTo`.
    ($field:ident : $field_type:ty, $parser:ident) => {
        let $field = <$field_type>::parse_to($parser)?;
    };
    // Optional keyword sequence recorded as a flag.
    ($field:ident => [$($arr:tt)+], $parser:ident) => {
        let $field = $parser.parse_keywords(&[$($arr)+]);
    };
    // Mandatory keyword sequence.
    ([$($arr:tt)+], $parser:ident) => {
        $parser.expect_keywords(&[$($arr)+])?;
    };
}
52
/// Expands to a single rendering step that appends to a `Vec<String>` named `$v`.
///
/// Supported forms:
/// - `field, v, self` formats `self.field` with `{}` and pushes the result
///   unless it is the empty string.
/// - `field => [items...], v, self` pushes the items joined by a single space
///   when `self.field` is `true`.
/// - `[items...], v` unconditionally pushes the items joined by a single space.
///
/// The last two forms call `display_separated` unqualified, so that function
/// must be in scope at the call site.
#[macro_export]
macro_rules! impl_fmt_display {
    // No-op form.
    () => {};
    // Field rendered through its `Display` impl; empty output is skipped.
    ($field:ident, $v:ident, $self:ident) => {{
        let s = format!("{}", $self.$field);
        if !s.is_empty() {
            $v.push(s);
        }
    }};
    // Keyword sequence guarded by a boolean field.
    ($field:ident => [$($arr:tt)+], $v:ident, $self:ident) => {
        if $self.$field {
            $v.push(format!("{}", display_separated(&[$($arr)+], " ")));
        }
    };
    // Unconditional keyword (or item) sequence.
    ([$($arr:tt)+], $v:ident) => {
        $v.push(format!("{}", display_separated(&[$($arr)+], " ")));
    };
}
71
// sql_grammar!(CreateSourceStatement {
//     if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS],
//     source_name: Ident,
//     with_properties: AstOption<WithProperties>,
//     [Keyword::ROW, Keyword::FORMAT],
//     format_encode: SourceSchema,
//     [Keyword::WATERMARK, Keyword::FOR] column [Keyword::AS] <expr>
// });
/// AST node holding the components of a source definition.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CreateSourceStatement {
    // NOTE(review): this flag is not rendered by this type's `Display` impl;
    // presumably the enclosing statement prints it. Confirm against the caller.
    pub temporary: bool,
    /// Rendered as `IF NOT EXISTS` when `true`.
    pub if_not_exists: bool,
    /// Column definitions listed in the parenthesized item list.
    pub columns: Vec<ColumnDef>,
    // The wildcard position in columns defined in sql. Only exists when using external schema.
    pub wildcard_idx: Option<usize>,
    /// Table constraints rendered after the columns.
    pub constraints: Vec<TableConstraint>,
    /// Name of the source.
    pub source_name: ObjectName,
    /// Options of the `WITH (...)` clause.
    pub with_properties: WithProperties,
    /// The `FORMAT ... ENCODE ...` clause; `Display` skips it when the `connector`
    /// option value contains `cdc`.
    pub format_encode: CompatibleFormatEncode,
    /// Watermark definitions rendered after the constraints.
    pub source_watermarks: Vec<SourceWatermark>,
    /// Extra column options, each rendered after the item list.
    pub include_column_options: IncludeOption,
}
94
/// FORMAT means how to get the operation (Insert/Delete) from the input.
///
/// Check `CONNECTORS_COMPATIBLE_FORMATS` for what `FORMAT ... ENCODE ...` combinations are allowed.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Format {
    /// The format is the same as RisingWave's internal representation.
    /// Used internally for schema change.
    Native,
    /// For self-explanatory sources like iceberg: they have their own format, which should not
    /// be specified by the user.
    None,
    // Keyword::DEBEZIUM
    Debezium,
    // Keyword::DEBEZIUM_MONGO
    DebeziumMongo,
    // Keyword::MAXWELL
    Maxwell,
    // Keyword::CANAL
    Canal,
    // Keyword::UPSERT
    Upsert,
    // Keyword::PLAIN
    Plain,
}
118
119// TODO: unify with `from_keyword`
120impl fmt::Display for Format {
121    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
122        write!(
123            f,
124            "{}",
125            match self {
126                Format::Native => "NATIVE",
127                Format::Debezium => "DEBEZIUM",
128                Format::DebeziumMongo => "DEBEZIUM_MONGO",
129                Format::Maxwell => "MAXWELL",
130                Format::Canal => "CANAL",
131                Format::Upsert => "UPSERT",
132                Format::Plain => "PLAIN",
133                Format::None => "NONE",
134            }
135        )
136    }
137}
138
139impl Format {
140    pub fn from_keyword(s: &str) -> ModalResult<Self> {
141        Ok(match s {
142            "DEBEZIUM" => Format::Debezium,
143            "DEBEZIUM_MONGO" => Format::DebeziumMongo,
144            "MAXWELL" => Format::Maxwell,
145            "CANAL" => Format::Canal,
146            "PLAIN" => Format::Plain,
147            "UPSERT" => Format::Upsert,
148            "NATIVE" => Format::Native,
149            "NONE" => Format::None,
150            _ => parser_err!("expected PROTOBUF | DEBEZIUM | PLAIN | NATIVE | NONE after FORMAT"),
151        })
152    }
153}
154
/// ENCODE names how the payload is serialized.
///
/// Check `CONNECTORS_COMPATIBLE_FORMATS` for what `FORMAT ... ENCODE ...` combinations are allowed.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Encode {
    Avro,     // Keyword::Avro
    Csv,      // Keyword::CSV
    Protobuf, // Keyword::PROTOBUF
    Json,     // Keyword::JSON
    Bytes,    // Keyword::BYTES
    /// For self-explanatory sources like iceberg: they have their own format, which should not
    /// be specified by the user.
    None,
    Text, // Keyword::TEXT
    /// The encode is the same as RisingWave's internal representation.
    /// Used internally for schema change.
    Native,
    // Rendered and parsed as `TEMPLATE`.
    Template,
    // Rendered and parsed as `PARQUET`.
    Parquet,
}
172
173// TODO: unify with `from_keyword`
174impl fmt::Display for Encode {
175    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
176        write!(
177            f,
178            "{}",
179            match self {
180                Encode::Avro => "AVRO",
181                Encode::Csv => "CSV",
182                Encode::Protobuf => "PROTOBUF",
183                Encode::Json => "JSON",
184                Encode::Bytes => "BYTES",
185                Encode::Native => "NATIVE",
186                Encode::Template => "TEMPLATE",
187                Encode::None => "NONE",
188                Encode::Parquet => "PARQUET",
189                Encode::Text => "TEXT",
190            }
191        )
192    }
193}
194
195impl Encode {
196    pub fn from_keyword(s: &str) -> ModalResult<Self> {
197        Ok(match s {
198            "AVRO" => Encode::Avro,
199            "TEXT" => Encode::Text,
200            "BYTES" => Encode::Bytes,
201            "CSV" => Encode::Csv,
202            "PROTOBUF" => Encode::Protobuf,
203            "JSON" => Encode::Json,
204            "TEMPLATE" => Encode::Template,
205            "PARQUET" => Encode::Parquet,
206            "NATIVE" => Encode::Native,
207            "NONE" => Encode::None,
208            _ => parser_err!(
209                "expected AVRO | BYTES | CSV | PROTOBUF | JSON | NATIVE | TEMPLATE | PARQUET | NONE after Encode"
210            ),
211        })
212    }
213}
214
/// `FORMAT ... ENCODE ... [(a=b, ...)] [KEY ENCODE ...]`
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FormatEncodeOptions {
    /// The keyword following `FORMAT`.
    pub format: Format,
    /// The keyword following `ENCODE`.
    pub row_encode: Encode,
    /// The optional parenthesized option list following the encode; empty when omitted.
    pub row_options: Vec<SqlOption>,

    /// The encode following `KEY ENCODE`, if that clause is present.
    pub key_encode: Option<Encode>,
}
224
225impl Parser<'_> {
226    /// Peek the next tokens to see if it is `FORMAT` or `ROW FORMAT` (for compatibility).
227    fn peek_format_encode_format(&mut self) -> bool {
228        (self.peek_nth_any_of_keywords(0, &[Keyword::ROW])
229            && self.peek_nth_any_of_keywords(1, &[Keyword::FORMAT])) // ROW FORMAT
230            || self.peek_nth_any_of_keywords(0, &[Keyword::FORMAT]) // FORMAT
231    }
232
233    /// Parse the source schema. The behavior depends on the `connector` type.
234    pub fn parse_format_encode_with_connector(
235        &mut self,
236        connector: &str,
237        cdc_source_job: bool,
238    ) -> ModalResult<CompatibleFormatEncode> {
239        // row format for cdc source must be debezium json
240        // row format for nexmark source must be native
241        // default row format for datagen source is native
242        // FIXME: parse input `connector` to enum type instead using string here
243        if connector.contains("-cdc") {
244            let expected = if cdc_source_job {
245                FormatEncodeOptions::plain_json()
246            } else if connector.contains("mongodb") {
247                FormatEncodeOptions::debezium_mongo_json()
248            } else {
249                FormatEncodeOptions::debezium_json()
250            };
251
252            if self.peek_format_encode_format() {
253                let schema = parse_format_encode(self)?.into_v2();
254                if schema != expected {
255                    parser_err!(
256                        "Row format for CDC connectors should be \
257                         either omitted or set to `{expected}`",
258                    );
259                }
260            }
261            Ok(expected.into())
262        } else if connector.contains("nexmark") {
263            let expected = FormatEncodeOptions::native();
264            if self.peek_format_encode_format() {
265                let schema = parse_format_encode(self)?.into_v2();
266                if schema != expected {
267                    parser_err!(
268                        "Row format for nexmark connectors should be \
269                         either omitted or set to `{expected}`",
270                    );
271                }
272            }
273            Ok(expected.into())
274        } else if connector.contains("datagen") {
275            Ok(if self.peek_format_encode_format() {
276                parse_format_encode(self)?
277            } else {
278                FormatEncodeOptions::native().into()
279            })
280        } else if connector.contains("iceberg") || connector.contains("adbc_snowflake") {
281            let expected = FormatEncodeOptions::none();
282            if self.peek_format_encode_format() {
283                let schema = parse_format_encode(self)?.into_v2();
284                if schema != expected {
285                    parser_err!(
286                        "Row format for iceberg connectors should be \
287                         either omitted or set to `{expected}`",
288                    );
289                }
290            }
291            Ok(expected.into())
292        } else if connector.contains("webhook") {
293            parser_err!(
294                "Source with webhook connector is not supported. \
295                 Please use the `CREATE TABLE ... WITH ...` statement instead.",
296            );
297        } else {
298            Ok(parse_format_encode(self)?)
299        }
300    }
301
302    /// Parse `FORMAT ... ENCODE ... (...)`.
303    pub fn parse_schema(&mut self) -> ModalResult<Option<FormatEncodeOptions>> {
304        if !self.parse_keyword(Keyword::FORMAT) {
305            return Ok(None);
306        }
307
308        let id = self.parse_identifier()?;
309        let s = id.value.to_ascii_uppercase();
310        let format = Format::from_keyword(&s)?;
311        self.expect_keyword(Keyword::ENCODE)?;
312        let id = self.parse_identifier()?;
313        let s = id.value.to_ascii_uppercase();
314        let row_encode = Encode::from_keyword(&s)?;
315        let row_options = self.parse_options()?;
316
317        let key_encode = if self.parse_keywords(&[Keyword::KEY, Keyword::ENCODE]) {
318            Some(Encode::from_keyword(
319                self.parse_identifier()?.value.to_ascii_uppercase().as_str(),
320            )?)
321        } else {
322            None
323        };
324
325        Ok(Some(FormatEncodeOptions {
326            format,
327            row_encode,
328            row_options,
329            key_encode,
330        }))
331    }
332}
333
334impl FormatEncodeOptions {
335    pub const fn plain_json() -> Self {
336        FormatEncodeOptions {
337            format: Format::Plain,
338            row_encode: Encode::Json,
339            row_options: Vec::new(),
340            key_encode: None,
341        }
342    }
343
344    /// Create a new source schema with `Debezium` format and `Json` encoding.
345    pub const fn debezium_json() -> Self {
346        FormatEncodeOptions {
347            format: Format::Debezium,
348            row_encode: Encode::Json,
349            row_options: Vec::new(),
350            key_encode: None,
351        }
352    }
353
354    pub const fn debezium_mongo_json() -> Self {
355        FormatEncodeOptions {
356            format: Format::DebeziumMongo,
357            row_encode: Encode::Json,
358            row_options: Vec::new(),
359            key_encode: None,
360        }
361    }
362
363    /// Create a new source schema with `Native` format and encoding.
364    pub const fn native() -> Self {
365        FormatEncodeOptions {
366            format: Format::Native,
367            row_encode: Encode::Native,
368            row_options: Vec::new(),
369            key_encode: None,
370        }
371    }
372
373    /// Create a new source schema with `None` format and encoding.
374    /// Used for self-explanatory source like iceberg.
375    pub const fn none() -> Self {
376        FormatEncodeOptions {
377            format: Format::None,
378            row_encode: Encode::None,
379            row_options: Vec::new(),
380            key_encode: None,
381        }
382    }
383
384    pub fn row_options(&self) -> &[SqlOption] {
385        self.row_options.as_ref()
386    }
387}
388
389impl fmt::Display for FormatEncodeOptions {
390    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
391        write!(f, "FORMAT {} ENCODE {}", self.format, self.row_encode)?;
392
393        if !self.row_options().is_empty() {
394            write!(f, " ({})", display_comma_separated(self.row_options()))?;
395        }
396
397        if let Some(key_encode) = &self.key_encode {
398            write!(f, " KEY ENCODE {}", key_encode)?;
399        }
400
401        Ok(())
402    }
403}
404
405pub(super) fn fmt_create_items(
406    columns: &[ColumnDef],
407    constraints: &[TableConstraint],
408    watermarks: &[SourceWatermark],
409    wildcard_idx: Option<usize>,
410) -> std::result::Result<String, fmt::Error> {
411    let mut items = String::new();
412    let has_items = !columns.is_empty()
413        || !constraints.is_empty()
414        || !watermarks.is_empty()
415        || wildcard_idx.is_some();
416    has_items.then(|| write!(&mut items, "("));
417
418    if let Some(wildcard_idx) = wildcard_idx {
419        let (columns_l, columns_r) = columns.split_at(wildcard_idx);
420        write!(&mut items, "{}", display_comma_separated(columns_l))?;
421        if !columns_l.is_empty() {
422            write!(&mut items, ", ")?;
423        }
424        write!(&mut items, "{}", Token::Mul)?;
425        if !columns_r.is_empty() {
426            write!(&mut items, ", ")?;
427        }
428        write!(&mut items, "{}", display_comma_separated(columns_r))?;
429    } else {
430        write!(&mut items, "{}", display_comma_separated(columns))?;
431    }
432    let mut leading_items = !columns.is_empty() || wildcard_idx.is_some();
433
434    if leading_items && !constraints.is_empty() {
435        write!(&mut items, ", ")?;
436    }
437    write!(&mut items, "{}", display_comma_separated(constraints))?;
438    leading_items |= !constraints.is_empty();
439
440    if leading_items && !watermarks.is_empty() {
441        write!(&mut items, ", ")?;
442    }
443    write!(&mut items, "{}", display_comma_separated(watermarks))?;
444    // uncomment this when adding more sections below
445    // leading_items |= !watermarks.is_empty();
446
447    has_items.then(|| write!(&mut items, ")"));
448    Ok(items)
449}
450
451impl fmt::Display for CreateSourceStatement {
452    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
453        let mut v: Vec<String> = vec![];
454        impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
455        impl_fmt_display!(source_name, v, self);
456
457        let items = fmt_create_items(
458            &self.columns,
459            &self.constraints,
460            &self.source_watermarks,
461            self.wildcard_idx,
462        )?;
463        if !items.is_empty() {
464            v.push(items);
465        }
466
467        for item in &self.include_column_options {
468            v.push(format!("{}", item));
469        }
470
471        // skip format_encode for cdc source
472        let is_cdc_source = self.with_properties.0.iter().any(|option| {
473            option.name.real_value().eq_ignore_ascii_case("connector")
474                && option.value.to_string().contains("cdc")
475        });
476
477        impl_fmt_display!(with_properties, v, self);
478        if !is_cdc_source {
479            impl_fmt_display!(format_encode, v, self);
480        }
481        v.iter().join(" ").fmt(f)
482    }
483}
484
/// Where a sink takes its data from.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CreateSink {
    /// `FROM <name>`: an existing object referenced by name.
    From(ObjectName),
    /// `AS <query>`: an inline query.
    AsQuery(Box<Query>),
}
490
491impl fmt::Display for CreateSink {
492    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
493        match self {
494            Self::From(mv) => write!(f, "FROM {}", mv),
495            Self::AsQuery(query) => write!(f, "AS {}", query),
496        }
497    }
498}
// sql_grammar!(CreateSinkStatement {
//     if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS],
//     sink_name: Ident,
//     [Keyword::FROM],
//     materialized_view: Ident,
//     with_properties: AstOption<WithProperties>,
// });
/// AST node holding the components of a sink definition.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CreateSinkStatement {
    /// Whether `IF NOT EXISTS` was given.
    pub if_not_exists: bool,
    /// Name of the sink.
    pub sink_name: ObjectName,
    /// Options of the `WITH (...)` clause; may be empty only when sinking into a table.
    pub with_properties: WithProperties,
    /// The `FROM <name>` or `AS <query>` part.
    pub sink_from: CreateSink,

    // only used when creating sink into a table
    // insert to specific columns of the target table
    pub columns: Vec<Ident>,
    /// The optional `EMIT ...` mode.
    pub emit_mode: Option<EmitMode>,
    /// The optional trailing `FORMAT ... ENCODE ...` clause.
    pub sink_schema: Option<FormatEncodeOptions>,
    /// The target table of `INTO <table>`, if given.
    pub into_table_name: Option<ObjectName>,
}
520
521impl ParseTo for CreateSinkStatement {
522    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
523        impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
524        impl_parse_to!(sink_name: ObjectName, p);
525
526        let mut target_spec_columns = Vec::new();
527        let into_table_name = if p.parse_keyword(Keyword::INTO) {
528            impl_parse_to!(into_table_name: ObjectName, p);
529
530            // we only allow specify columns when creating sink into a table
531            target_spec_columns = p.parse_parenthesized_column_list(IsOptional::Optional)?;
532            Some(into_table_name)
533        } else {
534            None
535        };
536
537        let sink_from = if p.parse_keyword(Keyword::FROM) {
538            impl_parse_to!(from_name: ObjectName, p);
539            CreateSink::From(from_name)
540        } else if p.parse_keyword(Keyword::AS) {
541            let query = Box::new(p.parse_query()?);
542            CreateSink::AsQuery(query)
543        } else {
544            p.expected("FROM or AS after CREATE SINK sink_name")?
545        };
546
547        let emit_mode: Option<EmitMode> = p.parse_emit_mode()?;
548
549        // This check cannot be put into the `WithProperties::parse_to`, since other
550        // statements may not need the with properties.
551        if !p.peek_nth_any_of_keywords(0, &[Keyword::WITH]) && into_table_name.is_none() {
552            p.expected("WITH")?
553        }
554        impl_parse_to!(with_properties: WithProperties, p);
555
556        if with_properties.0.is_empty() && into_table_name.is_none() {
557            parser_err!("sink properties not provided");
558        }
559
560        let sink_schema = p.parse_schema()?;
561
562        Ok(Self {
563            if_not_exists,
564            sink_name,
565            with_properties,
566            sink_from,
567            columns: target_spec_columns,
568            emit_mode,
569            sink_schema,
570            into_table_name,
571        })
572    }
573}
574
575impl fmt::Display for CreateSinkStatement {
576    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
577        let mut v: Vec<String> = vec![];
578        impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
579        impl_fmt_display!(sink_name, v, self);
580        if let Some(into_table) = &self.into_table_name {
581            impl_fmt_display!([Keyword::INTO], v);
582            impl_fmt_display!([into_table], v);
583            if !self.columns.is_empty() {
584                v.push(format!("({})", display_comma_separated(&self.columns)));
585            }
586        }
587        impl_fmt_display!(sink_from, v, self);
588        if let Some(ref emit_mode) = self.emit_mode {
589            v.push(format!("EMIT {}", emit_mode));
590        }
591        impl_fmt_display!(with_properties, v, self);
592        if let Some(schema) = &self.sink_schema {
593            v.push(format!("{}", schema));
594        }
595        v.iter().join(" ").fmt(f)
596    }
597}
598
// sql_grammar!(CreateSubscriptionStatement {
//     if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS],
//     subscription_name: Ident,
//     [Keyword::FROM],
//     materialized_view: Ident,
//     with_properties: AstOption<WithProperties>,
// });
/// AST node holding the components of a subscription definition.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CreateSubscriptionStatement {
    /// Whether `IF NOT EXISTS` was given.
    pub if_not_exists: bool,
    /// Name of the subscription.
    pub subscription_name: ObjectName,
    /// Options of the mandatory, non-empty `WITH (...)` clause.
    pub with_properties: WithProperties,
    /// The object named after `FROM`.
    pub subscription_from: ObjectName,
    // pub emit_mode: Option<EmitMode>,
}
614
615impl ParseTo for CreateSubscriptionStatement {
616    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
617        impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
618        impl_parse_to!(subscription_name: ObjectName, p);
619
620        let subscription_from = if p.parse_keyword(Keyword::FROM) {
621            impl_parse_to!(from_name: ObjectName, p);
622            from_name
623        } else {
624            p.expected("FROM after CREATE SUBSCRIPTION subscription_name")?
625        };
626
627        // let emit_mode = p.parse_emit_mode()?;
628
629        // This check cannot be put into the `WithProperties::parse_to`, since other
630        // statements may not need the with properties.
631        if !p.peek_nth_any_of_keywords(0, &[Keyword::WITH]) {
632            p.expected("WITH")?
633        }
634        impl_parse_to!(with_properties: WithProperties, p);
635
636        if with_properties.0.is_empty() {
637            parser_err!("subscription properties not provided");
638        }
639
640        Ok(Self {
641            if_not_exists,
642            subscription_name,
643            with_properties,
644            subscription_from,
645        })
646    }
647}
648
649impl fmt::Display for CreateSubscriptionStatement {
650    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
651        let mut v: Vec<String> = vec![];
652        impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
653        impl_fmt_display!(subscription_name, v, self);
654        v.push(format!("FROM {}", self.subscription_from));
655        impl_fmt_display!(with_properties, v, self);
656        v.iter().join(" ").fmt(f)
657    }
658}
659
/// What a declared cursor iterates over.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum DeclareCursor {
    /// A cursor over the result of a query.
    Query(Box<Query>),
    /// A cursor over the named object, together with its `Since` starting point.
    Subscription(ObjectName, Since),
}
665
666impl fmt::Display for DeclareCursor {
667    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
668        let mut v: Vec<String> = vec![];
669        match self {
670            DeclareCursor::Query(query) => v.push(format!("{}", query.as_ref())),
671            DeclareCursor::Subscription(name, since) => {
672                v.push(format!("{}", name));
673                v.push(format!("{:?}", since));
674            }
675        }
676        v.iter().join(" ").fmt(f)
677    }
678}
679
// sql_grammar!(DeclareCursorStatement {
//     cursor_name: Ident,
//     [Keyword::SUBSCRIPTION]
//     [Keyword::CURSOR],
//     [Keyword::FOR],
//     subscription: Ident or query: Query,
//     [Keyword::SINCE],
//     rw_timestamp: Ident,
// });
/// AST node holding the components of a cursor declaration.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DeclareCursorStatement {
    /// Name of the cursor being declared.
    pub cursor_name: Ident,
    /// What the cursor iterates over (a query or a subscription).
    pub declare_cursor: DeclareCursor,
}
694
695impl ParseTo for DeclareCursorStatement {
696    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
697        let cursor_name = p.parse_identifier_non_reserved()?;
698
699        let declare_cursor = if !p.parse_keyword(Keyword::SUBSCRIPTION) {
700            p.expect_keyword(Keyword::CURSOR)?;
701            p.expect_keyword(Keyword::FOR)?;
702            DeclareCursor::Query(Box::new(p.parse_query()?))
703        } else {
704            p.expect_keyword(Keyword::CURSOR)?;
705            p.expect_keyword(Keyword::FOR)?;
706            let cursor_for_name = p.parse_object_name()?;
707            let rw_timestamp = p.parse_since()?;
708            DeclareCursor::Subscription(cursor_for_name, rw_timestamp)
709        };
710
711        Ok(Self {
712            cursor_name,
713            declare_cursor,
714        })
715    }
716}
717
718impl fmt::Display for DeclareCursorStatement {
719    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
720        let mut v: Vec<String> = vec![];
721        impl_fmt_display!(cursor_name, v, self);
722        match &self.declare_cursor {
723            DeclareCursor::Query(_) => {
724                v.push("CURSOR FOR ".to_owned());
725            }
726            DeclareCursor::Subscription { .. } => {
727                v.push("SUBSCRIPTION CURSOR FOR ".to_owned());
728            }
729        }
730        impl_fmt_display!(declare_cursor, v, self);
731        v.iter().join(" ").fmt(f)
732    }
733}
734
// sql_grammar!(FetchCursorStatement {
//     cursor_name: Ident,
// });
/// AST node holding the components of a cursor fetch.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FetchCursorStatement {
    /// Name of the cursor to fetch from.
    pub cursor_name: Ident,
    /// Number of rows to fetch; the `NEXT` keyword is parsed as `1`.
    pub count: u32,
    /// Options of the optional trailing `WITH (...)` clause.
    pub with_properties: WithProperties,
}
744
745impl ParseTo for FetchCursorStatement {
746    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
747        let count = if p.parse_keyword(Keyword::NEXT) {
748            1
749        } else {
750            literal_u32(p)?
751        };
752        p.expect_keyword(Keyword::FROM)?;
753        let cursor_name = p.parse_identifier_non_reserved()?;
754        impl_parse_to!(with_properties: WithProperties, p);
755
756        Ok(Self {
757            cursor_name,
758            count,
759            with_properties,
760        })
761    }
762}
763
764impl fmt::Display for FetchCursorStatement {
765    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
766        let mut v: Vec<String> = vec![];
767        if self.count == 1 {
768            v.push("NEXT ".to_owned());
769        } else {
770            impl_fmt_display!(count, v, self);
771        }
772        v.push("FROM ".to_owned());
773        impl_fmt_display!(cursor_name, v, self);
774        v.iter().join(" ").fmt(f)
775    }
776}
777
// sql_grammar!(CloseCursorStatement {
//     cursor_name: Ident,
// });
/// AST node holding the target of a cursor close.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CloseCursorStatement {
    /// The cursor to close; `None` stands for the `ALL` keyword.
    pub cursor_name: Option<Ident>,
}
785
786impl ParseTo for CloseCursorStatement {
787    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
788        let cursor_name = if p.parse_keyword(Keyword::ALL) {
789            None
790        } else {
791            Some(p.parse_identifier_non_reserved()?)
792        };
793
794        Ok(Self { cursor_name })
795    }
796}
797
798impl fmt::Display for CloseCursorStatement {
799    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
800        let mut v: Vec<String> = vec![];
801        if let Some(cursor_name) = &self.cursor_name {
802            v.push(format!("{}", cursor_name));
803        } else {
804            v.push("ALL".to_owned());
805        }
806        v.iter().join(" ").fmt(f)
807    }
808}
809
// sql_grammar!(CreateConnectionStatement {
//     if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS],
//     connection_name: Ident,
//     with_properties: AstOption<WithProperties>,
// });
/// AST node holding the components of a connection definition.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CreateConnectionStatement {
    /// Whether `IF NOT EXISTS` was given.
    pub if_not_exists: bool,
    /// Name of the connection.
    pub connection_name: ObjectName,
    /// Options of the `WITH (...)` clause; parsing rejects an empty list.
    pub with_properties: WithProperties,
}
821
822impl ParseTo for CreateConnectionStatement {
823    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
824        impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
825        impl_parse_to!(connection_name: ObjectName, p);
826        impl_parse_to!(with_properties: WithProperties, p);
827        if with_properties.0.is_empty() {
828            parser_err!("connection properties not provided");
829        }
830
831        Ok(Self {
832            if_not_exists,
833            connection_name,
834            with_properties,
835        })
836    }
837}
838
839impl fmt::Display for CreateConnectionStatement {
840    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
841        let mut v: Vec<String> = vec![];
842        impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
843        impl_fmt_display!(connection_name, v, self);
844        impl_fmt_display!(with_properties, v, self);
845        v.iter().join(" ").fmt(f)
846    }
847}
848
/// AST node holding the components of a secret definition.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CreateSecretStatement {
    /// Whether `IF NOT EXISTS` was given.
    pub if_not_exists: bool,
    /// Name of the secret.
    pub secret_name: ObjectName,
    /// The value following `AS`; `Value::Null` when the `AS` clause is absent.
    pub credential: Value,
    /// Options of the optional `WITH (...)` clause.
    pub with_properties: WithProperties,
}
856
857impl ParseTo for CreateSecretStatement {
858    fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
859        impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], parser);
860        impl_parse_to!(secret_name: ObjectName, parser);
861        impl_parse_to!(with_properties: WithProperties, parser);
862        let mut credential = Value::Null;
863        if parser.parse_keyword(Keyword::AS) {
864            credential = parser.ensure_parse_value()?;
865        }
866        Ok(Self {
867            if_not_exists,
868            secret_name,
869            credential,
870            with_properties,
871        })
872    }
873}
874
875impl fmt::Display for CreateSecretStatement {
876    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
877        let mut v: Vec<String> = vec![];
878        impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
879        impl_fmt_display!(secret_name, v, self);
880        impl_fmt_display!(with_properties, v, self);
881        if self.credential != Value::Null {
882            v.push("AS".to_owned());
883            impl_fmt_display!(credential, v, self);
884        }
885        v.iter().join(" ").fmt(f)
886    }
887}
888
/// The options of a `WITH (...)` clause; an empty list renders as nothing.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct WithProperties(pub Vec<SqlOption>);
891
892impl ParseTo for WithProperties {
893    fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
894        Ok(Self(
895            parser.parse_options_with_preceding_keyword(Keyword::WITH)?,
896        ))
897    }
898}
899
900impl fmt::Display for WithProperties {
901    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
902        if !self.0.is_empty() {
903            write!(f, "WITH ({})", display_comma_separated(self.0.as_slice()))
904        } else {
905            Ok(())
906        }
907    }
908}
909
/// Starting point selector. Every variant renders with a leading space.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Since {
    /// Rendered as ` SINCE <number>` (presumably a millisecond timestamp, per the name).
    TimestampMsNum(u64),
    /// Rendered as ` SINCE PROCTIME()`.
    ProcessTime,
    /// Rendered as ` SINCE BEGIN()`.
    Begin,
    /// Rendered as ` FULL`.
    Full,
}

impl fmt::Display for Since {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::TimestampMsNum(ts) => write!(f, " SINCE {ts}"),
            Self::ProcessTime => f.write_str(" SINCE PROCTIME()"),
            Self::Begin => f.write_str(" SINCE BEGIN()"),
            Self::Full => f.write_str(" FULL"),
        }
    }
}
929
/// A `ROW SCHEMA LOCATION '<value>'` clause.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct RowSchemaLocation {
    /// The string literal following the `ROW SCHEMA LOCATION` keywords.
    pub value: AstString,
}
934
935impl ParseTo for RowSchemaLocation {
936    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
937        impl_parse_to!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], p);
938        impl_parse_to!(value: AstString, p);
939        Ok(Self { value })
940    }
941}
942
943impl fmt::Display for RowSchemaLocation {
944    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
945        let mut v = vec![];
946        impl_fmt_display!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], v);
947        impl_fmt_display!(value, v, self);
948        v.iter().join(" ").fmt(f)
949    }
950}
951
/// A string literal. Unlike a plain `String`, its `Display` output is wrapped
/// in single quotes.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct AstString(pub String);
956
957impl ParseTo for AstString {
958    fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
959        Ok(Self(parser.parse_literal_string()?))
960    }
961}
962
963impl fmt::Display for AstString {
964    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
965        write!(f, "'{}'", self.0)
966    }
967}
968
/// This enum is used in place of `Option` because `fmt::Display` cannot be
/// implemented for `Option<T>`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AstOption<T> {
    /// No value
    None,
    /// Some value `T`
    Some(T),
}
978
979impl<T: ParseTo> ParseTo for AstOption<T> {
980    fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
981        match T::parse_to(parser) {
982            Ok(t) => Ok(AstOption::Some(t)),
983            Err(_) => Ok(AstOption::None),
984        }
985    }
986}
987
988impl<T: fmt::Display> fmt::Display for AstOption<T> {
989    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
990        match &self {
991            AstOption::Some(t) => t.fmt(f),
992            AstOption::None => Ok(()),
993        }
994    }
995}
996
997impl<T> From<AstOption<T>> for Option<T> {
998    fn from(val: AstOption<T>) -> Self {
999        match val {
1000            AstOption::Some(t) => Some(t),
1001            AstOption::None => None,
1002        }
1003    }
1004}
1005
/// AST node for the body of a user-creation statement: `<user_name> [options]`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CreateUserStatement {
    /// Name of the user.
    pub user_name: ObjectName,
    /// Options parsed after the user name.
    pub with_options: UserOptions,
}
1011
/// AST node for the body of a user-alteration statement: `<user_name> <mode>`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct AlterUserStatement {
    /// Name of the user being altered.
    pub user_name: ObjectName,
    /// What to change: either a new option set or a rename.
    pub mode: AlterUserMode,
}
1017
/// The action carried by an [`AlterUserStatement`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterUserMode {
    /// Replace/set user options; rendered as the options themselves.
    Options(UserOptions),
    /// Rename the user; rendered as `RENAME TO <new_name>`.
    Rename(ObjectName),
}
1023
/// A single user option. The flag variants come in positive/negative pairs
/// that render as the matching upper-case keyword (`SUPERUSER`, `NOSUPERUSER`, ...).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum UserOption {
    SuperUser,
    NoSuperUser,
    CreateDB,
    NoCreateDB,
    CreateUser,
    NoCreateUser,
    Login,
    NoLogin,
    Admin,
    NoAdmin,
    /// `ENCRYPTED PASSWORD '<string>'`.
    EncryptedPassword(AstString),
    /// `PASSWORD '<string>'`, or `PASSWORD NULL` when the payload is `None`.
    Password(Option<AstString>),
    /// Option list parsed after the `oauth` identifier.
    OAuth(Vec<SqlOption>),
}
1040
1041impl fmt::Display for UserOption {
1042    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1043        match self {
1044            UserOption::SuperUser => write!(f, "SUPERUSER"),
1045            UserOption::NoSuperUser => write!(f, "NOSUPERUSER"),
1046            UserOption::CreateDB => write!(f, "CREATEDB"),
1047            UserOption::NoCreateDB => write!(f, "NOCREATEDB"),
1048            UserOption::CreateUser => write!(f, "CREATEUSER"),
1049            UserOption::NoCreateUser => write!(f, "NOCREATEUSER"),
1050            UserOption::Login => write!(f, "LOGIN"),
1051            UserOption::NoLogin => write!(f, "NOLOGIN"),
1052            UserOption::Admin => write!(f, "ADMIN"),
1053            UserOption::NoAdmin => write!(f, "NOADMIN"),
1054            UserOption::EncryptedPassword(p) => {
1055                if should_redact_user_password() {
1056                    write!(f, "ENCRYPTED PASSWORD [REDACTED]")
1057                } else {
1058                    write!(f, "ENCRYPTED PASSWORD {}", p)
1059                }
1060            }
1061            UserOption::Password(None) => write!(f, "PASSWORD NULL"),
1062            UserOption::Password(Some(p)) => {
1063                if should_redact_user_password() {
1064                    write!(f, "PASSWORD [REDACTED]")
1065                } else {
1066                    write!(f, "PASSWORD {}", p)
1067                }
1068            }
1069            UserOption::OAuth(options) => {
1070                write!(f, "({})", display_comma_separated(options.as_slice()))
1071            }
1072        }
1073    }
1074}
1075
/// A list of user options (field `0`) plus a flag (field `1`) recording whether
/// the list was introduced by the `WITH` keyword, so it can be rendered back.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct UserOptions(pub Vec<UserOption>, pub bool);
1078
1079fn should_redact_user_password() -> bool {
1080    REDACT_SQL_OPTION_KEYWORDS.try_with(|_| ()).is_ok()
1081}
1082
/// Accumulates user options while parsing. Each slot holds at most one option,
/// so options that share a slot (e.g. `SUPERUSER` / `NOSUPERUSER`) are mutually
/// exclusive.
#[derive(Default)]
struct UserOptionsBuilder {
    /// Whether the option list started with the `WITH` keyword.
    with_prefix: bool,
    super_user: Option<UserOption>,
    create_db: Option<UserOption>,
    create_user: Option<UserOption>,
    login: Option<UserOption>,
    admin: Option<UserOption>,
    /// Shared by `PASSWORD`, `ENCRYPTED PASSWORD` and `OAUTH`.
    password: Option<UserOption>,
}
1093
1094impl UserOptionsBuilder {
1095    fn build(self) -> UserOptions {
1096        let mut options = vec![];
1097        if let Some(option) = self.super_user {
1098            options.push(option);
1099        }
1100        if let Some(option) = self.create_db {
1101            options.push(option);
1102        }
1103        if let Some(option) = self.create_user {
1104            options.push(option);
1105        }
1106        if let Some(option) = self.login {
1107            options.push(option);
1108        }
1109        if let Some(option) = self.admin {
1110            options.push(option);
1111        }
1112        if let Some(option) = self.password {
1113            options.push(option);
1114        }
1115        UserOptions(options, self.with_prefix)
1116    }
1117}
1118
impl ParseTo for UserOptions {
    /// Parses an optional `WITH` keyword followed by user options, reading
    /// until the next token is end-of-input or a semicolon.
    ///
    /// Each option is written into a slot of `UserOptionsBuilder`; filling a
    /// slot twice is reported as "conflicting or redundant options". The
    /// resulting list follows the builder's slot order, not the input order.
    fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
        let mut builder = UserOptionsBuilder {
            // Remember whether `WITH` was written so `Display` can reproduce it.
            with_prefix: parser.parse_keyword(Keyword::WITH),
            ..Default::default()
        };
        // Stores `user_option` in its slot and rejects a slot that was already set.
        // NOTE(review): relies on `parser_err!` returning the error from the
        // closure (the macro is defined elsewhere) — otherwise `Ok(())` follows.
        let add_option = |item: &mut Option<UserOption>, user_option| {
            let old_value = item.replace(user_option);
            if old_value.is_some() {
                parser_err!("conflicting or redundant options");
            }
            Ok(())
        };
        loop {
            // Stop at end of input or at the statement terminator.
            let token = parser.peek_token();
            if token == Token::EOF || token == Token::SemiColon {
                break;
            }

            let option = parser.parse_identifier()?;
            // Options are matched against lower-case names; presumably
            // `real_value()` lower-cases unquoted identifiers — confirm.
            let s = option.real_value();
            // Pair the target slot with the option value to store in it.
            let (item_mut_ref, user_option) = match &s[..] {
                "superuser" => (&mut builder.super_user, UserOption::SuperUser),
                "nosuperuser" => (&mut builder.super_user, UserOption::NoSuperUser),
                "createdb" => (&mut builder.create_db, UserOption::CreateDB),
                "nocreatedb" => (&mut builder.create_db, UserOption::NoCreateDB),
                "createuser" => (&mut builder.create_user, UserOption::CreateUser),
                "nocreateuser" => (&mut builder.create_user, UserOption::NoCreateUser),
                "login" => (&mut builder.login, UserOption::Login),
                "nologin" => (&mut builder.login, UserOption::NoLogin),
                "admin" => (&mut builder.admin, UserOption::Admin),
                "noadmin" => (&mut builder.admin, UserOption::NoAdmin),
                // `PASSWORD NULL` or `PASSWORD '<string>'`.
                "password" => {
                    if parser.parse_keyword(Keyword::NULL) {
                        (&mut builder.password, UserOption::Password(None))
                    } else {
                        (
                            &mut builder.password,
                            UserOption::Password(Some(AstString::parse_to(parser)?)),
                        )
                    }
                }
                // `ENCRYPTED` must be followed by `PASSWORD '<string>'`.
                "encrypted" => {
                    let option = parser.parse_identifier()?;
                    let s = option.real_value();
                    if s != "password" {
                        parser_err!("expected PASSWORD after ENCRYPTED, found {}", option);
                    }
                    (
                        &mut builder.password,
                        UserOption::EncryptedPassword(AstString::parse_to(parser)?),
                    )
                }
                // OAuth options share the password slot, so they cannot be
                // combined with a password option.
                "oauth" => {
                    let options = parser.parse_options()?;
                    (&mut builder.password, UserOption::OAuth(options))
                }
                _ => {
                    parser_err!("unexpected user option: {}", option);
                }
            };
            add_option(item_mut_ref, user_option)?;
        }
        Ok(builder.build())
    }
}
1185
1186impl fmt::Display for UserOptions {
1187    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1188        if !self.0.is_empty() {
1189            if self.1 {
1190                write!(f, "WITH ")?;
1191            }
1192            write!(f, "{}", display_separated(self.0.as_slice(), " "))
1193        } else {
1194            Ok(())
1195        }
1196    }
1197}
1198
1199impl ParseTo for CreateUserStatement {
1200    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1201        impl_parse_to!(user_name: ObjectName, p);
1202        impl_parse_to!(with_options: UserOptions, p);
1203
1204        Ok(CreateUserStatement {
1205            user_name,
1206            with_options,
1207        })
1208    }
1209}
1210
1211impl fmt::Display for CreateUserStatement {
1212    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1213        let mut v: Vec<String> = vec![];
1214        impl_fmt_display!(user_name, v, self);
1215        impl_fmt_display!(with_options, v, self);
1216        v.iter().join(" ").fmt(f)
1217    }
1218}
1219
1220impl fmt::Display for AlterUserMode {
1221    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1222        match self {
1223            AlterUserMode::Options(options) => {
1224                write!(f, "{}", options)
1225            }
1226            AlterUserMode::Rename(new_name) => {
1227                write!(f, "RENAME TO {}", new_name)
1228            }
1229        }
1230    }
1231}
1232
1233impl fmt::Display for AlterUserStatement {
1234    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1235        let mut v: Vec<String> = vec![];
1236        impl_fmt_display!(user_name, v, self);
1237        impl_fmt_display!(mode, v, self);
1238        v.iter().join(" ").fmt(f)
1239    }
1240}
1241
1242impl ParseTo for AlterUserStatement {
1243    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1244        impl_parse_to!(user_name: ObjectName, p);
1245        impl_parse_to!(mode: AlterUserMode, p);
1246
1247        Ok(AlterUserStatement { user_name, mode })
1248    }
1249}
1250
1251impl ParseTo for AlterUserMode {
1252    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1253        if p.parse_keyword(Keyword::RENAME) {
1254            p.expect_keyword(Keyword::TO)?;
1255            impl_parse_to!(new_name: ObjectName, p);
1256            Ok(AlterUserMode::Rename(new_name))
1257        } else {
1258            impl_parse_to!(with_options: UserOptions, p);
1259            Ok(AlterUserMode::Options(with_options))
1260        }
1261    }
1262}
1263
/// AST node for the body of a drop statement:
/// `<object_type> [IF EXISTS] <object_name> [CASCADE | RESTRICT]`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DropStatement {
    /// The type of the object to drop: TABLE, VIEW, etc.
    pub object_type: ObjectType,
    /// An optional `IF EXISTS` clause. (Non-standard.)
    pub if_exists: bool,
    /// Object to drop.
    pub object_name: ObjectName,
    /// The drop behavior, `CASCADE` or `RESTRICT`, if one was specified;
    /// `AstOption::None` when no drop behavior was given.
    pub drop_mode: AstOption<DropMode>,
}
1276
1277// sql_grammar!(DropStatement {
1278//     object_type: ObjectType,
1279//     if_exists => [Keyword::IF, Keyword::EXISTS],
1280//     name: ObjectName,
1281//     drop_mode: AstOption<DropMode>,
1282// });
1283impl ParseTo for DropStatement {
1284    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1285        impl_parse_to!(object_type: ObjectType, p);
1286        impl_parse_to!(if_exists => [Keyword::IF, Keyword::EXISTS], p);
1287        let object_name = p.parse_object_name()?;
1288        impl_parse_to!(drop_mode: AstOption<DropMode>, p);
1289        Ok(Self {
1290            object_type,
1291            if_exists,
1292            object_name,
1293            drop_mode,
1294        })
1295    }
1296}
1297
1298impl fmt::Display for DropStatement {
1299    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1300        let mut v: Vec<String> = vec![];
1301        impl_fmt_display!(object_type, v, self);
1302        impl_fmt_display!(if_exists => [Keyword::IF, Keyword::EXISTS], v, self);
1303        impl_fmt_display!(object_name, v, self);
1304        impl_fmt_display!(drop_mode, v, self);
1305        v.iter().join(" ").fmt(f)
1306    }
1307}
1308
/// Drop behavior keyword of a drop statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum DropMode {
    /// The `CASCADE` keyword.
    Cascade,
    /// The `RESTRICT` keyword.
    Restrict,
}
1314
1315impl ParseTo for DropMode {
1316    fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
1317        let drop_mode = if parser.parse_keyword(Keyword::CASCADE) {
1318            DropMode::Cascade
1319        } else if parser.parse_keyword(Keyword::RESTRICT) {
1320            DropMode::Restrict
1321        } else {
1322            return parser.expected("CASCADE | RESTRICT");
1323        };
1324        Ok(drop_mode)
1325    }
1326}
1327
1328impl fmt::Display for DropMode {
1329    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1330        f.write_str(match self {
1331            DropMode::Cascade => "CASCADE",
1332            DropMode::Restrict => "RESTRICT",
1333        })
1334    }
1335}