1use std::fmt;
16use std::fmt::{Formatter, Write};
17
18use itertools::Itertools;
19use winnow::ModalResult;
20
21use super::ddl::SourceWatermark;
22use super::legacy_source::{CompatibleFormatEncode, parse_format_encode};
23use super::{EmitMode, Ident, ObjectType, Query, Value};
24use crate::ast::{
25 ColumnDef, ObjectName, SqlOption, TableConstraint, display_comma_separated, display_separated,
26};
27use crate::keywords::Keyword;
28use crate::parser::{IncludeOption, IsOptional, Parser};
29use crate::parser_err;
30use crate::parser_v2::literal_u32;
31use crate::tokenizer::Token;
32
33pub trait ParseTo: Sized {
35 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self>;
36}
37
/// Emits one parsing step inside a `parse_to` body.
///
/// Forms:
/// * `impl_parse_to!(name: Type, parser)` binds `name` to `Type::parse_to(parser)?`.
/// * `impl_parse_to!(name => [kw, ..], parser)` binds `name` to the `bool` returned by
///   `parser.parse_keywords(&[kw, ..])`.
/// * `impl_parse_to!([kw, ..], parser)` calls `parser.expect_keywords(&[kw, ..])?`.
#[macro_export]
macro_rules! impl_parse_to {
    () => {};
    ($var:ident : $ty:ty, $p:ident) => {
        let $var = <$ty>::parse_to($p)?;
    };
    ($var:ident => [$($kw:tt)+], $p:ident) => {
        let $var = $p.parse_keywords(&[$($kw)+]);
    };
    ([$($kw:tt)+], $p:ident) => {
        $p.expect_keywords(&[$($kw)+])?;
    };
}
51
/// Appends one rendered fragment to a `Vec<String>` inside a `Display` impl.
///
/// Forms:
/// * `impl_fmt_display!(field, v, self)` pushes the `Display` output of `self.field`,
///   unless that output is empty.
/// * `impl_fmt_display!(field => [kw, ..], v, self)` pushes the space-separated
///   keywords when the boolean `self.field` is `true`.
/// * `impl_fmt_display!([item, ..], v)` unconditionally pushes the space-separated items.
#[macro_export]
macro_rules! impl_fmt_display {
    () => {};
    ($field:ident, $out:ident, $self:ident) => {{
        let rendered = $self.$field.to_string();
        if !rendered.is_empty() {
            $out.push(rendered);
        }
    }};
    ($field:ident => [$($kw:tt)+], $out:ident, $self:ident) => {
        if $self.$field {
            $out.push(display_separated(&[$($kw)+], " ").to_string());
        }
    };
    ([$($kw:tt)+], $out:ident) => {
        $out.push(display_separated(&[$($kw)+], " ").to_string());
    };
}
70
71#[derive(Debug, Clone, PartialEq, Eq, Hash)]
80pub struct CreateSourceStatement {
81 pub temporary: bool,
82 pub if_not_exists: bool,
83 pub columns: Vec<ColumnDef>,
84 pub wildcard_idx: Option<usize>,
86 pub constraints: Vec<TableConstraint>,
87 pub source_name: ObjectName,
88 pub with_properties: WithProperties,
89 pub format_encode: CompatibleFormatEncode,
90 pub source_watermarks: Vec<SourceWatermark>,
91 pub include_column_options: IncludeOption,
92}
93
/// The `FORMAT` part of a format/encode clause.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Format {
    /// Keyword `NATIVE`.
    Native,
    /// Keyword `NONE`.
    None,
    /// Keyword `DEBEZIUM`.
    Debezium,
    /// Keyword `DEBEZIUM_MONGO`.
    DebeziumMongo,
    /// Keyword `MAXWELL`.
    Maxwell,
    /// Keyword `CANAL`.
    Canal,
    /// Keyword `UPSERT`.
    Upsert,
    /// Keyword `PLAIN`.
    Plain,
}
117
118impl fmt::Display for Format {
120 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121 write!(
122 f,
123 "{}",
124 match self {
125 Format::Native => "NATIVE",
126 Format::Debezium => "DEBEZIUM",
127 Format::DebeziumMongo => "DEBEZIUM_MONGO",
128 Format::Maxwell => "MAXWELL",
129 Format::Canal => "CANAL",
130 Format::Upsert => "UPSERT",
131 Format::Plain => "PLAIN",
132 Format::None => "NONE",
133 }
134 )
135 }
136}
137
138impl Format {
139 pub fn from_keyword(s: &str) -> ModalResult<Self> {
140 Ok(match s {
141 "DEBEZIUM" => Format::Debezium,
142 "DEBEZIUM_MONGO" => Format::DebeziumMongo,
143 "MAXWELL" => Format::Maxwell,
144 "CANAL" => Format::Canal,
145 "PLAIN" => Format::Plain,
146 "UPSERT" => Format::Upsert,
147 "NATIVE" => Format::Native,
148 "NONE" => Format::None,
149 _ => parser_err!(
150 "expected CANAL | PROTOBUF | DEBEZIUM | MAXWELL | PLAIN | NATIVE | NONE after FORMAT"
151 ),
152 })
153 }
154}
155
/// The `ENCODE` part of a format/encode clause.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Encode {
    /// Keyword `AVRO`.
    Avro,
    /// Keyword `CSV`.
    Csv,
    /// Keyword `PROTOBUF`.
    Protobuf,
    /// Keyword `JSON`.
    Json,
    /// Keyword `BYTES`.
    Bytes,
    /// Keyword `NONE`.
    None,
    /// Keyword `TEXT`.
    Text,
    /// Keyword `NATIVE`.
    Native,
    /// Keyword `TEMPLATE`.
    Template,
    /// Keyword `PARQUET`.
    Parquet,
}
173
174impl fmt::Display for Encode {
176 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
177 write!(
178 f,
179 "{}",
180 match self {
181 Encode::Avro => "AVRO",
182 Encode::Csv => "CSV",
183 Encode::Protobuf => "PROTOBUF",
184 Encode::Json => "JSON",
185 Encode::Bytes => "BYTES",
186 Encode::Native => "NATIVE",
187 Encode::Template => "TEMPLATE",
188 Encode::None => "NONE",
189 Encode::Parquet => "PARQUET",
190 Encode::Text => "TEXT",
191 }
192 )
193 }
194}
195
196impl Encode {
197 pub fn from_keyword(s: &str) -> ModalResult<Self> {
198 Ok(match s {
199 "AVRO" => Encode::Avro,
200 "TEXT" => Encode::Text,
201 "BYTES" => Encode::Bytes,
202 "CSV" => Encode::Csv,
203 "PROTOBUF" => Encode::Protobuf,
204 "JSON" => Encode::Json,
205 "TEMPLATE" => Encode::Template,
206 "PARQUET" => Encode::Parquet,
207 "NATIVE" => Encode::Native,
208 "NONE" => Encode::None,
209 _ => parser_err!(
210 "expected AVRO | BYTES | CSV | PROTOBUF | JSON | NATIVE | TEMPLATE | PARQUET | NONE after Encode"
211 ),
212 })
213 }
214}
215
216#[derive(Debug, Clone, PartialEq, Eq, Hash)]
218pub struct FormatEncodeOptions {
219 pub format: Format,
220 pub row_encode: Encode,
221 pub row_options: Vec<SqlOption>,
222
223 pub key_encode: Option<Encode>,
224}
225
226impl Parser<'_> {
227 fn peek_format_encode_format(&mut self) -> bool {
229 (self.peek_nth_any_of_keywords(0, &[Keyword::ROW])
230 && self.peek_nth_any_of_keywords(1, &[Keyword::FORMAT])) || self.peek_nth_any_of_keywords(0, &[Keyword::FORMAT]) }
233
234 pub fn parse_format_encode_with_connector(
236 &mut self,
237 connector: &str,
238 cdc_source_job: bool,
239 ) -> ModalResult<CompatibleFormatEncode> {
240 if connector.contains("-cdc") {
245 let expected = if cdc_source_job {
246 FormatEncodeOptions::plain_json()
247 } else if connector.contains("mongodb") {
248 FormatEncodeOptions::debezium_mongo_json()
249 } else {
250 FormatEncodeOptions::debezium_json()
251 };
252
253 if self.peek_format_encode_format() {
254 let schema = parse_format_encode(self)?.into_v2();
255 if schema != expected {
256 parser_err!(
257 "Row format for CDC connectors should be \
258 either omitted or set to `{expected}`",
259 );
260 }
261 }
262 Ok(expected.into())
263 } else if connector.contains("nexmark") {
264 let expected = FormatEncodeOptions::native();
265 if self.peek_format_encode_format() {
266 let schema = parse_format_encode(self)?.into_v2();
267 if schema != expected {
268 parser_err!(
269 "Row format for nexmark connectors should be \
270 either omitted or set to `{expected}`",
271 );
272 }
273 }
274 Ok(expected.into())
275 } else if connector.contains("datagen") {
276 Ok(if self.peek_format_encode_format() {
277 parse_format_encode(self)?
278 } else {
279 FormatEncodeOptions::native().into()
280 })
281 } else if connector.contains("iceberg") {
282 let expected = FormatEncodeOptions::none();
283 if self.peek_format_encode_format() {
284 let schema = parse_format_encode(self)?.into_v2();
285 if schema != expected {
286 parser_err!(
287 "Row format for iceberg connectors should be \
288 either omitted or set to `{expected}`",
289 );
290 }
291 }
292 Ok(expected.into())
293 } else if connector.contains("webhook") {
294 parser_err!(
295 "Source with webhook connector is not supported. \
296 Please use the `CREATE TABLE ... WITH ...` statement instead.",
297 );
298 } else {
299 Ok(parse_format_encode(self)?)
300 }
301 }
302
303 pub fn parse_schema(&mut self) -> ModalResult<Option<FormatEncodeOptions>> {
305 if !self.parse_keyword(Keyword::FORMAT) {
306 return Ok(None);
307 }
308
309 let id = self.parse_identifier()?;
310 let s = id.value.to_ascii_uppercase();
311 let format = Format::from_keyword(&s)?;
312 self.expect_keyword(Keyword::ENCODE)?;
313 let id = self.parse_identifier()?;
314 let s = id.value.to_ascii_uppercase();
315 let row_encode = Encode::from_keyword(&s)?;
316 let row_options = self.parse_options()?;
317
318 let key_encode = if self.parse_keywords(&[Keyword::KEY, Keyword::ENCODE]) {
319 Some(Encode::from_keyword(
320 self.parse_identifier()?.value.to_ascii_uppercase().as_str(),
321 )?)
322 } else {
323 None
324 };
325
326 Ok(Some(FormatEncodeOptions {
327 format,
328 row_encode,
329 row_options,
330 key_encode,
331 }))
332 }
333}
334
335impl FormatEncodeOptions {
336 pub const fn plain_json() -> Self {
337 FormatEncodeOptions {
338 format: Format::Plain,
339 row_encode: Encode::Json,
340 row_options: Vec::new(),
341 key_encode: None,
342 }
343 }
344
345 pub const fn debezium_json() -> Self {
347 FormatEncodeOptions {
348 format: Format::Debezium,
349 row_encode: Encode::Json,
350 row_options: Vec::new(),
351 key_encode: None,
352 }
353 }
354
355 pub const fn debezium_mongo_json() -> Self {
356 FormatEncodeOptions {
357 format: Format::DebeziumMongo,
358 row_encode: Encode::Json,
359 row_options: Vec::new(),
360 key_encode: None,
361 }
362 }
363
364 pub const fn native() -> Self {
366 FormatEncodeOptions {
367 format: Format::Native,
368 row_encode: Encode::Native,
369 row_options: Vec::new(),
370 key_encode: None,
371 }
372 }
373
374 pub const fn none() -> Self {
377 FormatEncodeOptions {
378 format: Format::None,
379 row_encode: Encode::None,
380 row_options: Vec::new(),
381 key_encode: None,
382 }
383 }
384
385 pub fn row_options(&self) -> &[SqlOption] {
386 self.row_options.as_ref()
387 }
388}
389
390impl fmt::Display for FormatEncodeOptions {
391 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
392 write!(f, "FORMAT {} ENCODE {}", self.format, self.row_encode)?;
393
394 if !self.row_options().is_empty() {
395 write!(f, " ({})", display_comma_separated(self.row_options()))?;
396 }
397
398 if let Some(key_encode) = &self.key_encode {
399 write!(f, " KEY ENCODE {}", key_encode)?;
400 }
401
402 Ok(())
403 }
404}
405
406pub(super) fn fmt_create_items(
407 columns: &[ColumnDef],
408 constraints: &[TableConstraint],
409 watermarks: &[SourceWatermark],
410 wildcard_idx: Option<usize>,
411) -> std::result::Result<String, fmt::Error> {
412 let mut items = String::new();
413 let has_items = !columns.is_empty()
414 || !constraints.is_empty()
415 || !watermarks.is_empty()
416 || wildcard_idx.is_some();
417 has_items.then(|| write!(&mut items, "("));
418
419 if let Some(wildcard_idx) = wildcard_idx {
420 let (columns_l, columns_r) = columns.split_at(wildcard_idx);
421 write!(&mut items, "{}", display_comma_separated(columns_l))?;
422 if !columns_l.is_empty() {
423 write!(&mut items, ", ")?;
424 }
425 write!(&mut items, "{}", Token::Mul)?;
426 if !columns_r.is_empty() {
427 write!(&mut items, ", ")?;
428 }
429 write!(&mut items, "{}", display_comma_separated(columns_r))?;
430 } else {
431 write!(&mut items, "{}", display_comma_separated(columns))?;
432 }
433 let mut leading_items = !columns.is_empty() || wildcard_idx.is_some();
434
435 if leading_items && !constraints.is_empty() {
436 write!(&mut items, ", ")?;
437 }
438 write!(&mut items, "{}", display_comma_separated(constraints))?;
439 leading_items |= !constraints.is_empty();
440
441 if leading_items && !watermarks.is_empty() {
442 write!(&mut items, ", ")?;
443 }
444 write!(&mut items, "{}", display_comma_separated(watermarks))?;
445 has_items.then(|| write!(&mut items, ")"));
449 Ok(items)
450}
451
452impl fmt::Display for CreateSourceStatement {
453 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
454 let mut v: Vec<String> = vec![];
455 impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
456 impl_fmt_display!(source_name, v, self);
457
458 let items = fmt_create_items(
459 &self.columns,
460 &self.constraints,
461 &self.source_watermarks,
462 self.wildcard_idx,
463 )?;
464 if !items.is_empty() {
465 v.push(items);
466 }
467
468 for item in &self.include_column_options {
469 v.push(format!("{}", item));
470 }
471
472 let is_cdc_source = self.with_properties.0.iter().any(|option| {
474 option.name.real_value().eq_ignore_ascii_case("connector")
475 && option.value.to_string().contains("cdc")
476 });
477
478 impl_fmt_display!(with_properties, v, self);
479 if !is_cdc_source {
480 impl_fmt_display!(format_encode, v, self);
481 }
482 v.iter().join(" ").fmt(f)
483 }
484}
485
486#[derive(Debug, Clone, PartialEq, Eq, Hash)]
487pub enum CreateSink {
488 From(ObjectName),
489 AsQuery(Box<Query>),
490}
491
492impl fmt::Display for CreateSink {
493 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
494 match self {
495 Self::From(mv) => write!(f, "FROM {}", mv),
496 Self::AsQuery(query) => write!(f, "AS {}", query),
497 }
498 }
499}
500#[derive(Debug, Clone, PartialEq, Eq, Hash)]
508pub struct CreateSinkStatement {
509 pub if_not_exists: bool,
510 pub sink_name: ObjectName,
511 pub with_properties: WithProperties,
512 pub sink_from: CreateSink,
513
514 pub columns: Vec<Ident>,
517 pub emit_mode: Option<EmitMode>,
518 pub sink_schema: Option<FormatEncodeOptions>,
519 pub into_table_name: Option<ObjectName>,
520}
521
522impl ParseTo for CreateSinkStatement {
523 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
524 impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
525 impl_parse_to!(sink_name: ObjectName, p);
526
527 let mut target_spec_columns = Vec::new();
528 let into_table_name = if p.parse_keyword(Keyword::INTO) {
529 impl_parse_to!(into_table_name: ObjectName, p);
530
531 target_spec_columns = p.parse_parenthesized_column_list(IsOptional::Optional)?;
533 Some(into_table_name)
534 } else {
535 None
536 };
537
538 let sink_from = if p.parse_keyword(Keyword::FROM) {
539 impl_parse_to!(from_name: ObjectName, p);
540 CreateSink::From(from_name)
541 } else if p.parse_keyword(Keyword::AS) {
542 let query = Box::new(p.parse_query()?);
543 CreateSink::AsQuery(query)
544 } else {
545 p.expected("FROM or AS after CREATE SINK sink_name")?
546 };
547
548 let emit_mode: Option<EmitMode> = p.parse_emit_mode()?;
549
550 if !p.peek_nth_any_of_keywords(0, &[Keyword::WITH]) && into_table_name.is_none() {
553 p.expected("WITH")?
554 }
555 impl_parse_to!(with_properties: WithProperties, p);
556
557 if with_properties.0.is_empty() && into_table_name.is_none() {
558 parser_err!("sink properties not provided");
559 }
560
561 let sink_schema = p.parse_schema()?;
562
563 Ok(Self {
564 if_not_exists,
565 sink_name,
566 with_properties,
567 sink_from,
568 columns: target_spec_columns,
569 emit_mode,
570 sink_schema,
571 into_table_name,
572 })
573 }
574}
575
576impl fmt::Display for CreateSinkStatement {
577 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
578 let mut v: Vec<String> = vec![];
579 impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
580 impl_fmt_display!(sink_name, v, self);
581 if let Some(into_table) = &self.into_table_name {
582 impl_fmt_display!([Keyword::INTO], v);
583 impl_fmt_display!([into_table], v);
584 if !self.columns.is_empty() {
585 v.push(format!("({})", display_comma_separated(&self.columns)));
586 }
587 }
588 impl_fmt_display!(sink_from, v, self);
589 if let Some(ref emit_mode) = self.emit_mode {
590 v.push(format!("EMIT {}", emit_mode));
591 }
592 impl_fmt_display!(with_properties, v, self);
593 if let Some(schema) = &self.sink_schema {
594 v.push(format!("{}", schema));
595 }
596 v.iter().join(" ").fmt(f)
597 }
598}
599
600#[derive(Debug, Clone, PartialEq, Eq, Hash)]
608pub struct CreateSubscriptionStatement {
609 pub if_not_exists: bool,
610 pub subscription_name: ObjectName,
611 pub with_properties: WithProperties,
612 pub subscription_from: ObjectName,
613 }
615
616impl ParseTo for CreateSubscriptionStatement {
617 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
618 impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
619 impl_parse_to!(subscription_name: ObjectName, p);
620
621 let subscription_from = if p.parse_keyword(Keyword::FROM) {
622 impl_parse_to!(from_name: ObjectName, p);
623 from_name
624 } else {
625 p.expected("FROM after CREATE SUBSCRIPTION subscription_name")?
626 };
627
628 if !p.peek_nth_any_of_keywords(0, &[Keyword::WITH]) {
633 p.expected("WITH")?
634 }
635 impl_parse_to!(with_properties: WithProperties, p);
636
637 if with_properties.0.is_empty() {
638 parser_err!("subscription properties not provided");
639 }
640
641 Ok(Self {
642 if_not_exists,
643 subscription_name,
644 with_properties,
645 subscription_from,
646 })
647 }
648}
649
650impl fmt::Display for CreateSubscriptionStatement {
651 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
652 let mut v: Vec<String> = vec![];
653 impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
654 impl_fmt_display!(subscription_name, v, self);
655 v.push(format!("FROM {}", self.subscription_from));
656 impl_fmt_display!(with_properties, v, self);
657 v.iter().join(" ").fmt(f)
658 }
659}
660
661#[derive(Debug, Clone, PartialEq, Eq, Hash)]
662pub enum DeclareCursor {
663 Query(Box<Query>),
664 Subscription(ObjectName, Since),
665}
666
667impl fmt::Display for DeclareCursor {
668 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
669 let mut v: Vec<String> = vec![];
670 match self {
671 DeclareCursor::Query(query) => v.push(format!("{}", query.as_ref())),
672 DeclareCursor::Subscription(name, since) => {
673 v.push(format!("{}", name));
674 v.push(format!("{:?}", since));
675 }
676 }
677 v.iter().join(" ").fmt(f)
678 }
679}
680
681#[derive(Debug, Clone, PartialEq, Eq, Hash)]
691pub struct DeclareCursorStatement {
692 pub cursor_name: Ident,
693 pub declare_cursor: DeclareCursor,
694}
695
696impl ParseTo for DeclareCursorStatement {
697 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
698 let cursor_name = p.parse_identifier_non_reserved()?;
699
700 let declare_cursor = if !p.parse_keyword(Keyword::SUBSCRIPTION) {
701 p.expect_keyword(Keyword::CURSOR)?;
702 p.expect_keyword(Keyword::FOR)?;
703 DeclareCursor::Query(Box::new(p.parse_query()?))
704 } else {
705 p.expect_keyword(Keyword::CURSOR)?;
706 p.expect_keyword(Keyword::FOR)?;
707 let cursor_for_name = p.parse_object_name()?;
708 let rw_timestamp = p.parse_since()?;
709 DeclareCursor::Subscription(cursor_for_name, rw_timestamp)
710 };
711
712 Ok(Self {
713 cursor_name,
714 declare_cursor,
715 })
716 }
717}
718
719impl fmt::Display for DeclareCursorStatement {
720 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
721 let mut v: Vec<String> = vec![];
722 impl_fmt_display!(cursor_name, v, self);
723 match &self.declare_cursor {
724 DeclareCursor::Query(_) => {
725 v.push("CURSOR FOR ".to_owned());
726 }
727 DeclareCursor::Subscription { .. } => {
728 v.push("SUBSCRIPTION CURSOR FOR ".to_owned());
729 }
730 }
731 impl_fmt_display!(declare_cursor, v, self);
732 v.iter().join(" ").fmt(f)
733 }
734}
735
736#[derive(Debug, Clone, PartialEq, Eq, Hash)]
740pub struct FetchCursorStatement {
741 pub cursor_name: Ident,
742 pub count: u32,
743 pub with_properties: WithProperties,
744}
745
746impl ParseTo for FetchCursorStatement {
747 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
748 let count = if p.parse_keyword(Keyword::NEXT) {
749 1
750 } else {
751 literal_u32(p)?
752 };
753 p.expect_keyword(Keyword::FROM)?;
754 let cursor_name = p.parse_identifier_non_reserved()?;
755 impl_parse_to!(with_properties: WithProperties, p);
756
757 Ok(Self {
758 cursor_name,
759 count,
760 with_properties,
761 })
762 }
763}
764
765impl fmt::Display for FetchCursorStatement {
766 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
767 let mut v: Vec<String> = vec![];
768 if self.count == 1 {
769 v.push("NEXT ".to_owned());
770 } else {
771 impl_fmt_display!(count, v, self);
772 }
773 v.push("FROM ".to_owned());
774 impl_fmt_display!(cursor_name, v, self);
775 v.iter().join(" ").fmt(f)
776 }
777}
778
779#[derive(Debug, Clone, PartialEq, Eq, Hash)]
783pub struct CloseCursorStatement {
784 pub cursor_name: Option<Ident>,
785}
786
787impl ParseTo for CloseCursorStatement {
788 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
789 let cursor_name = if p.parse_keyword(Keyword::ALL) {
790 None
791 } else {
792 Some(p.parse_identifier_non_reserved()?)
793 };
794
795 Ok(Self { cursor_name })
796 }
797}
798
799impl fmt::Display for CloseCursorStatement {
800 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
801 let mut v: Vec<String> = vec![];
802 if let Some(cursor_name) = &self.cursor_name {
803 v.push(format!("{}", cursor_name));
804 } else {
805 v.push("ALL".to_owned());
806 }
807 v.iter().join(" ").fmt(f)
808 }
809}
810
811#[derive(Debug, Clone, PartialEq, Eq, Hash)]
817pub struct CreateConnectionStatement {
818 pub if_not_exists: bool,
819 pub connection_name: ObjectName,
820 pub with_properties: WithProperties,
821}
822
823impl ParseTo for CreateConnectionStatement {
824 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
825 impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
826 impl_parse_to!(connection_name: ObjectName, p);
827 impl_parse_to!(with_properties: WithProperties, p);
828 if with_properties.0.is_empty() {
829 parser_err!("connection properties not provided");
830 }
831
832 Ok(Self {
833 if_not_exists,
834 connection_name,
835 with_properties,
836 })
837 }
838}
839
840impl fmt::Display for CreateConnectionStatement {
841 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
842 let mut v: Vec<String> = vec![];
843 impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
844 impl_fmt_display!(connection_name, v, self);
845 impl_fmt_display!(with_properties, v, self);
846 v.iter().join(" ").fmt(f)
847 }
848}
849
850#[derive(Debug, Clone, PartialEq, Eq, Hash)]
851pub struct CreateSecretStatement {
852 pub if_not_exists: bool,
853 pub secret_name: ObjectName,
854 pub credential: Value,
855 pub with_properties: WithProperties,
856}
857
858impl ParseTo for CreateSecretStatement {
859 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
860 impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], parser);
861 impl_parse_to!(secret_name: ObjectName, parser);
862 impl_parse_to!(with_properties: WithProperties, parser);
863 let mut credential = Value::Null;
864 if parser.parse_keyword(Keyword::AS) {
865 credential = parser.ensure_parse_value()?;
866 }
867 Ok(Self {
868 if_not_exists,
869 secret_name,
870 credential,
871 with_properties,
872 })
873 }
874}
875
876impl fmt::Display for CreateSecretStatement {
877 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
878 let mut v: Vec<String> = vec![];
879 impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
880 impl_fmt_display!(secret_name, v, self);
881 impl_fmt_display!(with_properties, v, self);
882 if self.credential != Value::Null {
883 v.push("AS".to_owned());
884 impl_fmt_display!(credential, v, self);
885 }
886 v.iter().join(" ").fmt(f)
887 }
888}
889
890#[derive(Debug, Clone, PartialEq, Eq, Hash)]
891pub struct WithProperties(pub Vec<SqlOption>);
892
893impl ParseTo for WithProperties {
894 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
895 Ok(Self(
896 parser.parse_options_with_preceding_keyword(Keyword::WITH)?,
897 ))
898 }
899}
900
901impl fmt::Display for WithProperties {
902 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
903 if !self.0.is_empty() {
904 write!(f, "WITH ({})", display_comma_separated(self.0.as_slice()))
905 } else {
906 Ok(())
907 }
908 }
909}
910
/// Starting point of a subscription cursor.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Since {
    /// Rendered as ` SINCE <number>`.
    TimestampMsNum(u64),
    /// Rendered as ` SINCE PROCTIME()`.
    ProcessTime,
    /// Rendered as ` SINCE BEGIN()`.
    Begin,
    /// Rendered as ` FULL`.
    Full,
}
918
919impl fmt::Display for Since {
920 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
921 use Since::*;
922 match self {
923 TimestampMsNum(ts) => write!(f, " SINCE {}", ts),
924 ProcessTime => write!(f, " SINCE PROCTIME()"),
925 Begin => write!(f, " SINCE BEGIN()"),
926 Full => write!(f, " FULL"),
927 }
928 }
929}
930
931#[derive(Debug, Clone, PartialEq, Eq, Hash)]
932pub struct RowSchemaLocation {
933 pub value: AstString,
934}
935
936impl ParseTo for RowSchemaLocation {
937 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
938 impl_parse_to!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], p);
939 impl_parse_to!(value: AstString, p);
940 Ok(Self { value })
941 }
942}
943
944impl fmt::Display for RowSchemaLocation {
945 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
946 let mut v = vec![];
947 impl_fmt_display!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], v);
948 impl_fmt_display!(value, v, self);
949 v.iter().join(" ").fmt(f)
950 }
951}
952
/// A string literal; holds the contents without the surrounding quotes that
/// `Display` adds.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct AstString(pub String);
957
958impl ParseTo for AstString {
959 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
960 Ok(Self(parser.parse_literal_string()?))
961 }
962}
963
964impl fmt::Display for AstString {
965 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
966 write!(f, "'{}'", self.0)
967 }
968}
969
/// An optional AST node whose `ParseTo` impl maps a failed parse of `T` to
/// `None` and whose `Display` impl renders nothing for `None`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AstOption<T> {
    /// No value.
    None,
    /// A parsed value.
    Some(T),
}
979
980impl<T: ParseTo> ParseTo for AstOption<T> {
981 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
982 match T::parse_to(parser) {
983 Ok(t) => Ok(AstOption::Some(t)),
984 Err(_) => Ok(AstOption::None),
985 }
986 }
987}
988
989impl<T: fmt::Display> fmt::Display for AstOption<T> {
990 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
991 match &self {
992 AstOption::Some(t) => t.fmt(f),
993 AstOption::None => Ok(()),
994 }
995 }
996}
997
998impl<T> From<AstOption<T>> for Option<T> {
999 fn from(val: AstOption<T>) -> Self {
1000 match val {
1001 AstOption::Some(t) => Some(t),
1002 AstOption::None => None,
1003 }
1004 }
1005}
1006
1007#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1008pub struct CreateUserStatement {
1009 pub user_name: ObjectName,
1010 pub with_options: UserOptions,
1011}
1012
1013#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1014pub struct AlterUserStatement {
1015 pub user_name: ObjectName,
1016 pub mode: AlterUserMode,
1017}
1018
1019#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1020pub enum AlterUserMode {
1021 Options(UserOptions),
1022 Rename(ObjectName),
1023}
1024
1025#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1026pub enum UserOption {
1027 SuperUser,
1028 NoSuperUser,
1029 CreateDB,
1030 NoCreateDB,
1031 CreateUser,
1032 NoCreateUser,
1033 Login,
1034 NoLogin,
1035 Admin,
1036 NoAdmin,
1037 EncryptedPassword(AstString),
1038 Password(Option<AstString>),
1039 OAuth(Vec<SqlOption>),
1040}
1041
1042impl fmt::Display for UserOption {
1043 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1044 match self {
1045 UserOption::SuperUser => write!(f, "SUPERUSER"),
1046 UserOption::NoSuperUser => write!(f, "NOSUPERUSER"),
1047 UserOption::CreateDB => write!(f, "CREATEDB"),
1048 UserOption::NoCreateDB => write!(f, "NOCREATEDB"),
1049 UserOption::CreateUser => write!(f, "CREATEUSER"),
1050 UserOption::NoCreateUser => write!(f, "NOCREATEUSER"),
1051 UserOption::Login => write!(f, "LOGIN"),
1052 UserOption::NoLogin => write!(f, "NOLOGIN"),
1053 UserOption::Admin => write!(f, "ADMIN"),
1054 UserOption::NoAdmin => write!(f, "NOADMIN"),
1055 UserOption::EncryptedPassword(p) => write!(f, "ENCRYPTED PASSWORD {}", p),
1056 UserOption::Password(None) => write!(f, "PASSWORD NULL"),
1057 UserOption::Password(Some(p)) => write!(f, "PASSWORD {}", p),
1058 UserOption::OAuth(options) => {
1059 write!(f, "({})", display_comma_separated(options.as_slice()))
1060 }
1061 }
1062 }
1063}
1064
1065#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1066pub struct UserOptions(pub Vec<UserOption>);
1067
1068#[derive(Default)]
1069struct UserOptionsBuilder {
1070 super_user: Option<UserOption>,
1071 create_db: Option<UserOption>,
1072 create_user: Option<UserOption>,
1073 login: Option<UserOption>,
1074 admin: Option<UserOption>,
1075 password: Option<UserOption>,
1076}
1077
1078impl UserOptionsBuilder {
1079 fn build(self) -> UserOptions {
1080 let mut options = vec![];
1081 if let Some(option) = self.super_user {
1082 options.push(option);
1083 }
1084 if let Some(option) = self.create_db {
1085 options.push(option);
1086 }
1087 if let Some(option) = self.create_user {
1088 options.push(option);
1089 }
1090 if let Some(option) = self.login {
1091 options.push(option);
1092 }
1093 if let Some(option) = self.admin {
1094 options.push(option);
1095 }
1096 if let Some(option) = self.password {
1097 options.push(option);
1098 }
1099 UserOptions(options)
1100 }
1101}
1102
1103impl ParseTo for UserOptions {
1104 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
1105 let mut builder = UserOptionsBuilder::default();
1106 let add_option = |item: &mut Option<UserOption>, user_option| {
1107 let old_value = item.replace(user_option);
1108 if old_value.is_some() {
1109 parser_err!("conflicting or redundant options");
1110 }
1111 Ok(())
1112 };
1113 let _ = parser.parse_keyword(Keyword::WITH);
1114 loop {
1115 let token = parser.peek_token();
1116 if token == Token::EOF || token == Token::SemiColon {
1117 break;
1118 }
1119
1120 let option = parser.parse_identifier()?;
1121 let s = option.real_value();
1122 let (item_mut_ref, user_option) = match &s[..] {
1123 "superuser" => (&mut builder.super_user, UserOption::SuperUser),
1124 "nosuperuser" => (&mut builder.super_user, UserOption::NoSuperUser),
1125 "createdb" => (&mut builder.create_db, UserOption::CreateDB),
1126 "nocreatedb" => (&mut builder.create_db, UserOption::NoCreateDB),
1127 "createuser" => (&mut builder.create_user, UserOption::CreateUser),
1128 "nocreateuser" => (&mut builder.create_user, UserOption::NoCreateUser),
1129 "login" => (&mut builder.login, UserOption::Login),
1130 "nologin" => (&mut builder.login, UserOption::NoLogin),
1131 "admin" => (&mut builder.admin, UserOption::Admin),
1132 "noadmin" => (&mut builder.admin, UserOption::NoAdmin),
1133 "password" => {
1134 if parser.parse_keyword(Keyword::NULL) {
1135 (&mut builder.password, UserOption::Password(None))
1136 } else {
1137 (
1138 &mut builder.password,
1139 UserOption::Password(Some(AstString::parse_to(parser)?)),
1140 )
1141 }
1142 }
1143 "encrypted" => {
1144 let option = parser.parse_identifier()?;
1145 let s = option.real_value();
1146 if s != "password" {
1147 parser_err!("expected PASSWORD after ENCRYPTED, found {}", option);
1148 }
1149 (
1150 &mut builder.password,
1151 UserOption::EncryptedPassword(AstString::parse_to(parser)?),
1152 )
1153 }
1154 "oauth" => {
1155 let options = parser.parse_options()?;
1156 (&mut builder.password, UserOption::OAuth(options))
1157 }
1158 _ => {
1159 parser_err!("unexpected user option: {}", option);
1160 }
1161 };
1162 add_option(item_mut_ref, user_option)?;
1163 }
1164 Ok(builder.build())
1165 }
1166}
1167
1168impl fmt::Display for UserOptions {
1169 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1170 if !self.0.is_empty() {
1171 write!(f, "WITH {}", display_separated(self.0.as_slice(), " "))
1172 } else {
1173 Ok(())
1174 }
1175 }
1176}
1177
1178impl ParseTo for CreateUserStatement {
1179 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1180 impl_parse_to!(user_name: ObjectName, p);
1181 impl_parse_to!(with_options: UserOptions, p);
1182
1183 Ok(CreateUserStatement {
1184 user_name,
1185 with_options,
1186 })
1187 }
1188}
1189
1190impl fmt::Display for CreateUserStatement {
1191 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1192 let mut v: Vec<String> = vec![];
1193 impl_fmt_display!(user_name, v, self);
1194 impl_fmt_display!(with_options, v, self);
1195 v.iter().join(" ").fmt(f)
1196 }
1197}
1198
1199impl fmt::Display for AlterUserMode {
1200 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1201 match self {
1202 AlterUserMode::Options(options) => {
1203 write!(f, "{}", options)
1204 }
1205 AlterUserMode::Rename(new_name) => {
1206 write!(f, "RENAME TO {}", new_name)
1207 }
1208 }
1209 }
1210}
1211
1212impl fmt::Display for AlterUserStatement {
1213 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1214 let mut v: Vec<String> = vec![];
1215 impl_fmt_display!(user_name, v, self);
1216 impl_fmt_display!(mode, v, self);
1217 v.iter().join(" ").fmt(f)
1218 }
1219}
1220
1221impl ParseTo for AlterUserStatement {
1222 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1223 impl_parse_to!(user_name: ObjectName, p);
1224 impl_parse_to!(mode: AlterUserMode, p);
1225
1226 Ok(AlterUserStatement { user_name, mode })
1227 }
1228}
1229
1230impl ParseTo for AlterUserMode {
1231 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1232 if p.parse_keyword(Keyword::RENAME) {
1233 p.expect_keyword(Keyword::TO)?;
1234 impl_parse_to!(new_name: ObjectName, p);
1235 Ok(AlterUserMode::Rename(new_name))
1236 } else {
1237 impl_parse_to!(with_options: UserOptions, p);
1238 Ok(AlterUserMode::Options(with_options))
1239 }
1240 }
1241}
1242
1243#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1244pub struct DropStatement {
1245 pub object_type: ObjectType,
1247 pub if_exists: bool,
1249 pub object_name: ObjectName,
1251 pub drop_mode: AstOption<DropMode>,
1254}
1255
1256impl ParseTo for DropStatement {
1263 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1264 impl_parse_to!(object_type: ObjectType, p);
1265 impl_parse_to!(if_exists => [Keyword::IF, Keyword::EXISTS], p);
1266 let object_name = p.parse_object_name()?;
1267 impl_parse_to!(drop_mode: AstOption<DropMode>, p);
1268 Ok(Self {
1269 object_type,
1270 if_exists,
1271 object_name,
1272 drop_mode,
1273 })
1274 }
1275}
1276
1277impl fmt::Display for DropStatement {
1278 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1279 let mut v: Vec<String> = vec![];
1280 impl_fmt_display!(object_type, v, self);
1281 impl_fmt_display!(if_exists => [Keyword::IF, Keyword::EXISTS], v, self);
1282 impl_fmt_display!(object_name, v, self);
1283 impl_fmt_display!(drop_mode, v, self);
1284 v.iter().join(" ").fmt(f)
1285 }
1286}
1287
/// The trailing mode keyword of a `DROP` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum DropMode {
    /// `CASCADE`.
    Cascade,
    /// `RESTRICT`.
    Restrict,
}
1293
1294impl ParseTo for DropMode {
1295 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
1296 let drop_mode = if parser.parse_keyword(Keyword::CASCADE) {
1297 DropMode::Cascade
1298 } else if parser.parse_keyword(Keyword::RESTRICT) {
1299 DropMode::Restrict
1300 } else {
1301 return parser.expected("CASCADE | RESTRICT");
1302 };
1303 Ok(drop_mode)
1304 }
1305}
1306
1307impl fmt::Display for DropMode {
1308 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1309 f.write_str(match self {
1310 DropMode::Cascade => "CASCADE",
1311 DropMode::Restrict => "RESTRICT",
1312 })
1313 }
1314}