1use std::fmt;
16use std::fmt::{Formatter, Write};
17
18use itertools::Itertools;
19use winnow::ModalResult;
20
21use super::ddl::SourceWatermark;
22use super::legacy_source::{CompatibleFormatEncode, parse_format_encode};
23use super::{EmitMode, Ident, ObjectType, Query, Value};
24use crate::ast::{
25 ColumnDef, ObjectName, SqlOption, TableConstraint, display_comma_separated, display_separated,
26};
27use crate::keywords::Keyword;
28use crate::parser::{IncludeOption, IsOptional, Parser};
29use crate::parser_err;
30use crate::parser_v2::literal_u32;
31use crate::tokenizer::Token;
32
33pub trait ParseTo: Sized {
35 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self>;
36}
37
/// Expands to one parsing step inside a `parse_to` body.
///
/// * `name: Type, parser` binds `name` to `<Type>::parse_to(parser)?`.
/// * `name => [keywords..], parser` binds `name` to the result of
///   `parser.parse_keywords(&[keywords..])`.
/// * `[keywords..], parser` calls `parser.expect_keywords(&[keywords..])?`.
#[macro_export]
macro_rules! impl_parse_to {
    () => {};
    ($binding:ident : $ty:ty, $p:ident) => {
        let $binding = <$ty>::parse_to($p)?;
    };
    ($binding:ident => [$($kw:tt)+], $p:ident) => {
        let $binding = $p.parse_keywords(&[$($kw)+]);
    };
    ([$($kw:tt)+], $p:ident) => {
        $p.expect_keywords(&[$($kw)+])?;
    };
}
51
/// Expands to one rendering step that appends a fragment to a `Vec<String>`.
///
/// * `field, vec, self` renders `self.field` with `Display` and pushes it unless the
///   rendered text is empty.
/// * `field => [items..], vec, self` pushes the space-separated `items` when the
///   boolean `self.field` is set.
/// * `[items..], vec` always pushes the space-separated `items`.
#[macro_export]
macro_rules! impl_fmt_display {
    () => {};
    ($member:ident, $out:ident, $this:ident) => {{
        let rendered = format!("{}", $this.$member);
        if !rendered.is_empty() {
            $out.push(rendered);
        }
    }};
    ($member:ident => [$($item:tt)+], $out:ident, $this:ident) => {
        if $this.$member {
            $out.push(format!("{}", display_separated(&[$($item)+], " ")));
        }
    };
    ([$($item:tt)+], $out:ident) => {
        $out.push(format!("{}", display_separated(&[$($item)+], " ")));
    };
}
70
71#[derive(Debug, Clone, PartialEq, Eq, Hash)]
80pub struct CreateSourceStatement {
81 pub temporary: bool,
82 pub if_not_exists: bool,
83 pub columns: Vec<ColumnDef>,
84 pub wildcard_idx: Option<usize>,
86 pub constraints: Vec<TableConstraint>,
87 pub source_name: ObjectName,
88 pub with_properties: WithProperties,
89 pub format_encode: CompatibleFormatEncode,
90 pub source_watermarks: Vec<SourceWatermark>,
91 pub include_column_options: IncludeOption,
92}
93
/// The keyword that follows `FORMAT` in a format/encode clause.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Format {
    /// Rendered as `NATIVE`.
    Native,
    /// Rendered as `NONE`.
    None,
    /// Rendered as `DEBEZIUM`.
    Debezium,
    /// Rendered as `DEBEZIUM_MONGO`.
    DebeziumMongo,
    /// Rendered as `MAXWELL`.
    Maxwell,
    /// Rendered as `CANAL`.
    Canal,
    /// Rendered as `UPSERT`.
    Upsert,
    /// Rendered as `PLAIN`.
    Plain,
}
117
118impl fmt::Display for Format {
120 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121 write!(
122 f,
123 "{}",
124 match self {
125 Format::Native => "NATIVE",
126 Format::Debezium => "DEBEZIUM",
127 Format::DebeziumMongo => "DEBEZIUM_MONGO",
128 Format::Maxwell => "MAXWELL",
129 Format::Canal => "CANAL",
130 Format::Upsert => "UPSERT",
131 Format::Plain => "PLAIN",
132 Format::None => "NONE",
133 }
134 )
135 }
136}
137
138impl Format {
139 pub fn from_keyword(s: &str) -> ModalResult<Self> {
140 Ok(match s {
141 "DEBEZIUM" => Format::Debezium,
142 "DEBEZIUM_MONGO" => Format::DebeziumMongo,
143 "MAXWELL" => Format::Maxwell,
144 "CANAL" => Format::Canal,
145 "PLAIN" => Format::Plain,
146 "UPSERT" => Format::Upsert,
147 "NATIVE" => Format::Native,
148 "NONE" => Format::None,
149 _ => parser_err!("expected PROTOBUF | DEBEZIUM | PLAIN | NATIVE | NONE after FORMAT"),
150 })
151 }
152}
153
/// The keyword that follows `ENCODE` (or `KEY ENCODE`) in a format/encode clause.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Encode {
    /// Rendered as `AVRO`.
    Avro,
    /// Rendered as `CSV`.
    Csv,
    /// Rendered as `PROTOBUF`.
    Protobuf,
    /// Rendered as `JSON`.
    Json,
    /// Rendered as `BYTES`.
    Bytes,
    /// Rendered as `NONE`.
    None,
    /// Rendered as `TEXT`.
    Text,
    /// Rendered as `NATIVE`.
    Native,
    /// Rendered as `TEMPLATE`.
    Template,
    /// Rendered as `PARQUET`.
    Parquet,
}
171
172impl fmt::Display for Encode {
174 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
175 write!(
176 f,
177 "{}",
178 match self {
179 Encode::Avro => "AVRO",
180 Encode::Csv => "CSV",
181 Encode::Protobuf => "PROTOBUF",
182 Encode::Json => "JSON",
183 Encode::Bytes => "BYTES",
184 Encode::Native => "NATIVE",
185 Encode::Template => "TEMPLATE",
186 Encode::None => "NONE",
187 Encode::Parquet => "PARQUET",
188 Encode::Text => "TEXT",
189 }
190 )
191 }
192}
193
194impl Encode {
195 pub fn from_keyword(s: &str) -> ModalResult<Self> {
196 Ok(match s {
197 "AVRO" => Encode::Avro,
198 "TEXT" => Encode::Text,
199 "BYTES" => Encode::Bytes,
200 "CSV" => Encode::Csv,
201 "PROTOBUF" => Encode::Protobuf,
202 "JSON" => Encode::Json,
203 "TEMPLATE" => Encode::Template,
204 "PARQUET" => Encode::Parquet,
205 "NATIVE" => Encode::Native,
206 "NONE" => Encode::None,
207 _ => parser_err!(
208 "expected AVRO | BYTES | CSV | PROTOBUF | JSON | NATIVE | TEMPLATE | PARQUET | NONE after Encode"
209 ),
210 })
211 }
212}
213
214#[derive(Debug, Clone, PartialEq, Eq, Hash)]
216pub struct FormatEncodeOptions {
217 pub format: Format,
218 pub row_encode: Encode,
219 pub row_options: Vec<SqlOption>,
220
221 pub key_encode: Option<Encode>,
222}
223
224impl Parser<'_> {
225 fn peek_format_encode_format(&mut self) -> bool {
227 (self.peek_nth_any_of_keywords(0, &[Keyword::ROW])
228 && self.peek_nth_any_of_keywords(1, &[Keyword::FORMAT])) || self.peek_nth_any_of_keywords(0, &[Keyword::FORMAT]) }
231
232 pub fn parse_format_encode_with_connector(
234 &mut self,
235 connector: &str,
236 cdc_source_job: bool,
237 ) -> ModalResult<CompatibleFormatEncode> {
238 if connector.contains("-cdc") {
243 let expected = if cdc_source_job {
244 FormatEncodeOptions::plain_json()
245 } else if connector.contains("mongodb") {
246 FormatEncodeOptions::debezium_mongo_json()
247 } else {
248 FormatEncodeOptions::debezium_json()
249 };
250
251 if self.peek_format_encode_format() {
252 let schema = parse_format_encode(self)?.into_v2();
253 if schema != expected {
254 parser_err!(
255 "Row format for CDC connectors should be \
256 either omitted or set to `{expected}`",
257 );
258 }
259 }
260 Ok(expected.into())
261 } else if connector.contains("nexmark") {
262 let expected = FormatEncodeOptions::native();
263 if self.peek_format_encode_format() {
264 let schema = parse_format_encode(self)?.into_v2();
265 if schema != expected {
266 parser_err!(
267 "Row format for nexmark connectors should be \
268 either omitted or set to `{expected}`",
269 );
270 }
271 }
272 Ok(expected.into())
273 } else if connector.contains("datagen") {
274 Ok(if self.peek_format_encode_format() {
275 parse_format_encode(self)?
276 } else {
277 FormatEncodeOptions::native().into()
278 })
279 } else if connector.contains("iceberg") || connector.contains("adbc_snowflake") {
280 let expected = FormatEncodeOptions::none();
281 if self.peek_format_encode_format() {
282 let schema = parse_format_encode(self)?.into_v2();
283 if schema != expected {
284 parser_err!(
285 "Row format for iceberg connectors should be \
286 either omitted or set to `{expected}`",
287 );
288 }
289 }
290 Ok(expected.into())
291 } else if connector.contains("webhook") {
292 parser_err!(
293 "Source with webhook connector is not supported. \
294 Please use the `CREATE TABLE ... WITH ...` statement instead.",
295 );
296 } else {
297 Ok(parse_format_encode(self)?)
298 }
299 }
300
301 pub fn parse_schema(&mut self) -> ModalResult<Option<FormatEncodeOptions>> {
303 if !self.parse_keyword(Keyword::FORMAT) {
304 return Ok(None);
305 }
306
307 let id = self.parse_identifier()?;
308 let s = id.value.to_ascii_uppercase();
309 let format = Format::from_keyword(&s)?;
310 self.expect_keyword(Keyword::ENCODE)?;
311 let id = self.parse_identifier()?;
312 let s = id.value.to_ascii_uppercase();
313 let row_encode = Encode::from_keyword(&s)?;
314 let row_options = self.parse_options()?;
315
316 let key_encode = if self.parse_keywords(&[Keyword::KEY, Keyword::ENCODE]) {
317 Some(Encode::from_keyword(
318 self.parse_identifier()?.value.to_ascii_uppercase().as_str(),
319 )?)
320 } else {
321 None
322 };
323
324 Ok(Some(FormatEncodeOptions {
325 format,
326 row_encode,
327 row_options,
328 key_encode,
329 }))
330 }
331}
332
333impl FormatEncodeOptions {
334 pub const fn plain_json() -> Self {
335 FormatEncodeOptions {
336 format: Format::Plain,
337 row_encode: Encode::Json,
338 row_options: Vec::new(),
339 key_encode: None,
340 }
341 }
342
343 pub const fn debezium_json() -> Self {
345 FormatEncodeOptions {
346 format: Format::Debezium,
347 row_encode: Encode::Json,
348 row_options: Vec::new(),
349 key_encode: None,
350 }
351 }
352
353 pub const fn debezium_mongo_json() -> Self {
354 FormatEncodeOptions {
355 format: Format::DebeziumMongo,
356 row_encode: Encode::Json,
357 row_options: Vec::new(),
358 key_encode: None,
359 }
360 }
361
362 pub const fn native() -> Self {
364 FormatEncodeOptions {
365 format: Format::Native,
366 row_encode: Encode::Native,
367 row_options: Vec::new(),
368 key_encode: None,
369 }
370 }
371
372 pub const fn none() -> Self {
375 FormatEncodeOptions {
376 format: Format::None,
377 row_encode: Encode::None,
378 row_options: Vec::new(),
379 key_encode: None,
380 }
381 }
382
383 pub fn row_options(&self) -> &[SqlOption] {
384 self.row_options.as_ref()
385 }
386}
387
388impl fmt::Display for FormatEncodeOptions {
389 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
390 write!(f, "FORMAT {} ENCODE {}", self.format, self.row_encode)?;
391
392 if !self.row_options().is_empty() {
393 write!(f, " ({})", display_comma_separated(self.row_options()))?;
394 }
395
396 if let Some(key_encode) = &self.key_encode {
397 write!(f, " KEY ENCODE {}", key_encode)?;
398 }
399
400 Ok(())
401 }
402}
403
404pub(super) fn fmt_create_items(
405 columns: &[ColumnDef],
406 constraints: &[TableConstraint],
407 watermarks: &[SourceWatermark],
408 wildcard_idx: Option<usize>,
409) -> std::result::Result<String, fmt::Error> {
410 let mut items = String::new();
411 let has_items = !columns.is_empty()
412 || !constraints.is_empty()
413 || !watermarks.is_empty()
414 || wildcard_idx.is_some();
415 has_items.then(|| write!(&mut items, "("));
416
417 if let Some(wildcard_idx) = wildcard_idx {
418 let (columns_l, columns_r) = columns.split_at(wildcard_idx);
419 write!(&mut items, "{}", display_comma_separated(columns_l))?;
420 if !columns_l.is_empty() {
421 write!(&mut items, ", ")?;
422 }
423 write!(&mut items, "{}", Token::Mul)?;
424 if !columns_r.is_empty() {
425 write!(&mut items, ", ")?;
426 }
427 write!(&mut items, "{}", display_comma_separated(columns_r))?;
428 } else {
429 write!(&mut items, "{}", display_comma_separated(columns))?;
430 }
431 let mut leading_items = !columns.is_empty() || wildcard_idx.is_some();
432
433 if leading_items && !constraints.is_empty() {
434 write!(&mut items, ", ")?;
435 }
436 write!(&mut items, "{}", display_comma_separated(constraints))?;
437 leading_items |= !constraints.is_empty();
438
439 if leading_items && !watermarks.is_empty() {
440 write!(&mut items, ", ")?;
441 }
442 write!(&mut items, "{}", display_comma_separated(watermarks))?;
443 has_items.then(|| write!(&mut items, ")"));
447 Ok(items)
448}
449
450impl fmt::Display for CreateSourceStatement {
451 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
452 let mut v: Vec<String> = vec![];
453 impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
454 impl_fmt_display!(source_name, v, self);
455
456 let items = fmt_create_items(
457 &self.columns,
458 &self.constraints,
459 &self.source_watermarks,
460 self.wildcard_idx,
461 )?;
462 if !items.is_empty() {
463 v.push(items);
464 }
465
466 for item in &self.include_column_options {
467 v.push(format!("{}", item));
468 }
469
470 let is_cdc_source = self.with_properties.0.iter().any(|option| {
472 option.name.real_value().eq_ignore_ascii_case("connector")
473 && option.value.to_string().contains("cdc")
474 });
475
476 impl_fmt_display!(with_properties, v, self);
477 if !is_cdc_source {
478 impl_fmt_display!(format_encode, v, self);
479 }
480 v.iter().join(" ").fmt(f)
481 }
482}
483
484#[derive(Debug, Clone, PartialEq, Eq, Hash)]
485pub enum CreateSink {
486 From(ObjectName),
487 AsQuery(Box<Query>),
488}
489
490impl fmt::Display for CreateSink {
491 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
492 match self {
493 Self::From(mv) => write!(f, "FROM {}", mv),
494 Self::AsQuery(query) => write!(f, "AS {}", query),
495 }
496 }
497}
498#[derive(Debug, Clone, PartialEq, Eq, Hash)]
506pub struct CreateSinkStatement {
507 pub if_not_exists: bool,
508 pub sink_name: ObjectName,
509 pub with_properties: WithProperties,
510 pub sink_from: CreateSink,
511
512 pub columns: Vec<Ident>,
515 pub emit_mode: Option<EmitMode>,
516 pub sink_schema: Option<FormatEncodeOptions>,
517 pub into_table_name: Option<ObjectName>,
518}
519
520impl ParseTo for CreateSinkStatement {
521 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
522 impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
523 impl_parse_to!(sink_name: ObjectName, p);
524
525 let mut target_spec_columns = Vec::new();
526 let into_table_name = if p.parse_keyword(Keyword::INTO) {
527 impl_parse_to!(into_table_name: ObjectName, p);
528
529 target_spec_columns = p.parse_parenthesized_column_list(IsOptional::Optional)?;
531 Some(into_table_name)
532 } else {
533 None
534 };
535
536 let sink_from = if p.parse_keyword(Keyword::FROM) {
537 impl_parse_to!(from_name: ObjectName, p);
538 CreateSink::From(from_name)
539 } else if p.parse_keyword(Keyword::AS) {
540 let query = Box::new(p.parse_query()?);
541 CreateSink::AsQuery(query)
542 } else {
543 p.expected("FROM or AS after CREATE SINK sink_name")?
544 };
545
546 let emit_mode: Option<EmitMode> = p.parse_emit_mode()?;
547
548 if !p.peek_nth_any_of_keywords(0, &[Keyword::WITH]) && into_table_name.is_none() {
551 p.expected("WITH")?
552 }
553 impl_parse_to!(with_properties: WithProperties, p);
554
555 if with_properties.0.is_empty() && into_table_name.is_none() {
556 parser_err!("sink properties not provided");
557 }
558
559 let sink_schema = p.parse_schema()?;
560
561 Ok(Self {
562 if_not_exists,
563 sink_name,
564 with_properties,
565 sink_from,
566 columns: target_spec_columns,
567 emit_mode,
568 sink_schema,
569 into_table_name,
570 })
571 }
572}
573
574impl fmt::Display for CreateSinkStatement {
575 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
576 let mut v: Vec<String> = vec![];
577 impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
578 impl_fmt_display!(sink_name, v, self);
579 if let Some(into_table) = &self.into_table_name {
580 impl_fmt_display!([Keyword::INTO], v);
581 impl_fmt_display!([into_table], v);
582 if !self.columns.is_empty() {
583 v.push(format!("({})", display_comma_separated(&self.columns)));
584 }
585 }
586 impl_fmt_display!(sink_from, v, self);
587 if let Some(ref emit_mode) = self.emit_mode {
588 v.push(format!("EMIT {}", emit_mode));
589 }
590 impl_fmt_display!(with_properties, v, self);
591 if let Some(schema) = &self.sink_schema {
592 v.push(format!("{}", schema));
593 }
594 v.iter().join(" ").fmt(f)
595 }
596}
597
598#[derive(Debug, Clone, PartialEq, Eq, Hash)]
606pub struct CreateSubscriptionStatement {
607 pub if_not_exists: bool,
608 pub subscription_name: ObjectName,
609 pub with_properties: WithProperties,
610 pub subscription_from: ObjectName,
611 }
613
614impl ParseTo for CreateSubscriptionStatement {
615 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
616 impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
617 impl_parse_to!(subscription_name: ObjectName, p);
618
619 let subscription_from = if p.parse_keyword(Keyword::FROM) {
620 impl_parse_to!(from_name: ObjectName, p);
621 from_name
622 } else {
623 p.expected("FROM after CREATE SUBSCRIPTION subscription_name")?
624 };
625
626 if !p.peek_nth_any_of_keywords(0, &[Keyword::WITH]) {
631 p.expected("WITH")?
632 }
633 impl_parse_to!(with_properties: WithProperties, p);
634
635 if with_properties.0.is_empty() {
636 parser_err!("subscription properties not provided");
637 }
638
639 Ok(Self {
640 if_not_exists,
641 subscription_name,
642 with_properties,
643 subscription_from,
644 })
645 }
646}
647
648impl fmt::Display for CreateSubscriptionStatement {
649 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
650 let mut v: Vec<String> = vec![];
651 impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
652 impl_fmt_display!(subscription_name, v, self);
653 v.push(format!("FROM {}", self.subscription_from));
654 impl_fmt_display!(with_properties, v, self);
655 v.iter().join(" ").fmt(f)
656 }
657}
658
659#[derive(Debug, Clone, PartialEq, Eq, Hash)]
660pub enum DeclareCursor {
661 Query(Box<Query>),
662 Subscription(ObjectName, Since),
663}
664
665impl fmt::Display for DeclareCursor {
666 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
667 let mut v: Vec<String> = vec![];
668 match self {
669 DeclareCursor::Query(query) => v.push(format!("{}", query.as_ref())),
670 DeclareCursor::Subscription(name, since) => {
671 v.push(format!("{}", name));
672 v.push(format!("{:?}", since));
673 }
674 }
675 v.iter().join(" ").fmt(f)
676 }
677}
678
679#[derive(Debug, Clone, PartialEq, Eq, Hash)]
689pub struct DeclareCursorStatement {
690 pub cursor_name: Ident,
691 pub declare_cursor: DeclareCursor,
692}
693
694impl ParseTo for DeclareCursorStatement {
695 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
696 let cursor_name = p.parse_identifier_non_reserved()?;
697
698 let declare_cursor = if !p.parse_keyword(Keyword::SUBSCRIPTION) {
699 p.expect_keyword(Keyword::CURSOR)?;
700 p.expect_keyword(Keyword::FOR)?;
701 DeclareCursor::Query(Box::new(p.parse_query()?))
702 } else {
703 p.expect_keyword(Keyword::CURSOR)?;
704 p.expect_keyword(Keyword::FOR)?;
705 let cursor_for_name = p.parse_object_name()?;
706 let rw_timestamp = p.parse_since()?;
707 DeclareCursor::Subscription(cursor_for_name, rw_timestamp)
708 };
709
710 Ok(Self {
711 cursor_name,
712 declare_cursor,
713 })
714 }
715}
716
717impl fmt::Display for DeclareCursorStatement {
718 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
719 let mut v: Vec<String> = vec![];
720 impl_fmt_display!(cursor_name, v, self);
721 match &self.declare_cursor {
722 DeclareCursor::Query(_) => {
723 v.push("CURSOR FOR ".to_owned());
724 }
725 DeclareCursor::Subscription { .. } => {
726 v.push("SUBSCRIPTION CURSOR FOR ".to_owned());
727 }
728 }
729 impl_fmt_display!(declare_cursor, v, self);
730 v.iter().join(" ").fmt(f)
731 }
732}
733
734#[derive(Debug, Clone, PartialEq, Eq, Hash)]
738pub struct FetchCursorStatement {
739 pub cursor_name: Ident,
740 pub count: u32,
741 pub with_properties: WithProperties,
742}
743
744impl ParseTo for FetchCursorStatement {
745 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
746 let count = if p.parse_keyword(Keyword::NEXT) {
747 1
748 } else {
749 literal_u32(p)?
750 };
751 p.expect_keyword(Keyword::FROM)?;
752 let cursor_name = p.parse_identifier_non_reserved()?;
753 impl_parse_to!(with_properties: WithProperties, p);
754
755 Ok(Self {
756 cursor_name,
757 count,
758 with_properties,
759 })
760 }
761}
762
763impl fmt::Display for FetchCursorStatement {
764 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
765 let mut v: Vec<String> = vec![];
766 if self.count == 1 {
767 v.push("NEXT ".to_owned());
768 } else {
769 impl_fmt_display!(count, v, self);
770 }
771 v.push("FROM ".to_owned());
772 impl_fmt_display!(cursor_name, v, self);
773 v.iter().join(" ").fmt(f)
774 }
775}
776
777#[derive(Debug, Clone, PartialEq, Eq, Hash)]
781pub struct CloseCursorStatement {
782 pub cursor_name: Option<Ident>,
783}
784
785impl ParseTo for CloseCursorStatement {
786 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
787 let cursor_name = if p.parse_keyword(Keyword::ALL) {
788 None
789 } else {
790 Some(p.parse_identifier_non_reserved()?)
791 };
792
793 Ok(Self { cursor_name })
794 }
795}
796
797impl fmt::Display for CloseCursorStatement {
798 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
799 let mut v: Vec<String> = vec![];
800 if let Some(cursor_name) = &self.cursor_name {
801 v.push(format!("{}", cursor_name));
802 } else {
803 v.push("ALL".to_owned());
804 }
805 v.iter().join(" ").fmt(f)
806 }
807}
808
809#[derive(Debug, Clone, PartialEq, Eq, Hash)]
815pub struct CreateConnectionStatement {
816 pub if_not_exists: bool,
817 pub connection_name: ObjectName,
818 pub with_properties: WithProperties,
819}
820
821impl ParseTo for CreateConnectionStatement {
822 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
823 impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
824 impl_parse_to!(connection_name: ObjectName, p);
825 impl_parse_to!(with_properties: WithProperties, p);
826 if with_properties.0.is_empty() {
827 parser_err!("connection properties not provided");
828 }
829
830 Ok(Self {
831 if_not_exists,
832 connection_name,
833 with_properties,
834 })
835 }
836}
837
838impl fmt::Display for CreateConnectionStatement {
839 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
840 let mut v: Vec<String> = vec![];
841 impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
842 impl_fmt_display!(connection_name, v, self);
843 impl_fmt_display!(with_properties, v, self);
844 v.iter().join(" ").fmt(f)
845 }
846}
847
848#[derive(Debug, Clone, PartialEq, Eq, Hash)]
849pub struct CreateSecretStatement {
850 pub if_not_exists: bool,
851 pub secret_name: ObjectName,
852 pub credential: Value,
853 pub with_properties: WithProperties,
854}
855
856impl ParseTo for CreateSecretStatement {
857 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
858 impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], parser);
859 impl_parse_to!(secret_name: ObjectName, parser);
860 impl_parse_to!(with_properties: WithProperties, parser);
861 let mut credential = Value::Null;
862 if parser.parse_keyword(Keyword::AS) {
863 credential = parser.ensure_parse_value()?;
864 }
865 Ok(Self {
866 if_not_exists,
867 secret_name,
868 credential,
869 with_properties,
870 })
871 }
872}
873
874impl fmt::Display for CreateSecretStatement {
875 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
876 let mut v: Vec<String> = vec![];
877 impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
878 impl_fmt_display!(secret_name, v, self);
879 impl_fmt_display!(with_properties, v, self);
880 if self.credential != Value::Null {
881 v.push("AS".to_owned());
882 impl_fmt_display!(credential, v, self);
883 }
884 v.iter().join(" ").fmt(f)
885 }
886}
887
888#[derive(Debug, Clone, PartialEq, Eq, Hash)]
889pub struct WithProperties(pub Vec<SqlOption>);
890
891impl ParseTo for WithProperties {
892 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
893 Ok(Self(
894 parser.parse_options_with_preceding_keyword(Keyword::WITH)?,
895 ))
896 }
897}
898
899impl fmt::Display for WithProperties {
900 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
901 if !self.0.is_empty() {
902 write!(f, "WITH ({})", display_comma_separated(self.0.as_slice()))
903 } else {
904 Ok(())
905 }
906 }
907}
908
/// Starting point of a subscription cursor.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Since {
    /// Rendered as ` SINCE <number>`.
    TimestampMsNum(u64),
    /// Rendered as ` SINCE PROCTIME()`.
    ProcessTime,
    /// Rendered as ` SINCE BEGIN()`.
    Begin,
    /// Rendered as ` FULL`.
    Full,
}
916
917impl fmt::Display for Since {
918 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
919 use Since::*;
920 match self {
921 TimestampMsNum(ts) => write!(f, " SINCE {}", ts),
922 ProcessTime => write!(f, " SINCE PROCTIME()"),
923 Begin => write!(f, " SINCE BEGIN()"),
924 Full => write!(f, " FULL"),
925 }
926 }
927}
928
929#[derive(Debug, Clone, PartialEq, Eq, Hash)]
930pub struct RowSchemaLocation {
931 pub value: AstString,
932}
933
934impl ParseTo for RowSchemaLocation {
935 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
936 impl_parse_to!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], p);
937 impl_parse_to!(value: AstString, p);
938 Ok(Self { value })
939 }
940}
941
942impl fmt::Display for RowSchemaLocation {
943 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
944 let mut v = vec![];
945 impl_fmt_display!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], v);
946 impl_fmt_display!(value, v, self);
947 v.iter().join(" ").fmt(f)
948 }
949}
950
/// A string literal in the AST; holds the text without the surrounding quotes.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct AstString(pub String);
955
956impl ParseTo for AstString {
957 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
958 Ok(Self(parser.parse_literal_string()?))
959 }
960}
961
962impl fmt::Display for AstString {
963 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
964 write!(f, "'{}'", self.0)
965 }
966}
967
/// An optional AST fragment; `None` renders as nothing.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AstOption<T> {
    /// The fragment is absent.
    None,
    /// The fragment is present.
    Some(T),
}
977
978impl<T: ParseTo> ParseTo for AstOption<T> {
979 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
980 match T::parse_to(parser) {
981 Ok(t) => Ok(AstOption::Some(t)),
982 Err(_) => Ok(AstOption::None),
983 }
984 }
985}
986
987impl<T: fmt::Display> fmt::Display for AstOption<T> {
988 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
989 match &self {
990 AstOption::Some(t) => t.fmt(f),
991 AstOption::None => Ok(()),
992 }
993 }
994}
995
996impl<T> From<AstOption<T>> for Option<T> {
997 fn from(val: AstOption<T>) -> Self {
998 match val {
999 AstOption::Some(t) => Some(t),
1000 AstOption::None => None,
1001 }
1002 }
1003}
1004
1005#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1006pub struct CreateUserStatement {
1007 pub user_name: ObjectName,
1008 pub with_options: UserOptions,
1009}
1010
1011#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1012pub struct AlterUserStatement {
1013 pub user_name: ObjectName,
1014 pub mode: AlterUserMode,
1015}
1016
1017#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1018pub enum AlterUserMode {
1019 Options(UserOptions),
1020 Rename(ObjectName),
1021}
1022
1023#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1024pub enum UserOption {
1025 SuperUser,
1026 NoSuperUser,
1027 CreateDB,
1028 NoCreateDB,
1029 CreateUser,
1030 NoCreateUser,
1031 Login,
1032 NoLogin,
1033 Admin,
1034 NoAdmin,
1035 EncryptedPassword(AstString),
1036 Password(Option<AstString>),
1037 OAuth(Vec<SqlOption>),
1038}
1039
1040impl fmt::Display for UserOption {
1041 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1042 match self {
1043 UserOption::SuperUser => write!(f, "SUPERUSER"),
1044 UserOption::NoSuperUser => write!(f, "NOSUPERUSER"),
1045 UserOption::CreateDB => write!(f, "CREATEDB"),
1046 UserOption::NoCreateDB => write!(f, "NOCREATEDB"),
1047 UserOption::CreateUser => write!(f, "CREATEUSER"),
1048 UserOption::NoCreateUser => write!(f, "NOCREATEUSER"),
1049 UserOption::Login => write!(f, "LOGIN"),
1050 UserOption::NoLogin => write!(f, "NOLOGIN"),
1051 UserOption::Admin => write!(f, "ADMIN"),
1052 UserOption::NoAdmin => write!(f, "NOADMIN"),
1053 UserOption::EncryptedPassword(p) => write!(f, "ENCRYPTED PASSWORD {}", p),
1054 UserOption::Password(None) => write!(f, "PASSWORD NULL"),
1055 UserOption::Password(Some(p)) => write!(f, "PASSWORD {}", p),
1056 UserOption::OAuth(options) => {
1057 write!(f, "({})", display_comma_separated(options.as_slice()))
1058 }
1059 }
1060 }
1061}
1062
1063#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1064pub struct UserOptions(pub Vec<UserOption>);
1065
1066#[derive(Default)]
1067struct UserOptionsBuilder {
1068 super_user: Option<UserOption>,
1069 create_db: Option<UserOption>,
1070 create_user: Option<UserOption>,
1071 login: Option<UserOption>,
1072 admin: Option<UserOption>,
1073 password: Option<UserOption>,
1074}
1075
1076impl UserOptionsBuilder {
1077 fn build(self) -> UserOptions {
1078 let mut options = vec![];
1079 if let Some(option) = self.super_user {
1080 options.push(option);
1081 }
1082 if let Some(option) = self.create_db {
1083 options.push(option);
1084 }
1085 if let Some(option) = self.create_user {
1086 options.push(option);
1087 }
1088 if let Some(option) = self.login {
1089 options.push(option);
1090 }
1091 if let Some(option) = self.admin {
1092 options.push(option);
1093 }
1094 if let Some(option) = self.password {
1095 options.push(option);
1096 }
1097 UserOptions(options)
1098 }
1099}
1100
1101impl ParseTo for UserOptions {
1102 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
1103 let mut builder = UserOptionsBuilder::default();
1104 let add_option = |item: &mut Option<UserOption>, user_option| {
1105 let old_value = item.replace(user_option);
1106 if old_value.is_some() {
1107 parser_err!("conflicting or redundant options");
1108 }
1109 Ok(())
1110 };
1111 let _ = parser.parse_keyword(Keyword::WITH);
1112 loop {
1113 let token = parser.peek_token();
1114 if token == Token::EOF || token == Token::SemiColon {
1115 break;
1116 }
1117
1118 let option = parser.parse_identifier()?;
1119 let s = option.real_value();
1120 let (item_mut_ref, user_option) = match &s[..] {
1121 "superuser" => (&mut builder.super_user, UserOption::SuperUser),
1122 "nosuperuser" => (&mut builder.super_user, UserOption::NoSuperUser),
1123 "createdb" => (&mut builder.create_db, UserOption::CreateDB),
1124 "nocreatedb" => (&mut builder.create_db, UserOption::NoCreateDB),
1125 "createuser" => (&mut builder.create_user, UserOption::CreateUser),
1126 "nocreateuser" => (&mut builder.create_user, UserOption::NoCreateUser),
1127 "login" => (&mut builder.login, UserOption::Login),
1128 "nologin" => (&mut builder.login, UserOption::NoLogin),
1129 "admin" => (&mut builder.admin, UserOption::Admin),
1130 "noadmin" => (&mut builder.admin, UserOption::NoAdmin),
1131 "password" => {
1132 if parser.parse_keyword(Keyword::NULL) {
1133 (&mut builder.password, UserOption::Password(None))
1134 } else {
1135 (
1136 &mut builder.password,
1137 UserOption::Password(Some(AstString::parse_to(parser)?)),
1138 )
1139 }
1140 }
1141 "encrypted" => {
1142 let option = parser.parse_identifier()?;
1143 let s = option.real_value();
1144 if s != "password" {
1145 parser_err!("expected PASSWORD after ENCRYPTED, found {}", option);
1146 }
1147 (
1148 &mut builder.password,
1149 UserOption::EncryptedPassword(AstString::parse_to(parser)?),
1150 )
1151 }
1152 "oauth" => {
1153 let options = parser.parse_options()?;
1154 (&mut builder.password, UserOption::OAuth(options))
1155 }
1156 _ => {
1157 parser_err!("unexpected user option: {}", option);
1158 }
1159 };
1160 add_option(item_mut_ref, user_option)?;
1161 }
1162 Ok(builder.build())
1163 }
1164}
1165
1166impl fmt::Display for UserOptions {
1167 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1168 if !self.0.is_empty() {
1169 write!(f, "WITH {}", display_separated(self.0.as_slice(), " "))
1170 } else {
1171 Ok(())
1172 }
1173 }
1174}
1175
1176impl ParseTo for CreateUserStatement {
1177 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1178 impl_parse_to!(user_name: ObjectName, p);
1179 impl_parse_to!(with_options: UserOptions, p);
1180
1181 Ok(CreateUserStatement {
1182 user_name,
1183 with_options,
1184 })
1185 }
1186}
1187
1188impl fmt::Display for CreateUserStatement {
1189 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1190 let mut v: Vec<String> = vec![];
1191 impl_fmt_display!(user_name, v, self);
1192 impl_fmt_display!(with_options, v, self);
1193 v.iter().join(" ").fmt(f)
1194 }
1195}
1196
1197impl fmt::Display for AlterUserMode {
1198 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1199 match self {
1200 AlterUserMode::Options(options) => {
1201 write!(f, "{}", options)
1202 }
1203 AlterUserMode::Rename(new_name) => {
1204 write!(f, "RENAME TO {}", new_name)
1205 }
1206 }
1207 }
1208}
1209
1210impl fmt::Display for AlterUserStatement {
1211 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1212 let mut v: Vec<String> = vec![];
1213 impl_fmt_display!(user_name, v, self);
1214 impl_fmt_display!(mode, v, self);
1215 v.iter().join(" ").fmt(f)
1216 }
1217}
1218
1219impl ParseTo for AlterUserStatement {
1220 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1221 impl_parse_to!(user_name: ObjectName, p);
1222 impl_parse_to!(mode: AlterUserMode, p);
1223
1224 Ok(AlterUserStatement { user_name, mode })
1225 }
1226}
1227
1228impl ParseTo for AlterUserMode {
1229 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1230 if p.parse_keyword(Keyword::RENAME) {
1231 p.expect_keyword(Keyword::TO)?;
1232 impl_parse_to!(new_name: ObjectName, p);
1233 Ok(AlterUserMode::Rename(new_name))
1234 } else {
1235 impl_parse_to!(with_options: UserOptions, p);
1236 Ok(AlterUserMode::Options(with_options))
1237 }
1238 }
1239}
1240
1241#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1242pub struct DropStatement {
1243 pub object_type: ObjectType,
1245 pub if_exists: bool,
1247 pub object_name: ObjectName,
1249 pub drop_mode: AstOption<DropMode>,
1252}
1253
1254impl ParseTo for DropStatement {
1261 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1262 impl_parse_to!(object_type: ObjectType, p);
1263 impl_parse_to!(if_exists => [Keyword::IF, Keyword::EXISTS], p);
1264 let object_name = p.parse_object_name()?;
1265 impl_parse_to!(drop_mode: AstOption<DropMode>, p);
1266 Ok(Self {
1267 object_type,
1268 if_exists,
1269 object_name,
1270 drop_mode,
1271 })
1272 }
1273}
1274
1275impl fmt::Display for DropStatement {
1276 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1277 let mut v: Vec<String> = vec![];
1278 impl_fmt_display!(object_type, v, self);
1279 impl_fmt_display!(if_exists => [Keyword::IF, Keyword::EXISTS], v, self);
1280 impl_fmt_display!(object_name, v, self);
1281 impl_fmt_display!(drop_mode, v, self);
1282 v.iter().join(" ").fmt(f)
1283 }
1284}
1285
/// The optional behaviour keyword at the end of a `DROP` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum DropMode {
    /// Rendered as `CASCADE`.
    Cascade,
    /// Rendered as `RESTRICT`.
    Restrict,
}
1291
1292impl ParseTo for DropMode {
1293 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
1294 let drop_mode = if parser.parse_keyword(Keyword::CASCADE) {
1295 DropMode::Cascade
1296 } else if parser.parse_keyword(Keyword::RESTRICT) {
1297 DropMode::Restrict
1298 } else {
1299 return parser.expected("CASCADE | RESTRICT");
1300 };
1301 Ok(drop_mode)
1302 }
1303}
1304
1305impl fmt::Display for DropMode {
1306 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1307 f.write_str(match self {
1308 DropMode::Cascade => "CASCADE",
1309 DropMode::Restrict => "RESTRICT",
1310 })
1311 }
1312}