1use core::fmt;
16use core::fmt::Formatter;
17use std::fmt::Write;
18
19use itertools::Itertools;
20#[cfg(feature = "serde")]
21use serde::{Deserialize, Serialize};
22use winnow::ModalResult;
23
24use super::ddl::SourceWatermark;
25use super::legacy_source::{CompatibleFormatEncode, parse_format_encode};
26use super::{EmitMode, Ident, ObjectType, Query, Value};
27use crate::ast::{
28 ColumnDef, ObjectName, SqlOption, TableConstraint, display_comma_separated, display_separated,
29};
30use crate::keywords::Keyword;
31use crate::parser::{IncludeOption, IsOptional, Parser};
32use crate::parser_err;
33use crate::parser_v2::literal_u32;
34use crate::tokenizer::Token;
35
/// A syntax-tree node that can be built directly from a [`Parser`].
pub trait ParseTo: Sized {
    /// Parses one `Self` from `parser`, returning the parser's error when the
    /// upcoming input does not form a valid `Self`.
    fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self>;
}
40
/// Emits one parsing statement for use inside a [`ParseTo::parse_to`] body.
///
/// * `field: Type, parser` binds `field` to `<Type>::parse_to(parser)?`.
/// * `field => [keywords], parser` binds `field` to the result of
///   `parser.parse_keywords(&[keywords])` (no `?`, so a miss is not an error).
/// * `[keywords], parser` requires the keywords via `expect_keywords` and
///   propagates the failure with `?`.
#[macro_export]
macro_rules! impl_parse_to {
    // Empty invocation: expands to nothing.
    () => {};
    // Mandatory sub-node parsed through its `ParseTo` implementation.
    ($field:ident : $field_type:ty, $parser:ident) => {
        let $field = <$field_type>::parse_to($parser)?;
    };
    // Optional keyword sequence whose presence is recorded in `$field`.
    ($field:ident => [$($arr:tt)+], $parser:ident) => {
        let $field = $parser.parse_keywords(&[$($arr)+]);
    };
    // Mandatory keyword sequence.
    ([$($arr:tt)+], $parser:ident) => {
        $parser.expect_keywords(&[$($arr)+])?;
    };
}
54
/// Appends one rendered fragment of a statement to the `Vec<String>` named by
/// `$v`, for use inside `Display` implementations.
///
/// * `field, v, self` pushes the `Display` text of `self.field` unless that
///   text is empty.
/// * `field => [items], v, self` pushes the space-separated items only when
///   the boolean `self.field` is `true`.
/// * `[items], v` always pushes the space-separated items.
///
/// `display_separated` must be in scope at the call site.
#[macro_export]
macro_rules! impl_fmt_display {
    // Empty invocation: expands to nothing.
    () => {};
    // Field rendered through `Display`; empty renderings are skipped.
    ($field:ident, $v:ident, $self:ident) => {{
        let rendered = ::std::string::ToString::to_string(&$self.$field);
        if !rendered.is_empty() {
            $v.push(rendered);
        }
    }};
    // Fixed item sequence guarded by a boolean field.
    ($field:ident => [$($arr:tt)+], $v:ident, $self:ident) => {
        if $self.$field {
            $v.push(::std::string::ToString::to_string(&display_separated(
                &[$($arr)+],
                " ",
            )));
        }
    };
    // Unconditional fixed item sequence.
    ([$($arr:tt)+], $v:ident) => {
        $v.push(::std::string::ToString::to_string(&display_separated(
            &[$($arr)+],
            " ",
        )));
    };
}
73
/// The body of a `CREATE SOURCE` statement, as rendered by its `Display`
/// implementation: `[IF NOT EXISTS] name [(items)] [include options]
/// [WITH (...)] [format/encode]`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct CreateSourceStatement {
    /// Presumably marks a temporary source.
    /// NOTE(review): this flag is not rendered by the `Display` impl in this file.
    pub temporary: bool,
    /// Rendered as `IF NOT EXISTS` when `true`.
    pub if_not_exists: bool,
    /// Column definitions listed inside the parentheses.
    pub columns: Vec<ColumnDef>,
    /// Index into `columns` at which a `*` wildcard is rendered, if any.
    pub wildcard_idx: Option<usize>,
    /// Table constraints, rendered after the columns.
    pub constraints: Vec<TableConstraint>,
    /// Name of the source being created.
    pub source_name: ObjectName,
    /// Options of the `WITH (...)` clause.
    pub with_properties: WithProperties,
    /// Format/encode clause; omitted from the rendering when a `connector`
    /// option value contains `cdc`.
    pub format_encode: CompatibleFormatEncode,
    /// Watermark definitions, rendered after the constraints.
    pub source_watermarks: Vec<SourceWatermark>,
    /// Include-column options, each rendered as its own fragment.
    pub include_column_options: IncludeOption,
}
97
/// The value that follows the `FORMAT` keyword of a format/encode clause.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum Format {
    /// Keyword `NATIVE`.
    Native,
    /// Keyword `NONE`.
    None,
    /// Keyword `DEBEZIUM`.
    Debezium,
    /// Keyword `DEBEZIUM_MONGO`.
    DebeziumMongo,
    /// Keyword `MAXWELL`.
    Maxwell,
    /// Keyword `CANAL`.
    Canal,
    /// Keyword `UPSERT`.
    Upsert,
    /// Keyword `PLAIN`.
    Plain,
}
122
123impl fmt::Display for Format {
125 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
126 write!(
127 f,
128 "{}",
129 match self {
130 Format::Native => "NATIVE",
131 Format::Debezium => "DEBEZIUM",
132 Format::DebeziumMongo => "DEBEZIUM_MONGO",
133 Format::Maxwell => "MAXWELL",
134 Format::Canal => "CANAL",
135 Format::Upsert => "UPSERT",
136 Format::Plain => "PLAIN",
137 Format::None => "NONE",
138 }
139 )
140 }
141}
142
143impl Format {
144 pub fn from_keyword(s: &str) -> ModalResult<Self> {
145 Ok(match s {
146 "DEBEZIUM" => Format::Debezium,
147 "DEBEZIUM_MONGO" => Format::DebeziumMongo,
148 "MAXWELL" => Format::Maxwell,
149 "CANAL" => Format::Canal,
150 "PLAIN" => Format::Plain,
151 "UPSERT" => Format::Upsert,
152 "NATIVE" => Format::Native,
153 "NONE" => Format::None,
154 _ => parser_err!(
155 "expected CANAL | PROTOBUF | DEBEZIUM | MAXWELL | PLAIN | NATIVE | NONE after FORMAT"
156 ),
157 })
158 }
159}
160
/// The value that follows the `ENCODE` keyword of a format/encode clause.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum Encode {
    /// Keyword `AVRO`.
    Avro,
    /// Keyword `CSV`.
    Csv,
    /// Keyword `PROTOBUF`.
    Protobuf,
    /// Keyword `JSON`.
    Json,
    /// Keyword `BYTES`.
    Bytes,
    /// Keyword `NONE`.
    None,
    /// Keyword `TEXT`.
    Text,
    /// Keyword `NATIVE`.
    Native,
    /// Keyword `TEMPLATE`.
    Template,
    /// Keyword `PARQUET`.
    Parquet,
}
179
180impl fmt::Display for Encode {
182 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
183 write!(
184 f,
185 "{}",
186 match self {
187 Encode::Avro => "AVRO",
188 Encode::Csv => "CSV",
189 Encode::Protobuf => "PROTOBUF",
190 Encode::Json => "JSON",
191 Encode::Bytes => "BYTES",
192 Encode::Native => "NATIVE",
193 Encode::Template => "TEMPLATE",
194 Encode::None => "NONE",
195 Encode::Parquet => "PARQUET",
196 Encode::Text => "TEXT",
197 }
198 )
199 }
200}
201
202impl Encode {
203 pub fn from_keyword(s: &str) -> ModalResult<Self> {
204 Ok(match s {
205 "AVRO" => Encode::Avro,
206 "TEXT" => Encode::Text,
207 "BYTES" => Encode::Bytes,
208 "CSV" => Encode::Csv,
209 "PROTOBUF" => Encode::Protobuf,
210 "JSON" => Encode::Json,
211 "TEMPLATE" => Encode::Template,
212 "PARQUET" => Encode::Parquet,
213 "NATIVE" => Encode::Native,
214 "NONE" => Encode::None,
215 _ => parser_err!(
216 "expected AVRO | BYTES | CSV | PROTOBUF | JSON | NATIVE | TEMPLATE | PARQUET | NONE after Encode"
217 ),
218 })
219 }
220}
221
/// A parsed `FORMAT ... ENCODE ... [(options)] [KEY ENCODE ...]` clause.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct FormatEncodeOptions {
    /// Value given after `FORMAT`.
    pub format: Format,
    /// Value given after `ENCODE`.
    pub row_encode: Encode,
    /// Options that follow the encode keyword; rendered in parentheses when non-empty.
    pub row_options: Vec<SqlOption>,

    /// Value given after `KEY ENCODE`, when that clause is present.
    pub key_encode: Option<Encode>,
}
232
233impl Parser<'_> {
234 fn peek_format_encode_format(&mut self) -> bool {
236 (self.peek_nth_any_of_keywords(0, &[Keyword::ROW])
237 && self.peek_nth_any_of_keywords(1, &[Keyword::FORMAT])) || self.peek_nth_any_of_keywords(0, &[Keyword::FORMAT]) }
240
241 pub fn parse_format_encode_with_connector(
243 &mut self,
244 connector: &str,
245 cdc_source_job: bool,
246 ) -> ModalResult<CompatibleFormatEncode> {
247 if connector.contains("-cdc") {
252 let expected = if cdc_source_job {
253 FormatEncodeOptions::plain_json()
254 } else if connector.contains("mongodb") {
255 FormatEncodeOptions::debezium_mongo_json()
256 } else {
257 FormatEncodeOptions::debezium_json()
258 };
259
260 if self.peek_format_encode_format() {
261 let schema = parse_format_encode(self)?.into_v2();
262 if schema != expected {
263 parser_err!(
264 "Row format for CDC connectors should be \
265 either omitted or set to `{expected}`",
266 );
267 }
268 }
269 Ok(expected.into())
270 } else if connector.contains("nexmark") {
271 let expected = FormatEncodeOptions::native();
272 if self.peek_format_encode_format() {
273 let schema = parse_format_encode(self)?.into_v2();
274 if schema != expected {
275 parser_err!(
276 "Row format for nexmark connectors should be \
277 either omitted or set to `{expected}`",
278 );
279 }
280 }
281 Ok(expected.into())
282 } else if connector.contains("datagen") {
283 Ok(if self.peek_format_encode_format() {
284 parse_format_encode(self)?
285 } else {
286 FormatEncodeOptions::native().into()
287 })
288 } else if connector.contains("iceberg") {
289 let expected = FormatEncodeOptions::none();
290 if self.peek_format_encode_format() {
291 let schema = parse_format_encode(self)?.into_v2();
292 if schema != expected {
293 parser_err!(
294 "Row format for iceberg connectors should be \
295 either omitted or set to `{expected}`",
296 );
297 }
298 }
299 Ok(expected.into())
300 } else if connector.contains("webhook") {
301 parser_err!(
302 "Source with webhook connector is not supported. \
303 Please use the `CREATE TABLE ... WITH ...` statement instead.",
304 );
305 } else {
306 Ok(parse_format_encode(self)?)
307 }
308 }
309
310 pub fn parse_schema(&mut self) -> ModalResult<Option<FormatEncodeOptions>> {
312 if !self.parse_keyword(Keyword::FORMAT) {
313 return Ok(None);
314 }
315
316 let id = self.parse_identifier()?;
317 let s = id.value.to_ascii_uppercase();
318 let format = Format::from_keyword(&s)?;
319 self.expect_keyword(Keyword::ENCODE)?;
320 let id = self.parse_identifier()?;
321 let s = id.value.to_ascii_uppercase();
322 let row_encode = Encode::from_keyword(&s)?;
323 let row_options = self.parse_options()?;
324
325 let key_encode = if self.parse_keywords(&[Keyword::KEY, Keyword::ENCODE]) {
326 Some(Encode::from_keyword(
327 self.parse_identifier()?.value.to_ascii_uppercase().as_str(),
328 )?)
329 } else {
330 None
331 };
332
333 Ok(Some(FormatEncodeOptions {
334 format,
335 row_encode,
336 row_options,
337 key_encode,
338 }))
339 }
340}
341
342impl FormatEncodeOptions {
343 pub const fn plain_json() -> Self {
344 FormatEncodeOptions {
345 format: Format::Plain,
346 row_encode: Encode::Json,
347 row_options: Vec::new(),
348 key_encode: None,
349 }
350 }
351
352 pub const fn debezium_json() -> Self {
354 FormatEncodeOptions {
355 format: Format::Debezium,
356 row_encode: Encode::Json,
357 row_options: Vec::new(),
358 key_encode: None,
359 }
360 }
361
362 pub const fn debezium_mongo_json() -> Self {
363 FormatEncodeOptions {
364 format: Format::DebeziumMongo,
365 row_encode: Encode::Json,
366 row_options: Vec::new(),
367 key_encode: None,
368 }
369 }
370
371 pub const fn native() -> Self {
373 FormatEncodeOptions {
374 format: Format::Native,
375 row_encode: Encode::Native,
376 row_options: Vec::new(),
377 key_encode: None,
378 }
379 }
380
381 pub const fn none() -> Self {
384 FormatEncodeOptions {
385 format: Format::None,
386 row_encode: Encode::None,
387 row_options: Vec::new(),
388 key_encode: None,
389 }
390 }
391
392 pub fn row_options(&self) -> &[SqlOption] {
393 self.row_options.as_ref()
394 }
395}
396
397impl fmt::Display for FormatEncodeOptions {
398 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
399 write!(f, "FORMAT {} ENCODE {}", self.format, self.row_encode)?;
400
401 if !self.row_options().is_empty() {
402 write!(f, " ({})", display_comma_separated(self.row_options()))?;
403 }
404
405 if let Some(key_encode) = &self.key_encode {
406 write!(f, " KEY ENCODE {}", key_encode)?;
407 }
408
409 Ok(())
410 }
411}
412
413pub(super) fn fmt_create_items(
414 columns: &[ColumnDef],
415 constraints: &[TableConstraint],
416 watermarks: &[SourceWatermark],
417 wildcard_idx: Option<usize>,
418) -> std::result::Result<String, fmt::Error> {
419 let mut items = String::new();
420 let has_items = !columns.is_empty()
421 || !constraints.is_empty()
422 || !watermarks.is_empty()
423 || wildcard_idx.is_some();
424 has_items.then(|| write!(&mut items, "("));
425
426 if let Some(wildcard_idx) = wildcard_idx {
427 let (columns_l, columns_r) = columns.split_at(wildcard_idx);
428 write!(&mut items, "{}", display_comma_separated(columns_l))?;
429 if !columns_l.is_empty() {
430 write!(&mut items, ", ")?;
431 }
432 write!(&mut items, "{}", Token::Mul)?;
433 if !columns_r.is_empty() {
434 write!(&mut items, ", ")?;
435 }
436 write!(&mut items, "{}", display_comma_separated(columns_r))?;
437 } else {
438 write!(&mut items, "{}", display_comma_separated(columns))?;
439 }
440 let mut leading_items = !columns.is_empty() || wildcard_idx.is_some();
441
442 if leading_items && !constraints.is_empty() {
443 write!(&mut items, ", ")?;
444 }
445 write!(&mut items, "{}", display_comma_separated(constraints))?;
446 leading_items |= !constraints.is_empty();
447
448 if leading_items && !watermarks.is_empty() {
449 write!(&mut items, ", ")?;
450 }
451 write!(&mut items, "{}", display_comma_separated(watermarks))?;
452 has_items.then(|| write!(&mut items, ")"));
456 Ok(items)
457}
458
459impl fmt::Display for CreateSourceStatement {
460 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
461 let mut v: Vec<String> = vec![];
462 impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
463 impl_fmt_display!(source_name, v, self);
464
465 let items = fmt_create_items(
466 &self.columns,
467 &self.constraints,
468 &self.source_watermarks,
469 self.wildcard_idx,
470 )?;
471 if !items.is_empty() {
472 v.push(items);
473 }
474
475 for item in &self.include_column_options {
476 v.push(format!("{}", item));
477 }
478
479 let is_cdc_source = self.with_properties.0.iter().any(|option| {
481 option.name.real_value().eq_ignore_ascii_case("connector")
482 && option.value.to_string().contains("cdc")
483 });
484
485 impl_fmt_display!(with_properties, v, self);
486 if !is_cdc_source {
487 impl_fmt_display!(format_encode, v, self);
488 }
489 v.iter().join(" ").fmt(f)
490 }
491}
492
/// Where a sink takes its data from.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum CreateSink {
    /// `FROM <name>`: an existing object referenced by name.
    From(ObjectName),
    /// `AS <query>`: an inline query.
    AsQuery(Box<Query>),
}
499
500impl fmt::Display for CreateSink {
501 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
502 match self {
503 Self::From(mv) => write!(f, "FROM {}", mv),
504 Self::AsQuery(query) => write!(f, "AS {}", query),
505 }
506 }
507}
/// The body of a `CREATE SINK` statement, as rendered by its `Display`
/// implementation: `[IF NOT EXISTS] name [INTO table [(columns)]]
/// FROM name | AS query [EMIT mode] [WITH (...)] [format/encode]`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct CreateSinkStatement {
    /// Rendered as `IF NOT EXISTS` when `true`.
    pub if_not_exists: bool,
    /// Name of the sink being created.
    pub sink_name: ObjectName,
    /// Options of the `WITH (...)` clause; may be empty only for `INTO` sinks.
    pub with_properties: WithProperties,
    /// Source of the sink's data.
    pub sink_from: CreateSink,

    /// Optional column list given after the `INTO` table name.
    pub columns: Vec<Ident>,
    /// Emit mode, rendered as `EMIT <mode>` when present.
    pub emit_mode: Option<EmitMode>,
    /// Optional format/encode clause.
    pub sink_schema: Option<FormatEncodeOptions>,
    /// Target table of the `INTO` clause, when present.
    pub into_table_name: Option<ObjectName>,
}
530
531impl ParseTo for CreateSinkStatement {
532 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
533 impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
534 impl_parse_to!(sink_name: ObjectName, p);
535
536 let mut target_spec_columns = Vec::new();
537 let into_table_name = if p.parse_keyword(Keyword::INTO) {
538 impl_parse_to!(into_table_name: ObjectName, p);
539
540 target_spec_columns = p.parse_parenthesized_column_list(IsOptional::Optional)?;
542 Some(into_table_name)
543 } else {
544 None
545 };
546
547 let sink_from = if p.parse_keyword(Keyword::FROM) {
548 impl_parse_to!(from_name: ObjectName, p);
549 CreateSink::From(from_name)
550 } else if p.parse_keyword(Keyword::AS) {
551 let query = Box::new(p.parse_query()?);
552 CreateSink::AsQuery(query)
553 } else {
554 p.expected("FROM or AS after CREATE SINK sink_name")?
555 };
556
557 let emit_mode: Option<EmitMode> = p.parse_emit_mode()?;
558
559 if !p.peek_nth_any_of_keywords(0, &[Keyword::WITH]) && into_table_name.is_none() {
562 p.expected("WITH")?
563 }
564 impl_parse_to!(with_properties: WithProperties, p);
565
566 if with_properties.0.is_empty() && into_table_name.is_none() {
567 parser_err!("sink properties not provided");
568 }
569
570 let sink_schema = p.parse_schema()?;
571
572 Ok(Self {
573 if_not_exists,
574 sink_name,
575 with_properties,
576 sink_from,
577 columns: target_spec_columns,
578 emit_mode,
579 sink_schema,
580 into_table_name,
581 })
582 }
583}
584
585impl fmt::Display for CreateSinkStatement {
586 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
587 let mut v: Vec<String> = vec![];
588 impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
589 impl_fmt_display!(sink_name, v, self);
590 if let Some(into_table) = &self.into_table_name {
591 impl_fmt_display!([Keyword::INTO], v);
592 impl_fmt_display!([into_table], v);
593 if !self.columns.is_empty() {
594 v.push(format!("({})", display_comma_separated(&self.columns)));
595 }
596 }
597 impl_fmt_display!(sink_from, v, self);
598 if let Some(ref emit_mode) = self.emit_mode {
599 v.push(format!("EMIT {}", emit_mode));
600 }
601 impl_fmt_display!(with_properties, v, self);
602 if let Some(schema) = &self.sink_schema {
603 v.push(format!("{}", schema));
604 }
605 v.iter().join(" ").fmt(f)
606 }
607}
608
/// The body of a `CREATE SUBSCRIPTION` statement, as rendered by its
/// `Display` implementation: `[IF NOT EXISTS] name FROM name WITH (...)`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct CreateSubscriptionStatement {
    /// Rendered as `IF NOT EXISTS` when `true`.
    pub if_not_exists: bool,
    /// Name of the subscription being created.
    pub subscription_name: ObjectName,
    /// Options of the `WITH (...)` clause; the parser rejects an empty list.
    pub with_properties: WithProperties,
    /// Object named after `FROM`.
    pub subscription_from: ObjectName,
}
625
626impl ParseTo for CreateSubscriptionStatement {
627 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
628 impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
629 impl_parse_to!(subscription_name: ObjectName, p);
630
631 let subscription_from = if p.parse_keyword(Keyword::FROM) {
632 impl_parse_to!(from_name: ObjectName, p);
633 from_name
634 } else {
635 p.expected("FROM after CREATE SUBSCRIPTION subscription_name")?
636 };
637
638 if !p.peek_nth_any_of_keywords(0, &[Keyword::WITH]) {
643 p.expected("WITH")?
644 }
645 impl_parse_to!(with_properties: WithProperties, p);
646
647 if with_properties.0.is_empty() {
648 parser_err!("subscription properties not provided");
649 }
650
651 Ok(Self {
652 if_not_exists,
653 subscription_name,
654 with_properties,
655 subscription_from,
656 })
657 }
658}
659
660impl fmt::Display for CreateSubscriptionStatement {
661 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
662 let mut v: Vec<String> = vec![];
663 impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
664 impl_fmt_display!(subscription_name, v, self);
665 v.push(format!("FROM {}", self.subscription_from));
666 impl_fmt_display!(with_properties, v, self);
667 v.iter().join(" ").fmt(f)
668 }
669}
670
/// What a declared cursor iterates over.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum DeclareCursor {
    /// A cursor over the result of a query.
    Query(Box<Query>),
    /// A cursor over a named subscription, together with its starting point.
    Subscription(ObjectName, Since),
}
677
678impl fmt::Display for DeclareCursor {
679 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
680 let mut v: Vec<String> = vec![];
681 match self {
682 DeclareCursor::Query(query) => v.push(format!("{}", query.as_ref())),
683 DeclareCursor::Subscription(name, since) => {
684 v.push(format!("{}", name));
685 v.push(format!("{:?}", since));
686 }
687 }
688 v.iter().join(" ").fmt(f)
689 }
690}
691
/// The body of a cursor declaration, rendered as
/// `name [SUBSCRIPTION] CURSOR FOR <target>`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct DeclareCursorStatement {
    /// Name of the cursor being declared.
    pub cursor_name: Ident,
    /// What the cursor iterates over.
    pub declare_cursor: DeclareCursor,
}
707
708impl ParseTo for DeclareCursorStatement {
709 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
710 let cursor_name = p.parse_identifier_non_reserved()?;
711
712 let declare_cursor = if !p.parse_keyword(Keyword::SUBSCRIPTION) {
713 p.expect_keyword(Keyword::CURSOR)?;
714 p.expect_keyword(Keyword::FOR)?;
715 DeclareCursor::Query(Box::new(p.parse_query()?))
716 } else {
717 p.expect_keyword(Keyword::CURSOR)?;
718 p.expect_keyword(Keyword::FOR)?;
719 let cursor_for_name = p.parse_object_name()?;
720 let rw_timestamp = p.parse_since()?;
721 DeclareCursor::Subscription(cursor_for_name, rw_timestamp)
722 };
723
724 Ok(Self {
725 cursor_name,
726 declare_cursor,
727 })
728 }
729}
730
731impl fmt::Display for DeclareCursorStatement {
732 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
733 let mut v: Vec<String> = vec![];
734 impl_fmt_display!(cursor_name, v, self);
735 match &self.declare_cursor {
736 DeclareCursor::Query(_) => {
737 v.push("CURSOR FOR ".to_owned());
738 }
739 DeclareCursor::Subscription { .. } => {
740 v.push("SUBSCRIPTION CURSOR FOR ".to_owned());
741 }
742 }
743 impl_fmt_display!(declare_cursor, v, self);
744 v.iter().join(" ").fmt(f)
745 }
746}
747
/// The body of a cursor fetch: `NEXT | <count> FROM name [WITH (...)]`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct FetchCursorStatement {
    /// Name of the cursor to fetch from.
    pub cursor_name: Ident,
    /// Number of rows requested; `NEXT` is parsed as `1`.
    pub count: u32,
    /// Options of the optional `WITH (...)` clause.
    pub with_properties: WithProperties,
}
758
759impl ParseTo for FetchCursorStatement {
760 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
761 let count = if p.parse_keyword(Keyword::NEXT) {
762 1
763 } else {
764 literal_u32(p)?
765 };
766 p.expect_keyword(Keyword::FROM)?;
767 let cursor_name = p.parse_identifier_non_reserved()?;
768 impl_parse_to!(with_properties: WithProperties, p);
769
770 Ok(Self {
771 cursor_name,
772 count,
773 with_properties,
774 })
775 }
776}
777
778impl fmt::Display for FetchCursorStatement {
779 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
780 let mut v: Vec<String> = vec![];
781 if self.count == 1 {
782 v.push("NEXT ".to_owned());
783 } else {
784 impl_fmt_display!(count, v, self);
785 }
786 v.push("FROM ".to_owned());
787 impl_fmt_display!(cursor_name, v, self);
788 v.iter().join(" ").fmt(f)
789 }
790}
791
/// The body of a cursor close: either a cursor name or `ALL`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct CloseCursorStatement {
    /// Cursor to close; `None` stands for the `ALL` keyword.
    pub cursor_name: Option<Ident>,
}
800
801impl ParseTo for CloseCursorStatement {
802 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
803 let cursor_name = if p.parse_keyword(Keyword::ALL) {
804 None
805 } else {
806 Some(p.parse_identifier_non_reserved()?)
807 };
808
809 Ok(Self { cursor_name })
810 }
811}
812
813impl fmt::Display for CloseCursorStatement {
814 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
815 let mut v: Vec<String> = vec![];
816 if let Some(cursor_name) = &self.cursor_name {
817 v.push(format!("{}", cursor_name));
818 } else {
819 v.push("ALL".to_owned());
820 }
821 v.iter().join(" ").fmt(f)
822 }
823}
824
/// The body of a `CREATE CONNECTION` statement, rendered as
/// `[IF NOT EXISTS] name WITH (...)`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct CreateConnectionStatement {
    /// Rendered as `IF NOT EXISTS` when `true`.
    pub if_not_exists: bool,
    /// Name of the connection being created.
    pub connection_name: ObjectName,
    /// Options of the `WITH (...)` clause; the parser rejects an empty list.
    pub with_properties: WithProperties,
}
837
838impl ParseTo for CreateConnectionStatement {
839 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
840 impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
841 impl_parse_to!(connection_name: ObjectName, p);
842 impl_parse_to!(with_properties: WithProperties, p);
843 if with_properties.0.is_empty() {
844 parser_err!("connection properties not provided");
845 }
846
847 Ok(Self {
848 if_not_exists,
849 connection_name,
850 with_properties,
851 })
852 }
853}
854
855impl fmt::Display for CreateConnectionStatement {
856 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
857 let mut v: Vec<String> = vec![];
858 impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
859 impl_fmt_display!(connection_name, v, self);
860 impl_fmt_display!(with_properties, v, self);
861 v.iter().join(" ").fmt(f)
862 }
863}
864
/// The body of a `CREATE SECRET` statement, rendered as
/// `[IF NOT EXISTS] name [WITH (...)] [AS <credential>]`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct CreateSecretStatement {
    /// Rendered as `IF NOT EXISTS` when `true`.
    pub if_not_exists: bool,
    /// Name of the secret being created.
    pub secret_name: ObjectName,
    /// Value given after `AS`; `Value::Null` when the clause is absent.
    pub credential: Value,
    /// Options of the `WITH (...)` clause.
    pub with_properties: WithProperties,
}
873
874impl ParseTo for CreateSecretStatement {
875 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
876 impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], parser);
877 impl_parse_to!(secret_name: ObjectName, parser);
878 impl_parse_to!(with_properties: WithProperties, parser);
879 let mut credential = Value::Null;
880 if parser.parse_keyword(Keyword::AS) {
881 credential = parser.ensure_parse_value()?;
882 }
883 Ok(Self {
884 if_not_exists,
885 secret_name,
886 credential,
887 with_properties,
888 })
889 }
890}
891
892impl fmt::Display for CreateSecretStatement {
893 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
894 let mut v: Vec<String> = vec![];
895 impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
896 impl_fmt_display!(secret_name, v, self);
897 impl_fmt_display!(with_properties, v, self);
898 if self.credential != Value::Null {
899 v.push("AS".to_owned());
900 impl_fmt_display!(credential, v, self);
901 }
902 v.iter().join(" ").fmt(f)
903 }
904}
905
/// The option list of a `WITH (...)` clause; empty when the clause renders as nothing.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct WithProperties(pub Vec<SqlOption>);
909
910impl ParseTo for WithProperties {
911 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
912 Ok(Self(
913 parser.parse_options_with_preceding_keyword(Keyword::WITH)?,
914 ))
915 }
916}
917
918impl fmt::Display for WithProperties {
919 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
920 if !self.0.is_empty() {
921 write!(f, "WITH ({})", display_comma_separated(self.0.as_slice()))
922 } else {
923 Ok(())
924 }
925 }
926}
927
/// The starting point of a subscription cursor.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum Since {
    /// `SINCE <number>`; the variant name suggests a timestamp in
    /// milliseconds (unit not confirmed by this file).
    TimestampMsNum(u64),
    /// `SINCE PROCTIME()`.
    ProcessTime,
    /// `SINCE BEGIN()`.
    Begin,
    /// `FULL`.
    Full,
}
936
937impl fmt::Display for Since {
938 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
939 use Since::*;
940 match self {
941 TimestampMsNum(ts) => write!(f, " SINCE {}", ts),
942 ProcessTime => write!(f, " SINCE PROCTIME()"),
943 Begin => write!(f, " SINCE BEGIN()"),
944 Full => write!(f, " FULL"),
945 }
946 }
947}
948
/// A `ROW SCHEMA LOCATION '<value>'` clause.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct RowSchemaLocation {
    /// The string literal given after the keywords.
    pub value: AstString,
}
954
955impl ParseTo for RowSchemaLocation {
956 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
957 impl_parse_to!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], p);
958 impl_parse_to!(value: AstString, p);
959 Ok(Self { value })
960 }
961}
962
963impl fmt::Display for RowSchemaLocation {
964 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
965 let mut v = vec![];
966 impl_fmt_display!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], v);
967 impl_fmt_display!(value, v, self);
968 v.iter().join(" ").fmt(f)
969 }
970}
971
/// A string literal in the syntax tree; holds the text parsed by
/// `parse_literal_string`, without the surrounding quotes.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct AstString(pub String);
977
978impl ParseTo for AstString {
979 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
980 Ok(Self(parser.parse_literal_string()?))
981 }
982}
983
984impl fmt::Display for AstString {
985 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
986 write!(f, "'{}'", self.0)
987 }
988}
989
/// An optional syntax-tree node: `None` renders as nothing, `Some` renders
/// as the wrapped node.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum AstOption<T> {
    /// The node is absent.
    None,
    /// The node is present.
    Some(T),
}
1000
1001impl<T: ParseTo> ParseTo for AstOption<T> {
1002 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
1003 match T::parse_to(parser) {
1004 Ok(t) => Ok(AstOption::Some(t)),
1005 Err(_) => Ok(AstOption::None),
1006 }
1007 }
1008}
1009
1010impl<T: fmt::Display> fmt::Display for AstOption<T> {
1011 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1012 match &self {
1013 AstOption::Some(t) => t.fmt(f),
1014 AstOption::None => Ok(()),
1015 }
1016 }
1017}
1018
1019impl<T> From<AstOption<T>> for Option<T> {
1020 fn from(val: AstOption<T>) -> Self {
1021 match val {
1022 AstOption::Some(t) => Some(t),
1023 AstOption::None => None,
1024 }
1025 }
1026}
1027
/// The body of a `CREATE USER` statement, rendered as `name [WITH options]`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct CreateUserStatement {
    /// Name of the user being created.
    pub user_name: ObjectName,
    /// Options given for the user; may be empty.
    pub with_options: UserOptions,
}
1034
/// The body of an `ALTER USER` statement, rendered as `name <mode>`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct AlterUserStatement {
    /// Name of the user being altered.
    pub user_name: ObjectName,
    /// The change to apply.
    pub mode: AlterUserMode,
}
1041
/// The kind of change requested by an `ALTER USER` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum AlterUserMode {
    /// Replace user options; rendered as `WITH <options>`.
    Options(UserOptions),
    /// Rename the user; rendered as `RENAME TO <name>`.
    Rename(ObjectName),
}
1048
/// A single option of a `CREATE USER` / `ALTER USER` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum UserOption {
    /// Keyword `SUPERUSER`.
    SuperUser,
    /// Keyword `NOSUPERUSER`.
    NoSuperUser,
    /// Keyword `CREATEDB`.
    CreateDB,
    /// Keyword `NOCREATEDB`.
    NoCreateDB,
    /// Keyword `CREATEUSER`.
    CreateUser,
    /// Keyword `NOCREATEUSER`.
    NoCreateUser,
    /// Keyword `LOGIN`.
    Login,
    /// Keyword `NOLOGIN`.
    NoLogin,
    /// `ENCRYPTED PASSWORD '<text>'`.
    EncryptedPassword(AstString),
    /// `PASSWORD '<text>'`, or `PASSWORD NULL` when `None`.
    Password(Option<AstString>),
    /// Options parsed after the `OAUTH` keyword.
    OAuth(Vec<SqlOption>),
}
1064
1065impl fmt::Display for UserOption {
1066 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1067 match self {
1068 UserOption::SuperUser => write!(f, "SUPERUSER"),
1069 UserOption::NoSuperUser => write!(f, "NOSUPERUSER"),
1070 UserOption::CreateDB => write!(f, "CREATEDB"),
1071 UserOption::NoCreateDB => write!(f, "NOCREATEDB"),
1072 UserOption::CreateUser => write!(f, "CREATEUSER"),
1073 UserOption::NoCreateUser => write!(f, "NOCREATEUSER"),
1074 UserOption::Login => write!(f, "LOGIN"),
1075 UserOption::NoLogin => write!(f, "NOLOGIN"),
1076 UserOption::EncryptedPassword(p) => write!(f, "ENCRYPTED PASSWORD {}", p),
1077 UserOption::Password(None) => write!(f, "PASSWORD NULL"),
1078 UserOption::Password(Some(p)) => write!(f, "PASSWORD {}", p),
1079 UserOption::OAuth(options) => {
1080 write!(f, "({})", display_comma_separated(options.as_slice()))
1081 }
1082 }
1083 }
1084}
1085
/// The list of user options of a statement; rendered as `WITH <options>`
/// when non-empty.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct UserOptions(pub Vec<UserOption>);
1089
/// Collects at most one option per category while parsing user options.
#[derive(Default)]
struct UserOptionsBuilder {
    /// `SUPERUSER` / `NOSUPERUSER`.
    super_user: Option<UserOption>,
    /// `CREATEDB` / `NOCREATEDB`.
    create_db: Option<UserOption>,
    /// `CREATEUSER` / `NOCREATEUSER`.
    create_user: Option<UserOption>,
    /// `LOGIN` / `NOLOGIN`.
    login: Option<UserOption>,
    /// Any credential option: `PASSWORD`, `ENCRYPTED PASSWORD` or `OAUTH`.
    password: Option<UserOption>,
}
1098
1099impl UserOptionsBuilder {
1100 fn build(self) -> UserOptions {
1101 let mut options = vec![];
1102 if let Some(option) = self.super_user {
1103 options.push(option);
1104 }
1105 if let Some(option) = self.create_db {
1106 options.push(option);
1107 }
1108 if let Some(option) = self.create_user {
1109 options.push(option);
1110 }
1111 if let Some(option) = self.login {
1112 options.push(option);
1113 }
1114 if let Some(option) = self.password {
1115 options.push(option);
1116 }
1117 UserOptions(options)
1118 }
1119}
1120
impl ParseTo for UserOptions {
    /// Parses an optional `WITH` followed by user options until end of input
    /// or a semicolon.
    ///
    /// Each option category may appear once; a second option of the same
    /// category fails with "conflicting or redundant options". The resulting
    /// list is ordered by category (see `UserOptionsBuilder::build`), not by
    /// the order in which the options were written.
    fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
        let mut builder = UserOptionsBuilder::default();
        // Stores `user_option` in its category slot and rejects the option
        // when that slot was already filled.
        let add_option = |item: &mut Option<UserOption>, user_option| {
            let old_value = item.replace(user_option);
            if old_value.is_some() {
                parser_err!("conflicting or redundant options");
            }
            Ok(())
        };
        // The leading `WITH` is optional, so its absence is ignored.
        let _ = parser.parse_keyword(Keyword::WITH);
        loop {
            let token = parser.peek_token();
            if token == Token::EOF || token == Token::SemiColon {
                break;
            }

            if let Token::Word(ref w) = token.token {
                // Remember the position of the word so that an unknown
                // keyword is reported at the word itself.
                let checkpoint = *parser;
                parser.next_token();
                let (item_mut_ref, user_option) = match w.keyword {
                    Keyword::SUPERUSER => (&mut builder.super_user, UserOption::SuperUser),
                    Keyword::NOSUPERUSER => (&mut builder.super_user, UserOption::NoSuperUser),
                    Keyword::CREATEDB => (&mut builder.create_db, UserOption::CreateDB),
                    Keyword::NOCREATEDB => (&mut builder.create_db, UserOption::NoCreateDB),
                    Keyword::CREATEUSER => (&mut builder.create_user, UserOption::CreateUser),
                    Keyword::NOCREATEUSER => (&mut builder.create_user, UserOption::NoCreateUser),
                    Keyword::LOGIN => (&mut builder.login, UserOption::Login),
                    Keyword::NOLOGIN => (&mut builder.login, UserOption::NoLogin),
                    Keyword::PASSWORD => {
                        // `PASSWORD NULL` clears the password; otherwise a
                        // string literal must follow.
                        if parser.parse_keyword(Keyword::NULL) {
                            (&mut builder.password, UserOption::Password(None))
                        } else {
                            (
                                &mut builder.password,
                                UserOption::Password(Some(AstString::parse_to(parser)?)),
                            )
                        }
                    }
                    Keyword::ENCRYPTED => {
                        parser.expect_keyword(Keyword::PASSWORD)?;
                        (
                            &mut builder.password,
                            UserOption::EncryptedPassword(AstString::parse_to(parser)?),
                        )
                    }
                    Keyword::OAUTH => {
                        // OAuth shares the password slot, so it conflicts
                        // with any password option.
                        let options = parser.parse_options()?;
                        (&mut builder.password, UserOption::OAuth(options))
                    }
                    _ => {
                        // The `?` below is relied upon to return the error;
                        // `unreachable!()` only satisfies the match's type.
                        parser.expected_at(
                            checkpoint,
                            "SUPERUSER | NOSUPERUSER | CREATEDB | NOCREATEDB | LOGIN \
                            | NOLOGIN | CREATEUSER | NOCREATEUSER | [ENCRYPTED] PASSWORD | NULL | OAUTH",
                        )?;
                        unreachable!()
                    }
                };
                add_option(item_mut_ref, user_option)?;
            } else {
                // Anything that is not a word cannot start an option.
                parser.expected(
                    "SUPERUSER | NOSUPERUSER | CREATEDB | NOCREATEDB | LOGIN | NOLOGIN \
                        | CREATEUSER | NOCREATEUSER | [ENCRYPTED] PASSWORD | NULL | OAUTH",
                )?
            }
        }
        Ok(builder.build())
    }
}
1191
1192impl fmt::Display for UserOptions {
1193 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1194 if !self.0.is_empty() {
1195 write!(f, "WITH {}", display_separated(self.0.as_slice(), " "))
1196 } else {
1197 Ok(())
1198 }
1199 }
1200}
1201
1202impl ParseTo for CreateUserStatement {
1203 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1204 impl_parse_to!(user_name: ObjectName, p);
1205 impl_parse_to!(with_options: UserOptions, p);
1206
1207 Ok(CreateUserStatement {
1208 user_name,
1209 with_options,
1210 })
1211 }
1212}
1213
1214impl fmt::Display for CreateUserStatement {
1215 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1216 let mut v: Vec<String> = vec![];
1217 impl_fmt_display!(user_name, v, self);
1218 impl_fmt_display!(with_options, v, self);
1219 v.iter().join(" ").fmt(f)
1220 }
1221}
1222
1223impl fmt::Display for AlterUserMode {
1224 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1225 match self {
1226 AlterUserMode::Options(options) => {
1227 write!(f, "{}", options)
1228 }
1229 AlterUserMode::Rename(new_name) => {
1230 write!(f, "RENAME TO {}", new_name)
1231 }
1232 }
1233 }
1234}
1235
1236impl fmt::Display for AlterUserStatement {
1237 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1238 let mut v: Vec<String> = vec![];
1239 impl_fmt_display!(user_name, v, self);
1240 impl_fmt_display!(mode, v, self);
1241 v.iter().join(" ").fmt(f)
1242 }
1243}
1244
1245impl ParseTo for AlterUserStatement {
1246 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1247 impl_parse_to!(user_name: ObjectName, p);
1248 impl_parse_to!(mode: AlterUserMode, p);
1249
1250 Ok(AlterUserStatement { user_name, mode })
1251 }
1252}
1253
1254impl ParseTo for AlterUserMode {
1255 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1256 if p.parse_keyword(Keyword::RENAME) {
1257 p.expect_keyword(Keyword::TO)?;
1258 impl_parse_to!(new_name: ObjectName, p);
1259 Ok(AlterUserMode::Rename(new_name))
1260 } else {
1261 impl_parse_to!(with_options: UserOptions, p);
1262 Ok(AlterUserMode::Options(with_options))
1263 }
1264 }
1265}
1266
/// The body of a `DROP` statement, rendered as
/// `<object type> [IF EXISTS] <name> [CASCADE | RESTRICT]`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct DropStatement {
    /// Kind of object being dropped.
    pub object_type: ObjectType,
    /// Rendered as `IF EXISTS` when `true`.
    pub if_exists: bool,
    /// Name of the object being dropped.
    pub object_name: ObjectName,
    /// Optional trailing `CASCADE` / `RESTRICT`.
    pub drop_mode: AstOption<DropMode>,
}
1280
1281impl ParseTo for DropStatement {
1288 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1289 impl_parse_to!(object_type: ObjectType, p);
1290 impl_parse_to!(if_exists => [Keyword::IF, Keyword::EXISTS], p);
1291 let object_name = p.parse_object_name()?;
1292 impl_parse_to!(drop_mode: AstOption<DropMode>, p);
1293 Ok(Self {
1294 object_type,
1295 if_exists,
1296 object_name,
1297 drop_mode,
1298 })
1299 }
1300}
1301
1302impl fmt::Display for DropStatement {
1303 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1304 let mut v: Vec<String> = vec![];
1305 impl_fmt_display!(object_type, v, self);
1306 impl_fmt_display!(if_exists => [Keyword::IF, Keyword::EXISTS], v, self);
1307 impl_fmt_display!(object_name, v, self);
1308 impl_fmt_display!(drop_mode, v, self);
1309 v.iter().join(" ").fmt(f)
1310 }
1311}
1312
/// The trailing mode keyword of a `DROP` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum DropMode {
    /// Keyword `CASCADE`.
    Cascade,
    /// Keyword `RESTRICT`.
    Restrict,
}
1319
1320impl ParseTo for DropMode {
1321 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
1322 let drop_mode = if parser.parse_keyword(Keyword::CASCADE) {
1323 DropMode::Cascade
1324 } else if parser.parse_keyword(Keyword::RESTRICT) {
1325 DropMode::Restrict
1326 } else {
1327 return parser.expected("CASCADE | RESTRICT");
1328 };
1329 Ok(drop_mode)
1330 }
1331}
1332
1333impl fmt::Display for DropMode {
1334 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1335 f.write_str(match self {
1336 DropMode::Cascade => "CASCADE",
1337 DropMode::Restrict => "RESTRICT",
1338 })
1339 }
1340}