Skip to main content

risingwave_sqlparser/ast/
statement.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt;
16use std::fmt::{Formatter, Write};
17
18use itertools::Itertools;
19use winnow::ModalResult;
20
21use super::ddl::SourceWatermark;
22use super::legacy_source::{CompatibleFormatEncode, parse_format_encode};
23use super::{EmitMode, Ident, ObjectType, Query, Value};
24use crate::ast::{
25    ColumnDef, ObjectName, REDACT_SQL_OPTION_KEYWORDS, SqlOption, TableConstraint,
26    display_comma_separated, display_separated,
27};
28use crate::keywords::Keyword;
29use crate::parser::{IncludeOption, IsOptional, Parser};
30use crate::parser_err;
31use crate::parser_v2::literal_u32;
32use crate::tokenizer::Token;
33
34/// Consumes token from the parser into an AST node.
35pub trait ParseTo: Sized {
36    fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self>;
37}
38
39#[macro_export]
40macro_rules! impl_parse_to {
41    () => {};
42    ($field:ident : $field_type:ty, $parser:ident) => {
43        let $field = <$field_type>::parse_to($parser)?;
44    };
45    ($field:ident => [$($arr:tt)+], $parser:ident) => {
46        let $field = $parser.parse_keywords(&[$($arr)+]);
47    };
48    ([$($arr:tt)+], $parser:ident) => {
49        $parser.expect_keywords(&[$($arr)+])?;
50    };
51}
52
53#[macro_export]
54macro_rules! impl_fmt_display {
55    () => {};
56    ($field:ident, $v:ident, $self:ident) => {{
57        let s = format!("{}", $self.$field);
58        if !s.is_empty() {
59            $v.push(s);
60        }
61    }};
62    ($field:ident => [$($arr:tt)+], $v:ident, $self:ident) => {
63        if $self.$field {
64            $v.push(format!("{}", display_separated(&[$($arr)+], " ")));
65        }
66    };
67    ([$($arr:tt)+], $v:ident) => {
68        $v.push(format!("{}", display_separated(&[$($arr)+], " ")));
69    };
70}
71
72// sql_grammar!(CreateSourceStatement {
73//     if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS],
74//     source_name: Ident,
75//     with_properties: AstOption<WithProperties>,
76//     [Keyword::ROW, Keyword::FORMAT],
77//     format_encode: SourceSchema,
78//     [Keyword::WATERMARK, Keyword::FOR] column [Keyword::AS] <expr>
79// });
80#[derive(Debug, Clone, PartialEq, Eq, Hash)]
81pub struct CreateSourceStatement {
82    pub temporary: bool,
83    pub if_not_exists: bool,
84    pub columns: Vec<ColumnDef>,
85    // The wildchar position in columns defined in sql. Only exist when using external schema.
86    pub wildcard_idx: Option<usize>,
87    pub constraints: Vec<TableConstraint>,
88    pub source_name: ObjectName,
89    pub with_properties: WithProperties,
90    pub format_encode: CompatibleFormatEncode,
91    pub source_watermarks: Vec<SourceWatermark>,
92    pub include_column_options: IncludeOption,
93}
94
95/// FORMAT means how to get the operation(Insert/Delete) from the input.
96///
97/// Check `CONNECTORS_COMPATIBLE_FORMATS` for what `FORMAT ... ENCODE ...` combinations are allowed.
98#[derive(Debug, Clone, PartialEq, Eq, Hash)]
99pub enum Format {
100    /// The format is the same with RisingWave's internal representation.
101    /// Used internally for schema change
102    Native,
103    /// for self-explanatory sources like iceberg, they have their own format, and should not be specified by user.
104    None,
105    // Keyword::DEBEZIUM
106    Debezium,
107    // Keyword::DEBEZIUM_MONGO
108    DebeziumMongo,
109    // Keyword::MAXWELL
110    Maxwell,
111    // Keyword::CANAL
112    Canal,
113    // Keyword::UPSERT
114    Upsert,
115    // Keyword::PLAIN
116    Plain,
117}
118
119// TODO: unify with `from_keyword`
120impl fmt::Display for Format {
121    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
122        write!(
123            f,
124            "{}",
125            match self {
126                Format::Native => "NATIVE",
127                Format::Debezium => "DEBEZIUM",
128                Format::DebeziumMongo => "DEBEZIUM_MONGO",
129                Format::Maxwell => "MAXWELL",
130                Format::Canal => "CANAL",
131                Format::Upsert => "UPSERT",
132                Format::Plain => "PLAIN",
133                Format::None => "NONE",
134            }
135        )
136    }
137}
138
139impl Format {
140    pub fn from_keyword(s: &str) -> ModalResult<Self> {
141        Ok(match s {
142            "DEBEZIUM" => Format::Debezium,
143            "DEBEZIUM_MONGO" => Format::DebeziumMongo,
144            "MAXWELL" => Format::Maxwell,
145            "CANAL" => Format::Canal,
146            "PLAIN" => Format::Plain,
147            "UPSERT" => Format::Upsert,
148            "NATIVE" => Format::Native,
149            "NONE" => Format::None,
150            _ => parser_err!("expected PROTOBUF | DEBEZIUM | PLAIN | NATIVE | NONE after FORMAT"),
151        })
152    }
153}
154
155/// Check `CONNECTORS_COMPATIBLE_FORMATS` for what `FORMAT ... ENCODE ...` combinations are allowed.
156#[derive(Debug, Clone, PartialEq, Eq, Hash)]
157pub enum Encode {
158    Avro,     // Keyword::Avro
159    Csv,      // Keyword::CSV
160    Protobuf, // Keyword::PROTOBUF
161    Json,     // Keyword::JSON
162    Bytes,    // Keyword::BYTES
163    /// for self-explanatory sources like iceberg, they have their own format, and should not be specified by user.
164    None,
165    Text, // Keyword::TEXT
166    /// The encode is the same with RisingWave's internal representation.
167    /// Used internally for schema change
168    Native,
169    Template,
170    Parquet,
171}
172
173// TODO: unify with `from_keyword`
174impl fmt::Display for Encode {
175    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
176        write!(
177            f,
178            "{}",
179            match self {
180                Encode::Avro => "AVRO",
181                Encode::Csv => "CSV",
182                Encode::Protobuf => "PROTOBUF",
183                Encode::Json => "JSON",
184                Encode::Bytes => "BYTES",
185                Encode::Native => "NATIVE",
186                Encode::Template => "TEMPLATE",
187                Encode::None => "NONE",
188                Encode::Parquet => "PARQUET",
189                Encode::Text => "TEXT",
190            }
191        )
192    }
193}
194
195impl Encode {
196    pub fn from_keyword(s: &str) -> ModalResult<Self> {
197        Ok(match s {
198            "AVRO" => Encode::Avro,
199            "TEXT" => Encode::Text,
200            "BYTES" => Encode::Bytes,
201            "CSV" => Encode::Csv,
202            "PROTOBUF" => Encode::Protobuf,
203            "JSON" => Encode::Json,
204            "TEMPLATE" => Encode::Template,
205            "PARQUET" => Encode::Parquet,
206            "NATIVE" => Encode::Native,
207            "NONE" => Encode::None,
208            _ => parser_err!(
209                "expected AVRO | BYTES | CSV | PROTOBUF | JSON | NATIVE | TEMPLATE | PARQUET | NONE after Encode"
210            ),
211        })
212    }
213}
214
215/// `FORMAT ... ENCODE ... [(a=b, ...)] [KEY ENCODE ...]`
216#[derive(Debug, Clone, PartialEq, Eq, Hash)]
217pub struct FormatEncodeOptions {
218    pub format: Format,
219    pub row_encode: Encode,
220    pub row_options: Vec<SqlOption>,
221
222    pub key_encode: Option<Encode>,
223}
224
225impl Parser<'_> {
226    /// Peek the next tokens to see if it is `FORMAT` or `ROW FORMAT` (for compatibility).
227    fn peek_format_encode_format(&mut self) -> bool {
228        (self.peek_nth_any_of_keywords(0, &[Keyword::ROW])
229            && self.peek_nth_any_of_keywords(1, &[Keyword::FORMAT])) // ROW FORMAT
230            || self.peek_nth_any_of_keywords(0, &[Keyword::FORMAT]) // FORMAT
231    }
232
233    /// Parse the source schema. The behavior depends on the `connector` type.
234    pub fn parse_format_encode_with_connector(
235        &mut self,
236        connector: &str,
237        cdc_source_job: bool,
238    ) -> ModalResult<CompatibleFormatEncode> {
239        // row format for cdc source must be debezium json
240        // row format for nexmark source must be native
241        // default row format for datagen source is native
242        // FIXME: parse input `connector` to enum type instead using string here
243        if connector.contains("-cdc") {
244            let expected = if cdc_source_job {
245                FormatEncodeOptions::plain_json()
246            } else if connector.contains("mongodb") {
247                FormatEncodeOptions::debezium_mongo_json()
248            } else {
249                FormatEncodeOptions::debezium_json()
250            };
251
252            if self.peek_format_encode_format() {
253                let schema = parse_format_encode(self)?.into_v2();
254                if schema != expected {
255                    parser_err!(
256                        "Row format for CDC connectors should be \
257                         either omitted or set to `{expected}`",
258                    );
259                }
260            }
261            Ok(expected.into())
262        } else if connector.contains("nexmark") {
263            let expected = FormatEncodeOptions::native();
264            if self.peek_format_encode_format() {
265                let schema = parse_format_encode(self)?.into_v2();
266                if schema != expected {
267                    parser_err!(
268                        "Row format for nexmark connectors should be \
269                         either omitted or set to `{expected}`",
270                    );
271                }
272            }
273            Ok(expected.into())
274        } else if connector.contains("datagen") {
275            Ok(if self.peek_format_encode_format() {
276                parse_format_encode(self)?
277            } else {
278                FormatEncodeOptions::native().into()
279            })
280        } else if connector.contains("iceberg") || connector.contains("adbc_snowflake") {
281            let expected = FormatEncodeOptions::none();
282            if self.peek_format_encode_format() {
283                let schema = parse_format_encode(self)?.into_v2();
284                if schema != expected {
285                    parser_err!(
286                        "Row format for iceberg connectors should be \
287                         either omitted or set to `{expected}`",
288                    );
289                }
290            }
291            Ok(expected.into())
292        } else if connector.contains("webhook") {
293            parser_err!(
294                "Source with webhook connector is not supported. \
295                 Please use the `CREATE TABLE ... WITH ...` statement instead.",
296            );
297        } else {
298            Ok(parse_format_encode(self)?)
299        }
300    }
301
302    /// Parse `FORMAT ... ENCODE ... (...)`.
303    pub fn parse_schema(&mut self) -> ModalResult<Option<FormatEncodeOptions>> {
304        if !self.parse_keyword(Keyword::FORMAT) {
305            return Ok(None);
306        }
307
308        let id = self.parse_identifier()?;
309        let s = id.value.to_ascii_uppercase();
310        let format = Format::from_keyword(&s)?;
311        self.expect_keyword(Keyword::ENCODE)?;
312        let id = self.parse_identifier()?;
313        let s = id.value.to_ascii_uppercase();
314        let row_encode = Encode::from_keyword(&s)?;
315        let row_options = self.parse_options()?;
316
317        let key_encode = if self.parse_keywords(&[Keyword::KEY, Keyword::ENCODE]) {
318            Some(Encode::from_keyword(
319                self.parse_identifier()?.value.to_ascii_uppercase().as_str(),
320            )?)
321        } else {
322            None
323        };
324
325        Ok(Some(FormatEncodeOptions {
326            format,
327            row_encode,
328            row_options,
329            key_encode,
330        }))
331    }
332}
333
334impl FormatEncodeOptions {
335    pub const fn plain_json() -> Self {
336        FormatEncodeOptions {
337            format: Format::Plain,
338            row_encode: Encode::Json,
339            row_options: Vec::new(),
340            key_encode: None,
341        }
342    }
343
344    /// Create a new source schema with `Debezium` format and `Json` encoding.
345    pub const fn debezium_json() -> Self {
346        FormatEncodeOptions {
347            format: Format::Debezium,
348            row_encode: Encode::Json,
349            row_options: Vec::new(),
350            key_encode: None,
351        }
352    }
353
354    pub const fn debezium_mongo_json() -> Self {
355        FormatEncodeOptions {
356            format: Format::DebeziumMongo,
357            row_encode: Encode::Json,
358            row_options: Vec::new(),
359            key_encode: None,
360        }
361    }
362
363    /// Create a new source schema with `Native` format and encoding.
364    pub const fn native() -> Self {
365        FormatEncodeOptions {
366            format: Format::Native,
367            row_encode: Encode::Native,
368            row_options: Vec::new(),
369            key_encode: None,
370        }
371    }
372
373    /// Create a new source schema with `None` format and encoding.
374    /// Used for self-explanatory source like iceberg.
375    pub const fn none() -> Self {
376        FormatEncodeOptions {
377            format: Format::None,
378            row_encode: Encode::None,
379            row_options: Vec::new(),
380            key_encode: None,
381        }
382    }
383
384    pub fn row_options(&self) -> &[SqlOption] {
385        self.row_options.as_ref()
386    }
387}
388
389impl fmt::Display for FormatEncodeOptions {
390    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
391        write!(f, "FORMAT {} ENCODE {}", self.format, self.row_encode)?;
392
393        if !self.row_options().is_empty() {
394            write!(f, " ({})", display_comma_separated(self.row_options()))?;
395        }
396
397        if let Some(key_encode) = &self.key_encode {
398            write!(f, " KEY ENCODE {}", key_encode)?;
399        }
400
401        Ok(())
402    }
403}
404
405pub(super) fn fmt_create_items(
406    columns: &[ColumnDef],
407    constraints: &[TableConstraint],
408    watermarks: &[SourceWatermark],
409    wildcard_idx: Option<usize>,
410) -> std::result::Result<String, fmt::Error> {
411    let mut items = String::new();
412    let has_items = !columns.is_empty()
413        || !constraints.is_empty()
414        || !watermarks.is_empty()
415        || wildcard_idx.is_some();
416    has_items.then(|| write!(&mut items, "("));
417
418    if let Some(wildcard_idx) = wildcard_idx {
419        let (columns_l, columns_r) = columns.split_at(wildcard_idx);
420        write!(&mut items, "{}", display_comma_separated(columns_l))?;
421        if !columns_l.is_empty() {
422            write!(&mut items, ", ")?;
423        }
424        write!(&mut items, "{}", Token::Mul)?;
425        if !columns_r.is_empty() {
426            write!(&mut items, ", ")?;
427        }
428        write!(&mut items, "{}", display_comma_separated(columns_r))?;
429    } else {
430        write!(&mut items, "{}", display_comma_separated(columns))?;
431    }
432    let mut leading_items = !columns.is_empty() || wildcard_idx.is_some();
433
434    if leading_items && !constraints.is_empty() {
435        write!(&mut items, ", ")?;
436    }
437    write!(&mut items, "{}", display_comma_separated(constraints))?;
438    leading_items |= !constraints.is_empty();
439
440    if leading_items && !watermarks.is_empty() {
441        write!(&mut items, ", ")?;
442    }
443    write!(&mut items, "{}", display_comma_separated(watermarks))?;
444    // uncomment this when adding more sections below
445    // leading_items |= !watermarks.is_empty();
446
447    has_items.then(|| write!(&mut items, ")"));
448    Ok(items)
449}
450
451impl fmt::Display for CreateSourceStatement {
452    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
453        let mut v: Vec<String> = vec![];
454        impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
455        impl_fmt_display!(source_name, v, self);
456
457        let items = fmt_create_items(
458            &self.columns,
459            &self.constraints,
460            &self.source_watermarks,
461            self.wildcard_idx,
462        )?;
463        if !items.is_empty() {
464            v.push(items);
465        }
466
467        for item in &self.include_column_options {
468            v.push(format!("{}", item));
469        }
470
471        // skip format_encode for cdc source
472        let is_cdc_source = self.with_properties.0.iter().any(|option| {
473            option.name.real_value().eq_ignore_ascii_case("connector")
474                && option.value.to_string().contains("cdc")
475        });
476
477        impl_fmt_display!(with_properties, v, self);
478        if !is_cdc_source {
479            impl_fmt_display!(format_encode, v, self);
480        }
481        v.iter().join(" ").fmt(f)
482    }
483}
484
485#[derive(Debug, Clone, PartialEq, Eq, Hash)]
486pub enum CreateSink {
487    From(ObjectName),
488    AsQuery(Box<Query>),
489}
490
491impl fmt::Display for CreateSink {
492    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
493        match self {
494            Self::From(mv) => write!(f, "FROM {}", mv),
495            Self::AsQuery(query) => write!(f, "AS {}", query),
496        }
497    }
498}
499// sql_grammar!(CreateSinkStatement {
500//     if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS],
501//     sink_name: Ident,
502//     [Keyword::FROM],
503//     materialized_view: Ident,
504//     with_properties: AstOption<WithProperties>,
505// });
506#[derive(Debug, Clone, PartialEq, Eq, Hash)]
507pub struct CreateSinkStatement {
508    pub or_replace: bool,
509    pub if_not_exists: bool,
510    pub sink_name: ObjectName,
511    pub with_properties: WithProperties,
512    pub sink_from: CreateSink,
513
514    // only used when creating sink into a table
515    // insert to specific columns of the target table
516    pub columns: Vec<Ident>,
517    pub emit_mode: Option<EmitMode>,
518    pub sink_schema: Option<FormatEncodeOptions>,
519    pub into_table_name: Option<ObjectName>,
520}
521
522impl ParseTo for CreateSinkStatement {
523    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
524        Self::parse_to_with_or_replace(p, false)
525    }
526}
527
528impl CreateSinkStatement {
529    pub fn parse_to_with_or_replace(p: &mut Parser<'_>, or_replace: bool) -> ModalResult<Self> {
530        impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
531        impl_parse_to!(sink_name: ObjectName, p);
532
533        let mut target_spec_columns = Vec::new();
534        let into_table_name = if p.parse_keyword(Keyword::INTO) {
535            impl_parse_to!(into_table_name: ObjectName, p);
536
537            // we only allow specify columns when creating sink into a table
538            target_spec_columns = p.parse_parenthesized_column_list(IsOptional::Optional)?;
539            Some(into_table_name)
540        } else {
541            None
542        };
543
544        let sink_from = if p.parse_keyword(Keyword::FROM) {
545            impl_parse_to!(from_name: ObjectName, p);
546            CreateSink::From(from_name)
547        } else if p.parse_keyword(Keyword::AS) {
548            let query = Box::new(p.parse_query()?);
549            CreateSink::AsQuery(query)
550        } else {
551            p.expected(if or_replace {
552                "FROM or AS after REPLACE SINK sink_name"
553            } else {
554                "FROM or AS after CREATE SINK sink_name"
555            })?
556        };
557
558        let emit_mode: Option<EmitMode> = p.parse_emit_mode()?;
559
560        // This check cannot be put into the `WithProperties::parse_to`, since other
561        // statements may not need the with properties.
562        if !p.peek_nth_any_of_keywords(0, &[Keyword::WITH]) && into_table_name.is_none() {
563            p.expected("WITH")?
564        }
565        impl_parse_to!(with_properties: WithProperties, p);
566
567        if with_properties.0.is_empty() && into_table_name.is_none() {
568            parser_err!("sink properties not provided");
569        }
570
571        let sink_schema = p.parse_schema()?;
572
573        Ok(Self {
574            or_replace,
575            if_not_exists,
576            sink_name,
577            with_properties,
578            sink_from,
579            columns: target_spec_columns,
580            emit_mode,
581            sink_schema,
582            into_table_name,
583        })
584    }
585}
586
587impl fmt::Display for CreateSinkStatement {
588    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
589        let mut v: Vec<String> = vec![];
590        impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
591        impl_fmt_display!(sink_name, v, self);
592        if let Some(into_table) = &self.into_table_name {
593            impl_fmt_display!([Keyword::INTO], v);
594            impl_fmt_display!([into_table], v);
595            if !self.columns.is_empty() {
596                v.push(format!("({})", display_comma_separated(&self.columns)));
597            }
598        }
599        impl_fmt_display!(sink_from, v, self);
600        if let Some(ref emit_mode) = self.emit_mode {
601            v.push(format!("EMIT {}", emit_mode));
602        }
603        impl_fmt_display!(with_properties, v, self);
604        if let Some(schema) = &self.sink_schema {
605            v.push(format!("{}", schema));
606        }
607        v.iter().join(" ").fmt(f)
608    }
609}
610
611// sql_grammar!(CreateSubscriptionStatement {
612//     if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS],
613//     subscription_name: Ident,
614//     [Keyword::FROM],
615//     materialized_view: Ident,
616//     with_properties: AstOption<WithProperties>,
617// });
618#[derive(Debug, Clone, PartialEq, Eq, Hash)]
619pub struct CreateSubscriptionStatement {
620    pub if_not_exists: bool,
621    pub subscription_name: ObjectName,
622    pub with_properties: WithProperties,
623    pub subscription_from: ObjectName,
624    // pub emit_mode: Option<EmitMode>,
625}
626
627impl ParseTo for CreateSubscriptionStatement {
628    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
629        impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
630        impl_parse_to!(subscription_name: ObjectName, p);
631
632        let subscription_from = if p.parse_keyword(Keyword::FROM) {
633            impl_parse_to!(from_name: ObjectName, p);
634            from_name
635        } else {
636            p.expected("FROM after CREATE SUBSCRIPTION subscription_name")?
637        };
638
639        // let emit_mode = p.parse_emit_mode()?;
640
641        // This check cannot be put into the `WithProperties::parse_to`, since other
642        // statements may not need the with properties.
643        if !p.peek_nth_any_of_keywords(0, &[Keyword::WITH]) {
644            p.expected("WITH")?
645        }
646        impl_parse_to!(with_properties: WithProperties, p);
647
648        if with_properties.0.is_empty() {
649            parser_err!("subscription properties not provided");
650        }
651
652        Ok(Self {
653            if_not_exists,
654            subscription_name,
655            with_properties,
656            subscription_from,
657        })
658    }
659}
660
661impl fmt::Display for CreateSubscriptionStatement {
662    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
663        let mut v: Vec<String> = vec![];
664        impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
665        impl_fmt_display!(subscription_name, v, self);
666        v.push(format!("FROM {}", self.subscription_from));
667        impl_fmt_display!(with_properties, v, self);
668        v.iter().join(" ").fmt(f)
669    }
670}
671
672#[derive(Debug, Clone, PartialEq, Eq, Hash)]
673pub enum DeclareCursor {
674    Query(Box<Query>),
675    Subscription(ObjectName, Since),
676}
677
678impl fmt::Display for DeclareCursor {
679    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
680        let mut v: Vec<String> = vec![];
681        match self {
682            DeclareCursor::Query(query) => v.push(format!("{}", query.as_ref())),
683            DeclareCursor::Subscription(name, since) => {
684                v.push(format!("{}", name));
685                v.push(format!("{:?}", since));
686            }
687        }
688        v.iter().join(" ").fmt(f)
689    }
690}
691
692// sql_grammar!(DeclareCursorStatement {
693//     cursor_name: Ident,
694//     [Keyword::SUBSCRIPTION]
695//     [Keyword::CURSOR],
696//     [Keyword::FOR],
697//     subscription: Ident or query: Query,
698//     [Keyword::SINCE],
699//     rw_timestamp: Ident,
700// });
701#[derive(Debug, Clone, PartialEq, Eq, Hash)]
702pub struct DeclareCursorStatement {
703    pub cursor_name: Ident,
704    pub declare_cursor: DeclareCursor,
705}
706
707impl ParseTo for DeclareCursorStatement {
708    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
709        let cursor_name = p.parse_identifier_non_reserved()?;
710
711        let declare_cursor = if !p.parse_keyword(Keyword::SUBSCRIPTION) {
712            p.expect_keyword(Keyword::CURSOR)?;
713            p.expect_keyword(Keyword::FOR)?;
714            DeclareCursor::Query(Box::new(p.parse_query()?))
715        } else {
716            p.expect_keyword(Keyword::CURSOR)?;
717            p.expect_keyword(Keyword::FOR)?;
718            let cursor_for_name = p.parse_object_name()?;
719            let rw_timestamp = p.parse_since()?;
720            DeclareCursor::Subscription(cursor_for_name, rw_timestamp)
721        };
722
723        Ok(Self {
724            cursor_name,
725            declare_cursor,
726        })
727    }
728}
729
730impl fmt::Display for DeclareCursorStatement {
731    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
732        let mut v: Vec<String> = vec![];
733        impl_fmt_display!(cursor_name, v, self);
734        match &self.declare_cursor {
735            DeclareCursor::Query(_) => {
736                v.push("CURSOR FOR ".to_owned());
737            }
738            DeclareCursor::Subscription { .. } => {
739                v.push("SUBSCRIPTION CURSOR FOR ".to_owned());
740            }
741        }
742        impl_fmt_display!(declare_cursor, v, self);
743        v.iter().join(" ").fmt(f)
744    }
745}
746
747// sql_grammar!(FetchCursorStatement {
748//     cursor_name: Ident,
749// });
750#[derive(Debug, Clone, PartialEq, Eq, Hash)]
751pub struct FetchCursorStatement {
752    pub cursor_name: Ident,
753    pub count: u32,
754    pub with_properties: WithProperties,
755}
756
757impl ParseTo for FetchCursorStatement {
758    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
759        let count = if p.parse_keyword(Keyword::NEXT) {
760            1
761        } else {
762            literal_u32(p)?
763        };
764        p.expect_keyword(Keyword::FROM)?;
765        let cursor_name = p.parse_identifier_non_reserved()?;
766        impl_parse_to!(with_properties: WithProperties, p);
767
768        Ok(Self {
769            cursor_name,
770            count,
771            with_properties,
772        })
773    }
774}
775
776impl fmt::Display for FetchCursorStatement {
777    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
778        let mut v: Vec<String> = vec![];
779        if self.count == 1 {
780            v.push("NEXT ".to_owned());
781        } else {
782            impl_fmt_display!(count, v, self);
783        }
784        v.push("FROM ".to_owned());
785        impl_fmt_display!(cursor_name, v, self);
786        v.iter().join(" ").fmt(f)
787    }
788}
789
790// sql_grammar!(CloseCursorStatement {
791//     cursor_name: Ident,
792// });
793#[derive(Debug, Clone, PartialEq, Eq, Hash)]
794pub struct CloseCursorStatement {
795    pub cursor_name: Option<Ident>,
796}
797
798impl ParseTo for CloseCursorStatement {
799    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
800        let cursor_name = if p.parse_keyword(Keyword::ALL) {
801            None
802        } else {
803            Some(p.parse_identifier_non_reserved()?)
804        };
805
806        Ok(Self { cursor_name })
807    }
808}
809
810impl fmt::Display for CloseCursorStatement {
811    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
812        let mut v: Vec<String> = vec![];
813        if let Some(cursor_name) = &self.cursor_name {
814            v.push(format!("{}", cursor_name));
815        } else {
816            v.push("ALL".to_owned());
817        }
818        v.iter().join(" ").fmt(f)
819    }
820}
821
822// sql_grammar!(CreateConnectionStatement {
823//     if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS],
824//     connection_name: Ident,
825//     with_properties: AstOption<WithProperties>,
826// });
827#[derive(Debug, Clone, PartialEq, Eq, Hash)]
828pub struct CreateConnectionStatement {
829    pub if_not_exists: bool,
830    pub connection_name: ObjectName,
831    pub with_properties: WithProperties,
832}
833
834impl ParseTo for CreateConnectionStatement {
835    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
836        impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
837        impl_parse_to!(connection_name: ObjectName, p);
838        impl_parse_to!(with_properties: WithProperties, p);
839        if with_properties.0.is_empty() {
840            parser_err!("connection properties not provided");
841        }
842
843        Ok(Self {
844            if_not_exists,
845            connection_name,
846            with_properties,
847        })
848    }
849}
850
851impl fmt::Display for CreateConnectionStatement {
852    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
853        let mut v: Vec<String> = vec![];
854        impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
855        impl_fmt_display!(connection_name, v, self);
856        impl_fmt_display!(with_properties, v, self);
857        v.iter().join(" ").fmt(f)
858    }
859}
860
861#[derive(Debug, Clone, PartialEq, Eq, Hash)]
862pub struct CreateSecretStatement {
863    pub if_not_exists: bool,
864    pub secret_name: ObjectName,
865    pub credential: Value,
866    pub with_properties: WithProperties,
867}
868
869impl ParseTo for CreateSecretStatement {
870    fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
871        impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], parser);
872        impl_parse_to!(secret_name: ObjectName, parser);
873        impl_parse_to!(with_properties: WithProperties, parser);
874        let mut credential = Value::Null;
875        if parser.parse_keyword(Keyword::AS) {
876            credential = parser.ensure_parse_value()?;
877        }
878        Ok(Self {
879            if_not_exists,
880            secret_name,
881            credential,
882            with_properties,
883        })
884    }
885}
886
887impl fmt::Display for CreateSecretStatement {
888    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
889        let mut v: Vec<String> = vec![];
890        impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
891        impl_fmt_display!(secret_name, v, self);
892        impl_fmt_display!(with_properties, v, self);
893        if self.credential != Value::Null {
894            v.push("AS".to_owned());
895            impl_fmt_display!(credential, v, self);
896        }
897        v.iter().join(" ").fmt(f)
898    }
899}
900
901#[derive(Debug, Clone, PartialEq, Eq, Hash)]
902pub struct WithProperties(pub Vec<SqlOption>);
903
904impl ParseTo for WithProperties {
905    fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
906        Ok(Self(
907            parser.parse_options_with_preceding_keyword(Keyword::WITH)?,
908        ))
909    }
910}
911
912impl fmt::Display for WithProperties {
913    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
914        if !self.0.is_empty() {
915            write!(f, "WITH ({})", display_comma_separated(self.0.as_slice()))
916        } else {
917            Ok(())
918        }
919    }
920}
921
922#[derive(Debug, Clone, PartialEq, Eq, Hash)]
923pub enum Since {
924    TimestampMsNum(u64),
925    ProcessTime,
926    Begin,
927    Full,
928}
929
930impl fmt::Display for Since {
931    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
932        use Since::*;
933        match self {
934            TimestampMsNum(ts) => write!(f, " SINCE {}", ts),
935            ProcessTime => write!(f, " SINCE PROCTIME()"),
936            Begin => write!(f, " SINCE BEGIN()"),
937            Full => write!(f, " FULL"),
938        }
939    }
940}
941
942#[derive(Debug, Clone, PartialEq, Eq, Hash)]
943pub struct RowSchemaLocation {
944    pub value: AstString,
945}
946
947impl ParseTo for RowSchemaLocation {
948    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
949        impl_parse_to!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], p);
950        impl_parse_to!(value: AstString, p);
951        Ok(Self { value })
952    }
953}
954
955impl fmt::Display for RowSchemaLocation {
956    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
957        let mut v = vec![];
958        impl_fmt_display!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], v);
959        impl_fmt_display!(value, v, self);
960        v.iter().join(" ").fmt(f)
961    }
962}
963
964/// String literal. The difference with String is that it is displayed with
965/// single-quotes.
966#[derive(Debug, Clone, PartialEq, Eq, Hash)]
967pub struct AstString(pub String);
968
969impl ParseTo for AstString {
970    fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
971        Ok(Self(parser.parse_literal_string()?))
972    }
973}
974
975impl fmt::Display for AstString {
976    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
977        write!(f, "'{}'", self.0)
978    }
979}
980
981/// This trait is used to replace `Option` because `fmt::Display` can not be implemented for
982/// `Option<T>`.
983#[derive(Debug, Clone, PartialEq, Eq, Hash)]
984pub enum AstOption<T> {
985    /// No value
986    None,
987    /// Some value `T`
988    Some(T),
989}
990
991impl<T: ParseTo> ParseTo for AstOption<T> {
992    fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
993        match T::parse_to(parser) {
994            Ok(t) => Ok(AstOption::Some(t)),
995            Err(_) => Ok(AstOption::None),
996        }
997    }
998}
999
1000impl<T: fmt::Display> fmt::Display for AstOption<T> {
1001    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1002        match &self {
1003            AstOption::Some(t) => t.fmt(f),
1004            AstOption::None => Ok(()),
1005        }
1006    }
1007}
1008
1009impl<T> From<AstOption<T>> for Option<T> {
1010    fn from(val: AstOption<T>) -> Self {
1011        match val {
1012            AstOption::Some(t) => Some(t),
1013            AstOption::None => None,
1014        }
1015    }
1016}
1017
1018#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1019pub struct CreateUserStatement {
1020    pub user_name: ObjectName,
1021    pub with_options: UserOptions,
1022}
1023
1024#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1025pub struct AlterUserStatement {
1026    pub user_name: ObjectName,
1027    pub mode: AlterUserMode,
1028}
1029
1030#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1031pub enum AlterUserMode {
1032    Options(UserOptions),
1033    Rename(ObjectName),
1034}
1035
1036#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1037pub enum UserOption {
1038    SuperUser,
1039    NoSuperUser,
1040    CreateDB,
1041    NoCreateDB,
1042    CreateUser,
1043    NoCreateUser,
1044    Login,
1045    NoLogin,
1046    Admin,
1047    NoAdmin,
1048    EncryptedPassword(AstString),
1049    Password(Option<AstString>),
1050    OAuth(Vec<SqlOption>),
1051}
1052
1053impl fmt::Display for UserOption {
1054    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1055        match self {
1056            UserOption::SuperUser => write!(f, "SUPERUSER"),
1057            UserOption::NoSuperUser => write!(f, "NOSUPERUSER"),
1058            UserOption::CreateDB => write!(f, "CREATEDB"),
1059            UserOption::NoCreateDB => write!(f, "NOCREATEDB"),
1060            UserOption::CreateUser => write!(f, "CREATEUSER"),
1061            UserOption::NoCreateUser => write!(f, "NOCREATEUSER"),
1062            UserOption::Login => write!(f, "LOGIN"),
1063            UserOption::NoLogin => write!(f, "NOLOGIN"),
1064            UserOption::Admin => write!(f, "ADMIN"),
1065            UserOption::NoAdmin => write!(f, "NOADMIN"),
1066            UserOption::EncryptedPassword(p) => {
1067                if should_redact_user_password() {
1068                    write!(f, "ENCRYPTED PASSWORD [REDACTED]")
1069                } else {
1070                    write!(f, "ENCRYPTED PASSWORD {}", p)
1071                }
1072            }
1073            UserOption::Password(None) => write!(f, "PASSWORD NULL"),
1074            UserOption::Password(Some(p)) => {
1075                if should_redact_user_password() {
1076                    write!(f, "PASSWORD [REDACTED]")
1077                } else {
1078                    write!(f, "PASSWORD {}", p)
1079                }
1080            }
1081            UserOption::OAuth(options) => {
1082                write!(f, "({})", display_comma_separated(options.as_slice()))
1083            }
1084        }
1085    }
1086}
1087
1088#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1089pub struct UserOptions(pub Vec<UserOption>, pub bool);
1090
1091fn should_redact_user_password() -> bool {
1092    REDACT_SQL_OPTION_KEYWORDS.try_with(|_| ()).is_ok()
1093}
1094
1095#[derive(Default)]
1096struct UserOptionsBuilder {
1097    with_prefix: bool,
1098    super_user: Option<UserOption>,
1099    create_db: Option<UserOption>,
1100    create_user: Option<UserOption>,
1101    login: Option<UserOption>,
1102    admin: Option<UserOption>,
1103    password: Option<UserOption>,
1104}
1105
1106impl UserOptionsBuilder {
1107    fn build(self) -> UserOptions {
1108        let mut options = vec![];
1109        if let Some(option) = self.super_user {
1110            options.push(option);
1111        }
1112        if let Some(option) = self.create_db {
1113            options.push(option);
1114        }
1115        if let Some(option) = self.create_user {
1116            options.push(option);
1117        }
1118        if let Some(option) = self.login {
1119            options.push(option);
1120        }
1121        if let Some(option) = self.admin {
1122            options.push(option);
1123        }
1124        if let Some(option) = self.password {
1125            options.push(option);
1126        }
1127        UserOptions(options, self.with_prefix)
1128    }
1129}
1130
1131impl ParseTo for UserOptions {
1132    fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
1133        let mut builder = UserOptionsBuilder {
1134            with_prefix: parser.parse_keyword(Keyword::WITH),
1135            ..Default::default()
1136        };
1137        let add_option = |item: &mut Option<UserOption>, user_option| {
1138            let old_value = item.replace(user_option);
1139            if old_value.is_some() {
1140                parser_err!("conflicting or redundant options");
1141            }
1142            Ok(())
1143        };
1144        loop {
1145            let token = parser.peek_token();
1146            if token == Token::EOF || token == Token::SemiColon {
1147                break;
1148            }
1149
1150            let option = parser.parse_identifier()?;
1151            let s = option.real_value();
1152            let (item_mut_ref, user_option) = match &s[..] {
1153                "superuser" => (&mut builder.super_user, UserOption::SuperUser),
1154                "nosuperuser" => (&mut builder.super_user, UserOption::NoSuperUser),
1155                "createdb" => (&mut builder.create_db, UserOption::CreateDB),
1156                "nocreatedb" => (&mut builder.create_db, UserOption::NoCreateDB),
1157                "createuser" => (&mut builder.create_user, UserOption::CreateUser),
1158                "nocreateuser" => (&mut builder.create_user, UserOption::NoCreateUser),
1159                "login" => (&mut builder.login, UserOption::Login),
1160                "nologin" => (&mut builder.login, UserOption::NoLogin),
1161                "admin" => (&mut builder.admin, UserOption::Admin),
1162                "noadmin" => (&mut builder.admin, UserOption::NoAdmin),
1163                "password" => {
1164                    if parser.parse_keyword(Keyword::NULL) {
1165                        (&mut builder.password, UserOption::Password(None))
1166                    } else {
1167                        (
1168                            &mut builder.password,
1169                            UserOption::Password(Some(AstString::parse_to(parser)?)),
1170                        )
1171                    }
1172                }
1173                "encrypted" => {
1174                    let option = parser.parse_identifier()?;
1175                    let s = option.real_value();
1176                    if s != "password" {
1177                        parser_err!("expected PASSWORD after ENCRYPTED, found {}", option);
1178                    }
1179                    (
1180                        &mut builder.password,
1181                        UserOption::EncryptedPassword(AstString::parse_to(parser)?),
1182                    )
1183                }
1184                "oauth" => {
1185                    let options = parser.parse_options()?;
1186                    (&mut builder.password, UserOption::OAuth(options))
1187                }
1188                _ => {
1189                    parser_err!("unexpected user option: {}", option);
1190                }
1191            };
1192            add_option(item_mut_ref, user_option)?;
1193        }
1194        Ok(builder.build())
1195    }
1196}
1197
1198impl fmt::Display for UserOptions {
1199    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1200        if !self.0.is_empty() {
1201            if self.1 {
1202                write!(f, "WITH ")?;
1203            }
1204            write!(f, "{}", display_separated(self.0.as_slice(), " "))
1205        } else {
1206            Ok(())
1207        }
1208    }
1209}
1210
1211impl ParseTo for CreateUserStatement {
1212    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1213        impl_parse_to!(user_name: ObjectName, p);
1214        impl_parse_to!(with_options: UserOptions, p);
1215
1216        Ok(CreateUserStatement {
1217            user_name,
1218            with_options,
1219        })
1220    }
1221}
1222
1223impl fmt::Display for CreateUserStatement {
1224    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1225        let mut v: Vec<String> = vec![];
1226        impl_fmt_display!(user_name, v, self);
1227        impl_fmt_display!(with_options, v, self);
1228        v.iter().join(" ").fmt(f)
1229    }
1230}
1231
1232impl fmt::Display for AlterUserMode {
1233    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1234        match self {
1235            AlterUserMode::Options(options) => {
1236                write!(f, "{}", options)
1237            }
1238            AlterUserMode::Rename(new_name) => {
1239                write!(f, "RENAME TO {}", new_name)
1240            }
1241        }
1242    }
1243}
1244
1245impl fmt::Display for AlterUserStatement {
1246    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1247        let mut v: Vec<String> = vec![];
1248        impl_fmt_display!(user_name, v, self);
1249        impl_fmt_display!(mode, v, self);
1250        v.iter().join(" ").fmt(f)
1251    }
1252}
1253
1254impl ParseTo for AlterUserStatement {
1255    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1256        impl_parse_to!(user_name: ObjectName, p);
1257        impl_parse_to!(mode: AlterUserMode, p);
1258
1259        Ok(AlterUserStatement { user_name, mode })
1260    }
1261}
1262
1263impl ParseTo for AlterUserMode {
1264    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1265        if p.parse_keyword(Keyword::RENAME) {
1266            p.expect_keyword(Keyword::TO)?;
1267            impl_parse_to!(new_name: ObjectName, p);
1268            Ok(AlterUserMode::Rename(new_name))
1269        } else {
1270            impl_parse_to!(with_options: UserOptions, p);
1271            Ok(AlterUserMode::Options(with_options))
1272        }
1273    }
1274}
1275
1276#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1277pub struct DropStatement {
1278    /// The type of the object to drop: TABLE, VIEW, etc.
1279    pub object_type: ObjectType,
1280    /// An optional `IF EXISTS` clause. (Non-standard.)
1281    pub if_exists: bool,
1282    /// Object to drop.
1283    pub object_name: ObjectName,
1284    /// Whether `CASCADE` was specified. This will be `false` when
1285    /// `RESTRICT` or no drop behavior at all was specified.
1286    pub drop_mode: AstOption<DropMode>,
1287}
1288
1289// sql_grammar!(DropStatement {
1290//     object_type: ObjectType,
1291//     if_exists => [Keyword::IF, Keyword::EXISTS],
1292//     name: ObjectName,
1293//     drop_mode: AstOption<DropMode>,
1294// });
1295impl ParseTo for DropStatement {
1296    fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1297        impl_parse_to!(object_type: ObjectType, p);
1298        impl_parse_to!(if_exists => [Keyword::IF, Keyword::EXISTS], p);
1299        let object_name = p.parse_object_name()?;
1300        impl_parse_to!(drop_mode: AstOption<DropMode>, p);
1301        Ok(Self {
1302            object_type,
1303            if_exists,
1304            object_name,
1305            drop_mode,
1306        })
1307    }
1308}
1309
1310impl fmt::Display for DropStatement {
1311    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1312        let mut v: Vec<String> = vec![];
1313        impl_fmt_display!(object_type, v, self);
1314        impl_fmt_display!(if_exists => [Keyword::IF, Keyword::EXISTS], v, self);
1315        impl_fmt_display!(object_name, v, self);
1316        impl_fmt_display!(drop_mode, v, self);
1317        v.iter().join(" ").fmt(f)
1318    }
1319}
1320
1321#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1322pub enum DropMode {
1323    Cascade,
1324    Restrict,
1325}
1326
1327impl ParseTo for DropMode {
1328    fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
1329        let drop_mode = if parser.parse_keyword(Keyword::CASCADE) {
1330            DropMode::Cascade
1331        } else if parser.parse_keyword(Keyword::RESTRICT) {
1332            DropMode::Restrict
1333        } else {
1334            return parser.expected("CASCADE | RESTRICT");
1335        };
1336        Ok(drop_mode)
1337    }
1338}
1339
1340impl fmt::Display for DropMode {
1341    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1342        f.write_str(match self {
1343            DropMode::Cascade => "CASCADE",
1344            DropMode::Restrict => "RESTRICT",
1345        })
1346    }
1347}