1use core::fmt;
16use core::fmt::Formatter;
17use std::fmt::Write;
18
19use itertools::Itertools;
20#[cfg(feature = "serde")]
21use serde::{Deserialize, Serialize};
22use winnow::ModalResult;
23
24use super::ddl::SourceWatermark;
25use super::legacy_source::{CompatibleFormatEncode, parse_format_encode};
26use super::{EmitMode, Ident, ObjectType, Query, Value};
27use crate::ast::{
28 ColumnDef, ObjectName, SqlOption, TableConstraint, display_comma_separated, display_separated,
29};
30use crate::keywords::Keyword;
31use crate::parser::{IncludeOption, IsOptional, Parser};
32use crate::parser_err;
33use crate::parser_v2::literal_u32;
34use crate::tokenizer::Token;
35
36pub trait ParseTo: Sized {
38 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self>;
39}
40
41#[macro_export]
42macro_rules! impl_parse_to {
43 () => {};
44 ($field:ident : $field_type:ty, $parser:ident) => {
45 let $field = <$field_type>::parse_to($parser)?;
46 };
47 ($field:ident => [$($arr:tt)+], $parser:ident) => {
48 let $field = $parser.parse_keywords(&[$($arr)+]);
49 };
50 ([$($arr:tt)+], $parser:ident) => {
51 $parser.expect_keywords(&[$($arr)+])?;
52 };
53}
54
55#[macro_export]
56macro_rules! impl_fmt_display {
57 () => {};
58 ($field:ident, $v:ident, $self:ident) => {{
59 let s = format!("{}", $self.$field);
60 if !s.is_empty() {
61 $v.push(s);
62 }
63 }};
64 ($field:ident => [$($arr:tt)+], $v:ident, $self:ident) => {
65 if $self.$field {
66 $v.push(format!("{}", display_separated(&[$($arr)+], " ")));
67 }
68 };
69 ([$($arr:tt)+], $v:ident) => {
70 $v.push(format!("{}", display_separated(&[$($arr)+], " ")));
71 };
72}
73
74#[derive(Debug, Clone, PartialEq, Eq, Hash)]
83#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
84pub struct CreateSourceStatement {
85 pub temporary: bool,
86 pub if_not_exists: bool,
87 pub columns: Vec<ColumnDef>,
88 pub wildcard_idx: Option<usize>,
90 pub constraints: Vec<TableConstraint>,
91 pub source_name: ObjectName,
92 pub with_properties: WithProperties,
93 pub format_encode: CompatibleFormatEncode,
94 pub source_watermarks: Vec<SourceWatermark>,
95 pub include_column_options: IncludeOption,
96}
97
98#[derive(Debug, Clone, PartialEq, Eq, Hash)]
102#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
103pub enum Format {
104 Native,
107 None,
109 Debezium,
111 DebeziumMongo,
113 Maxwell,
115 Canal,
117 Upsert,
119 Plain,
121}
122
123impl fmt::Display for Format {
125 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
126 write!(
127 f,
128 "{}",
129 match self {
130 Format::Native => "NATIVE",
131 Format::Debezium => "DEBEZIUM",
132 Format::DebeziumMongo => "DEBEZIUM_MONGO",
133 Format::Maxwell => "MAXWELL",
134 Format::Canal => "CANAL",
135 Format::Upsert => "UPSERT",
136 Format::Plain => "PLAIN",
137 Format::None => "NONE",
138 }
139 )
140 }
141}
142
143impl Format {
144 pub fn from_keyword(s: &str) -> ModalResult<Self> {
145 Ok(match s {
146 "DEBEZIUM" => Format::Debezium,
147 "DEBEZIUM_MONGO" => Format::DebeziumMongo,
148 "MAXWELL" => Format::Maxwell,
149 "CANAL" => Format::Canal,
150 "PLAIN" => Format::Plain,
151 "UPSERT" => Format::Upsert,
152 "NATIVE" => Format::Native,
153 "NONE" => Format::None,
154 _ => parser_err!(
155 "expected CANAL | PROTOBUF | DEBEZIUM | MAXWELL | PLAIN | NATIVE | NONE after FORMAT"
156 ),
157 })
158 }
159}
160
161#[derive(Debug, Clone, PartialEq, Eq, Hash)]
163#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
164pub enum Encode {
165 Avro, Csv, Protobuf, Json, Bytes, None,
172 Text, Native,
176 Template,
177 Parquet,
178}
179
180impl fmt::Display for Encode {
182 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
183 write!(
184 f,
185 "{}",
186 match self {
187 Encode::Avro => "AVRO",
188 Encode::Csv => "CSV",
189 Encode::Protobuf => "PROTOBUF",
190 Encode::Json => "JSON",
191 Encode::Bytes => "BYTES",
192 Encode::Native => "NATIVE",
193 Encode::Template => "TEMPLATE",
194 Encode::None => "NONE",
195 Encode::Parquet => "PARQUET",
196 Encode::Text => "TEXT",
197 }
198 )
199 }
200}
201
202impl Encode {
203 pub fn from_keyword(s: &str) -> ModalResult<Self> {
204 Ok(match s {
205 "AVRO" => Encode::Avro,
206 "TEXT" => Encode::Text,
207 "BYTES" => Encode::Bytes,
208 "CSV" => Encode::Csv,
209 "PROTOBUF" => Encode::Protobuf,
210 "JSON" => Encode::Json,
211 "TEMPLATE" => Encode::Template,
212 "PARQUET" => Encode::Parquet,
213 "NATIVE" => Encode::Native,
214 "NONE" => Encode::None,
215 _ => parser_err!(
216 "expected AVRO | BYTES | CSV | PROTOBUF | JSON | NATIVE | TEMPLATE | PARQUET | NONE after Encode"
217 ),
218 })
219 }
220}
221
222#[derive(Debug, Clone, PartialEq, Eq, Hash)]
224#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
225pub struct FormatEncodeOptions {
226 pub format: Format,
227 pub row_encode: Encode,
228 pub row_options: Vec<SqlOption>,
229
230 pub key_encode: Option<Encode>,
231}
232
233impl Parser<'_> {
234 fn peek_format_encode_format(&mut self) -> bool {
236 (self.peek_nth_any_of_keywords(0, &[Keyword::ROW])
237 && self.peek_nth_any_of_keywords(1, &[Keyword::FORMAT])) || self.peek_nth_any_of_keywords(0, &[Keyword::FORMAT]) }
240
241 pub fn parse_format_encode_with_connector(
243 &mut self,
244 connector: &str,
245 cdc_source_job: bool,
246 ) -> ModalResult<CompatibleFormatEncode> {
247 if connector.contains("-cdc") {
252 let expected = if cdc_source_job {
253 FormatEncodeOptions::plain_json()
254 } else if connector.contains("mongodb") {
255 FormatEncodeOptions::debezium_mongo_json()
256 } else {
257 FormatEncodeOptions::debezium_json()
258 };
259
260 if self.peek_format_encode_format() {
261 let schema = parse_format_encode(self)?.into_v2();
262 if schema != expected {
263 parser_err!(
264 "Row format for CDC connectors should be \
265 either omitted or set to `{expected}`",
266 );
267 }
268 }
269 Ok(expected.into())
270 } else if connector.contains("nexmark") {
271 let expected = FormatEncodeOptions::native();
272 if self.peek_format_encode_format() {
273 let schema = parse_format_encode(self)?.into_v2();
274 if schema != expected {
275 parser_err!(
276 "Row format for nexmark connectors should be \
277 either omitted or set to `{expected}`",
278 );
279 }
280 }
281 Ok(expected.into())
282 } else if connector.contains("datagen") {
283 Ok(if self.peek_format_encode_format() {
284 parse_format_encode(self)?
285 } else {
286 FormatEncodeOptions::native().into()
287 })
288 } else if connector.contains("iceberg") {
289 let expected = FormatEncodeOptions::none();
290 if self.peek_format_encode_format() {
291 let schema = parse_format_encode(self)?.into_v2();
292 if schema != expected {
293 parser_err!(
294 "Row format for iceberg connectors should be \
295 either omitted or set to `{expected}`",
296 );
297 }
298 }
299 Ok(expected.into())
300 } else if connector.contains("webhook") {
301 parser_err!(
302 "Source with webhook connector is not supported. \
303 Please use the `CREATE TABLE ... WITH ...` statement instead.",
304 );
305 } else {
306 Ok(parse_format_encode(self)?)
307 }
308 }
309
310 pub fn parse_schema(&mut self) -> ModalResult<Option<FormatEncodeOptions>> {
312 if !self.parse_keyword(Keyword::FORMAT) {
313 return Ok(None);
314 }
315
316 let id = self.parse_identifier()?;
317 let s = id.value.to_ascii_uppercase();
318 let format = Format::from_keyword(&s)?;
319 self.expect_keyword(Keyword::ENCODE)?;
320 let id = self.parse_identifier()?;
321 let s = id.value.to_ascii_uppercase();
322 let row_encode = Encode::from_keyword(&s)?;
323 let row_options = self.parse_options()?;
324
325 let key_encode = if self.parse_keywords(&[Keyword::KEY, Keyword::ENCODE]) {
326 Some(Encode::from_keyword(
327 self.parse_identifier()?.value.to_ascii_uppercase().as_str(),
328 )?)
329 } else {
330 None
331 };
332
333 Ok(Some(FormatEncodeOptions {
334 format,
335 row_encode,
336 row_options,
337 key_encode,
338 }))
339 }
340}
341
342impl FormatEncodeOptions {
343 pub const fn plain_json() -> Self {
344 FormatEncodeOptions {
345 format: Format::Plain,
346 row_encode: Encode::Json,
347 row_options: Vec::new(),
348 key_encode: None,
349 }
350 }
351
352 pub const fn debezium_json() -> Self {
354 FormatEncodeOptions {
355 format: Format::Debezium,
356 row_encode: Encode::Json,
357 row_options: Vec::new(),
358 key_encode: None,
359 }
360 }
361
362 pub const fn debezium_mongo_json() -> Self {
363 FormatEncodeOptions {
364 format: Format::DebeziumMongo,
365 row_encode: Encode::Json,
366 row_options: Vec::new(),
367 key_encode: None,
368 }
369 }
370
371 pub const fn native() -> Self {
373 FormatEncodeOptions {
374 format: Format::Native,
375 row_encode: Encode::Native,
376 row_options: Vec::new(),
377 key_encode: None,
378 }
379 }
380
381 pub const fn none() -> Self {
384 FormatEncodeOptions {
385 format: Format::None,
386 row_encode: Encode::None,
387 row_options: Vec::new(),
388 key_encode: None,
389 }
390 }
391
392 pub fn row_options(&self) -> &[SqlOption] {
393 self.row_options.as_ref()
394 }
395}
396
397impl fmt::Display for FormatEncodeOptions {
398 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
399 write!(f, "FORMAT {} ENCODE {}", self.format, self.row_encode)?;
400
401 if !self.row_options().is_empty() {
402 write!(f, " ({})", display_comma_separated(self.row_options()))
403 } else {
404 Ok(())
405 }
406 }
407}
408
409pub(super) fn fmt_create_items(
410 columns: &[ColumnDef],
411 constraints: &[TableConstraint],
412 watermarks: &[SourceWatermark],
413 wildcard_idx: Option<usize>,
414) -> std::result::Result<String, fmt::Error> {
415 let mut items = String::new();
416 let has_items = !columns.is_empty()
417 || !constraints.is_empty()
418 || !watermarks.is_empty()
419 || wildcard_idx.is_some();
420 has_items.then(|| write!(&mut items, "("));
421
422 if let Some(wildcard_idx) = wildcard_idx {
423 let (columns_l, columns_r) = columns.split_at(wildcard_idx);
424 write!(&mut items, "{}", display_comma_separated(columns_l))?;
425 if !columns_l.is_empty() {
426 write!(&mut items, ", ")?;
427 }
428 write!(&mut items, "{}", Token::Mul)?;
429 if !columns_r.is_empty() {
430 write!(&mut items, ", ")?;
431 }
432 write!(&mut items, "{}", display_comma_separated(columns_r))?;
433 } else {
434 write!(&mut items, "{}", display_comma_separated(columns))?;
435 }
436 let mut leading_items = !columns.is_empty() || wildcard_idx.is_some();
437
438 if leading_items && !constraints.is_empty() {
439 write!(&mut items, ", ")?;
440 }
441 write!(&mut items, "{}", display_comma_separated(constraints))?;
442 leading_items |= !constraints.is_empty();
443
444 if leading_items && !watermarks.is_empty() {
445 write!(&mut items, ", ")?;
446 }
447 write!(&mut items, "{}", display_comma_separated(watermarks))?;
448 has_items.then(|| write!(&mut items, ")"));
452 Ok(items)
453}
454
455impl fmt::Display for CreateSourceStatement {
456 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
457 let mut v: Vec<String> = vec![];
458 impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
459 impl_fmt_display!(source_name, v, self);
460
461 let items = fmt_create_items(
462 &self.columns,
463 &self.constraints,
464 &self.source_watermarks,
465 self.wildcard_idx,
466 )?;
467 if !items.is_empty() {
468 v.push(items);
469 }
470
471 for item in &self.include_column_options {
472 v.push(format!("{}", item));
473 }
474 impl_fmt_display!(with_properties, v, self);
475 impl_fmt_display!(format_encode, v, self);
476 v.iter().join(" ").fmt(f)
477 }
478}
479
480#[derive(Debug, Clone, PartialEq, Eq, Hash)]
481#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
482pub enum CreateSink {
483 From(ObjectName),
484 AsQuery(Box<Query>),
485}
486
487impl fmt::Display for CreateSink {
488 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
489 match self {
490 Self::From(mv) => write!(f, "FROM {}", mv),
491 Self::AsQuery(query) => write!(f, "AS {}", query),
492 }
493 }
494}
495#[derive(Debug, Clone, PartialEq, Eq, Hash)]
503#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
504pub struct CreateSinkStatement {
505 pub if_not_exists: bool,
506 pub sink_name: ObjectName,
507 pub with_properties: WithProperties,
508 pub sink_from: CreateSink,
509 pub columns: Vec<Ident>,
510 pub emit_mode: Option<EmitMode>,
511 pub sink_schema: Option<FormatEncodeOptions>,
512 pub into_table_name: Option<ObjectName>,
513}
514
515impl ParseTo for CreateSinkStatement {
516 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
517 impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
518 impl_parse_to!(sink_name: ObjectName, p);
519
520 let into_table_name = if p.parse_keyword(Keyword::INTO) {
521 impl_parse_to!(into_table_name: ObjectName, p);
522 Some(into_table_name)
523 } else {
524 None
525 };
526
527 let columns = p.parse_parenthesized_column_list(IsOptional::Optional)?;
528
529 let sink_from = if p.parse_keyword(Keyword::FROM) {
530 impl_parse_to!(from_name: ObjectName, p);
531 CreateSink::From(from_name)
532 } else if p.parse_keyword(Keyword::AS) {
533 let query = Box::new(p.parse_query()?);
534 CreateSink::AsQuery(query)
535 } else {
536 p.expected("FROM or AS after CREATE SINK sink_name")?
537 };
538
539 let emit_mode: Option<EmitMode> = p.parse_emit_mode()?;
540
541 if !p.peek_nth_any_of_keywords(0, &[Keyword::WITH]) && into_table_name.is_none() {
544 p.expected("WITH")?
545 }
546 impl_parse_to!(with_properties: WithProperties, p);
547
548 if with_properties.0.is_empty() && into_table_name.is_none() {
549 parser_err!("sink properties not provided");
550 }
551
552 let sink_schema = p.parse_schema()?;
553
554 Ok(Self {
555 if_not_exists,
556 sink_name,
557 with_properties,
558 sink_from,
559 columns,
560 emit_mode,
561 sink_schema,
562 into_table_name,
563 })
564 }
565}
566
567impl fmt::Display for CreateSinkStatement {
568 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
569 let mut v: Vec<String> = vec![];
570 impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
571 impl_fmt_display!(sink_name, v, self);
572 if let Some(into_table) = &self.into_table_name {
573 impl_fmt_display!([Keyword::INTO], v);
574 impl_fmt_display!([into_table], v);
575 }
576 impl_fmt_display!(sink_from, v, self);
577 if let Some(ref emit_mode) = self.emit_mode {
578 v.push(format!("EMIT {}", emit_mode));
579 }
580 impl_fmt_display!(with_properties, v, self);
581 if let Some(schema) = &self.sink_schema {
582 v.push(format!("{}", schema));
583 }
584 v.iter().join(" ").fmt(f)
585 }
586}
587
588#[derive(Debug, Clone, PartialEq, Eq, Hash)]
596#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
597pub struct CreateSubscriptionStatement {
598 pub if_not_exists: bool,
599 pub subscription_name: ObjectName,
600 pub with_properties: WithProperties,
601 pub subscription_from: ObjectName,
602 }
604
605impl ParseTo for CreateSubscriptionStatement {
606 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
607 impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
608 impl_parse_to!(subscription_name: ObjectName, p);
609
610 let subscription_from = if p.parse_keyword(Keyword::FROM) {
611 impl_parse_to!(from_name: ObjectName, p);
612 from_name
613 } else {
614 p.expected("FROM after CREATE SUBSCRIPTION subscription_name")?
615 };
616
617 if !p.peek_nth_any_of_keywords(0, &[Keyword::WITH]) {
622 p.expected("WITH")?
623 }
624 impl_parse_to!(with_properties: WithProperties, p);
625
626 if with_properties.0.is_empty() {
627 parser_err!("subscription properties not provided");
628 }
629
630 Ok(Self {
631 if_not_exists,
632 subscription_name,
633 with_properties,
634 subscription_from,
635 })
636 }
637}
638
639impl fmt::Display for CreateSubscriptionStatement {
640 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
641 let mut v: Vec<String> = vec![];
642 impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
643 impl_fmt_display!(subscription_name, v, self);
644 v.push(format!("FROM {}", self.subscription_from));
645 impl_fmt_display!(with_properties, v, self);
646 v.iter().join(" ").fmt(f)
647 }
648}
649
650#[derive(Debug, Clone, PartialEq, Eq, Hash)]
651#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
652pub enum DeclareCursor {
653 Query(Box<Query>),
654 Subscription(ObjectName, Since),
655}
656
657impl fmt::Display for DeclareCursor {
658 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
659 let mut v: Vec<String> = vec![];
660 match self {
661 DeclareCursor::Query(query) => v.push(format!("{}", query.as_ref())),
662 DeclareCursor::Subscription(name, since) => {
663 v.push(format!("{}", name));
664 v.push(format!("{:?}", since));
665 }
666 }
667 v.iter().join(" ").fmt(f)
668 }
669}
670
671#[derive(Debug, Clone, PartialEq, Eq, Hash)]
681#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
682pub struct DeclareCursorStatement {
683 pub cursor_name: Ident,
684 pub declare_cursor: DeclareCursor,
685}
686
687impl ParseTo for DeclareCursorStatement {
688 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
689 let cursor_name = p.parse_identifier_non_reserved()?;
690
691 let declare_cursor = if !p.parse_keyword(Keyword::SUBSCRIPTION) {
692 p.expect_keyword(Keyword::CURSOR)?;
693 p.expect_keyword(Keyword::FOR)?;
694 DeclareCursor::Query(Box::new(p.parse_query()?))
695 } else {
696 p.expect_keyword(Keyword::CURSOR)?;
697 p.expect_keyword(Keyword::FOR)?;
698 let cursor_for_name = p.parse_object_name()?;
699 let rw_timestamp = p.parse_since()?;
700 DeclareCursor::Subscription(cursor_for_name, rw_timestamp)
701 };
702
703 Ok(Self {
704 cursor_name,
705 declare_cursor,
706 })
707 }
708}
709
710impl fmt::Display for DeclareCursorStatement {
711 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
712 let mut v: Vec<String> = vec![];
713 impl_fmt_display!(cursor_name, v, self);
714 match &self.declare_cursor {
715 DeclareCursor::Query(_) => {
716 v.push("CURSOR FOR ".to_owned());
717 }
718 DeclareCursor::Subscription { .. } => {
719 v.push("SUBSCRIPTION CURSOR FOR ".to_owned());
720 }
721 }
722 impl_fmt_display!(declare_cursor, v, self);
723 v.iter().join(" ").fmt(f)
724 }
725}
726
727#[derive(Debug, Clone, PartialEq, Eq, Hash)]
731#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
732pub struct FetchCursorStatement {
733 pub cursor_name: Ident,
734 pub count: u32,
735 pub with_properties: WithProperties,
736}
737
738impl ParseTo for FetchCursorStatement {
739 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
740 let count = if p.parse_keyword(Keyword::NEXT) {
741 1
742 } else {
743 literal_u32(p)?
744 };
745 p.expect_keyword(Keyword::FROM)?;
746 let cursor_name = p.parse_identifier_non_reserved()?;
747 impl_parse_to!(with_properties: WithProperties, p);
748
749 Ok(Self {
750 cursor_name,
751 count,
752 with_properties,
753 })
754 }
755}
756
757impl fmt::Display for FetchCursorStatement {
758 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
759 let mut v: Vec<String> = vec![];
760 if self.count == 1 {
761 v.push("NEXT ".to_owned());
762 } else {
763 impl_fmt_display!(count, v, self);
764 }
765 v.push("FROM ".to_owned());
766 impl_fmt_display!(cursor_name, v, self);
767 v.iter().join(" ").fmt(f)
768 }
769}
770
771#[derive(Debug, Clone, PartialEq, Eq, Hash)]
775#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
776pub struct CloseCursorStatement {
777 pub cursor_name: Option<Ident>,
778}
779
780impl ParseTo for CloseCursorStatement {
781 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
782 let cursor_name = if p.parse_keyword(Keyword::ALL) {
783 None
784 } else {
785 Some(p.parse_identifier_non_reserved()?)
786 };
787
788 Ok(Self { cursor_name })
789 }
790}
791
792impl fmt::Display for CloseCursorStatement {
793 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
794 let mut v: Vec<String> = vec![];
795 if let Some(cursor_name) = &self.cursor_name {
796 v.push(format!("{}", cursor_name));
797 } else {
798 v.push("ALL".to_owned());
799 }
800 v.iter().join(" ").fmt(f)
801 }
802}
803
804#[derive(Debug, Clone, PartialEq, Eq, Hash)]
810#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
811pub struct CreateConnectionStatement {
812 pub if_not_exists: bool,
813 pub connection_name: ObjectName,
814 pub with_properties: WithProperties,
815}
816
817impl ParseTo for CreateConnectionStatement {
818 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
819 impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
820 impl_parse_to!(connection_name: ObjectName, p);
821 impl_parse_to!(with_properties: WithProperties, p);
822 if with_properties.0.is_empty() {
823 parser_err!("connection properties not provided");
824 }
825
826 Ok(Self {
827 if_not_exists,
828 connection_name,
829 with_properties,
830 })
831 }
832}
833
834impl fmt::Display for CreateConnectionStatement {
835 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
836 let mut v: Vec<String> = vec![];
837 impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
838 impl_fmt_display!(connection_name, v, self);
839 impl_fmt_display!(with_properties, v, self);
840 v.iter().join(" ").fmt(f)
841 }
842}
843
844#[derive(Debug, Clone, PartialEq, Eq, Hash)]
845#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
846pub struct CreateSecretStatement {
847 pub if_not_exists: bool,
848 pub secret_name: ObjectName,
849 pub credential: Value,
850 pub with_properties: WithProperties,
851}
852
853impl ParseTo for CreateSecretStatement {
854 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
855 impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], parser);
856 impl_parse_to!(secret_name: ObjectName, parser);
857 impl_parse_to!(with_properties: WithProperties, parser);
858 let mut credential = Value::Null;
859 if parser.parse_keyword(Keyword::AS) {
860 credential = parser.ensure_parse_value()?;
861 }
862 Ok(Self {
863 if_not_exists,
864 secret_name,
865 credential,
866 with_properties,
867 })
868 }
869}
870
871impl fmt::Display for CreateSecretStatement {
872 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
873 let mut v: Vec<String> = vec![];
874 impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
875 impl_fmt_display!(secret_name, v, self);
876 impl_fmt_display!(with_properties, v, self);
877 if self.credential != Value::Null {
878 v.push("AS".to_owned());
879 impl_fmt_display!(credential, v, self);
880 }
881 v.iter().join(" ").fmt(f)
882 }
883}
884
885#[derive(Debug, Clone, PartialEq, Eq, Hash)]
886#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
887pub struct WithProperties(pub Vec<SqlOption>);
888
889impl ParseTo for WithProperties {
890 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
891 Ok(Self(
892 parser.parse_options_with_preceding_keyword(Keyword::WITH)?,
893 ))
894 }
895}
896
897impl fmt::Display for WithProperties {
898 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
899 if !self.0.is_empty() {
900 write!(f, "WITH ({})", display_comma_separated(self.0.as_slice()))
901 } else {
902 Ok(())
903 }
904 }
905}
906
907#[derive(Debug, Clone, PartialEq, Eq, Hash)]
908#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
909pub enum Since {
910 TimestampMsNum(u64),
911 ProcessTime,
912 Begin,
913 Full,
914}
915
916impl fmt::Display for Since {
917 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
918 use Since::*;
919 match self {
920 TimestampMsNum(ts) => write!(f, " SINCE {}", ts),
921 ProcessTime => write!(f, " SINCE PROCTIME()"),
922 Begin => write!(f, " SINCE BEGIN()"),
923 Full => write!(f, " FULL"),
924 }
925 }
926}
927
928#[derive(Debug, Clone, PartialEq, Eq, Hash)]
929#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
930pub struct RowSchemaLocation {
931 pub value: AstString,
932}
933
934impl ParseTo for RowSchemaLocation {
935 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
936 impl_parse_to!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], p);
937 impl_parse_to!(value: AstString, p);
938 Ok(Self { value })
939 }
940}
941
942impl fmt::Display for RowSchemaLocation {
943 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
944 let mut v = vec![];
945 impl_fmt_display!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], v);
946 impl_fmt_display!(value, v, self);
947 v.iter().join(" ").fmt(f)
948 }
949}
950
951#[derive(Debug, Clone, PartialEq, Eq, Hash)]
954#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
955pub struct AstString(pub String);
956
957impl ParseTo for AstString {
958 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
959 Ok(Self(parser.parse_literal_string()?))
960 }
961}
962
963impl fmt::Display for AstString {
964 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
965 write!(f, "'{}'", self.0)
966 }
967}
968
969#[derive(Debug, Clone, PartialEq, Eq, Hash)]
972#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
973pub enum AstOption<T> {
974 None,
976 Some(T),
978}
979
980impl<T: ParseTo> ParseTo for AstOption<T> {
981 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
982 match T::parse_to(parser) {
983 Ok(t) => Ok(AstOption::Some(t)),
984 Err(_) => Ok(AstOption::None),
985 }
986 }
987}
988
989impl<T: fmt::Display> fmt::Display for AstOption<T> {
990 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
991 match &self {
992 AstOption::Some(t) => t.fmt(f),
993 AstOption::None => Ok(()),
994 }
995 }
996}
997
998impl<T> From<AstOption<T>> for Option<T> {
999 fn from(val: AstOption<T>) -> Self {
1000 match val {
1001 AstOption::Some(t) => Some(t),
1002 AstOption::None => None,
1003 }
1004 }
1005}
1006
1007#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1008#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
1009pub struct CreateUserStatement {
1010 pub user_name: ObjectName,
1011 pub with_options: UserOptions,
1012}
1013
1014#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1015#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
1016pub struct AlterUserStatement {
1017 pub user_name: ObjectName,
1018 pub mode: AlterUserMode,
1019}
1020
1021#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1022#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
1023pub enum AlterUserMode {
1024 Options(UserOptions),
1025 Rename(ObjectName),
1026}
1027
1028#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1029#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
1030pub enum UserOption {
1031 SuperUser,
1032 NoSuperUser,
1033 CreateDB,
1034 NoCreateDB,
1035 CreateUser,
1036 NoCreateUser,
1037 Login,
1038 NoLogin,
1039 EncryptedPassword(AstString),
1040 Password(Option<AstString>),
1041 OAuth(Vec<SqlOption>),
1042}
1043
1044impl fmt::Display for UserOption {
1045 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1046 match self {
1047 UserOption::SuperUser => write!(f, "SUPERUSER"),
1048 UserOption::NoSuperUser => write!(f, "NOSUPERUSER"),
1049 UserOption::CreateDB => write!(f, "CREATEDB"),
1050 UserOption::NoCreateDB => write!(f, "NOCREATEDB"),
1051 UserOption::CreateUser => write!(f, "CREATEUSER"),
1052 UserOption::NoCreateUser => write!(f, "NOCREATEUSER"),
1053 UserOption::Login => write!(f, "LOGIN"),
1054 UserOption::NoLogin => write!(f, "NOLOGIN"),
1055 UserOption::EncryptedPassword(p) => write!(f, "ENCRYPTED PASSWORD {}", p),
1056 UserOption::Password(None) => write!(f, "PASSWORD NULL"),
1057 UserOption::Password(Some(p)) => write!(f, "PASSWORD {}", p),
1058 UserOption::OAuth(options) => {
1059 write!(f, "({})", display_comma_separated(options.as_slice()))
1060 }
1061 }
1062 }
1063}
1064
1065#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1066#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
1067pub struct UserOptions(pub Vec<UserOption>);
1068
1069#[derive(Default)]
1070struct UserOptionsBuilder {
1071 super_user: Option<UserOption>,
1072 create_db: Option<UserOption>,
1073 create_user: Option<UserOption>,
1074 login: Option<UserOption>,
1075 password: Option<UserOption>,
1076}
1077
1078impl UserOptionsBuilder {
1079 fn build(self) -> UserOptions {
1080 let mut options = vec![];
1081 if let Some(option) = self.super_user {
1082 options.push(option);
1083 }
1084 if let Some(option) = self.create_db {
1085 options.push(option);
1086 }
1087 if let Some(option) = self.create_user {
1088 options.push(option);
1089 }
1090 if let Some(option) = self.login {
1091 options.push(option);
1092 }
1093 if let Some(option) = self.password {
1094 options.push(option);
1095 }
1096 UserOptions(options)
1097 }
1098}
1099
1100impl ParseTo for UserOptions {
1101 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
1102 let mut builder = UserOptionsBuilder::default();
1103 let add_option = |item: &mut Option<UserOption>, user_option| {
1104 let old_value = item.replace(user_option);
1105 if old_value.is_some() {
1106 parser_err!("conflicting or redundant options");
1107 }
1108 Ok(())
1109 };
1110 let _ = parser.parse_keyword(Keyword::WITH);
1111 loop {
1112 let token = parser.peek_token();
1113 if token == Token::EOF || token == Token::SemiColon {
1114 break;
1115 }
1116
1117 if let Token::Word(ref w) = token.token {
1118 let checkpoint = *parser;
1119 parser.next_token();
1120 let (item_mut_ref, user_option) = match w.keyword {
1121 Keyword::SUPERUSER => (&mut builder.super_user, UserOption::SuperUser),
1122 Keyword::NOSUPERUSER => (&mut builder.super_user, UserOption::NoSuperUser),
1123 Keyword::CREATEDB => (&mut builder.create_db, UserOption::CreateDB),
1124 Keyword::NOCREATEDB => (&mut builder.create_db, UserOption::NoCreateDB),
1125 Keyword::CREATEUSER => (&mut builder.create_user, UserOption::CreateUser),
1126 Keyword::NOCREATEUSER => (&mut builder.create_user, UserOption::NoCreateUser),
1127 Keyword::LOGIN => (&mut builder.login, UserOption::Login),
1128 Keyword::NOLOGIN => (&mut builder.login, UserOption::NoLogin),
1129 Keyword::PASSWORD => {
1130 if parser.parse_keyword(Keyword::NULL) {
1131 (&mut builder.password, UserOption::Password(None))
1132 } else {
1133 (
1134 &mut builder.password,
1135 UserOption::Password(Some(AstString::parse_to(parser)?)),
1136 )
1137 }
1138 }
1139 Keyword::ENCRYPTED => {
1140 parser.expect_keyword(Keyword::PASSWORD)?;
1141 (
1142 &mut builder.password,
1143 UserOption::EncryptedPassword(AstString::parse_to(parser)?),
1144 )
1145 }
1146 Keyword::OAUTH => {
1147 let options = parser.parse_options()?;
1148 (&mut builder.password, UserOption::OAuth(options))
1149 }
1150 _ => {
1151 parser.expected_at(
1152 checkpoint,
1153 "SUPERUSER | NOSUPERUSER | CREATEDB | NOCREATEDB | LOGIN \
1154 | NOLOGIN | CREATEUSER | NOCREATEUSER | [ENCRYPTED] PASSWORD | NULL | OAUTH",
1155 )?;
1156 unreachable!()
1157 }
1158 };
1159 add_option(item_mut_ref, user_option)?;
1160 } else {
1161 parser.expected(
1162 "SUPERUSER | NOSUPERUSER | CREATEDB | NOCREATEDB | LOGIN | NOLOGIN \
1163 | CREATEUSER | NOCREATEUSER | [ENCRYPTED] PASSWORD | NULL | OAUTH",
1164 )?
1165 }
1166 }
1167 Ok(builder.build())
1168 }
1169}
1170
1171impl fmt::Display for UserOptions {
1172 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1173 if !self.0.is_empty() {
1174 write!(f, "WITH {}", display_separated(self.0.as_slice(), " "))
1175 } else {
1176 Ok(())
1177 }
1178 }
1179}
1180
1181impl ParseTo for CreateUserStatement {
1182 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1183 impl_parse_to!(user_name: ObjectName, p);
1184 impl_parse_to!(with_options: UserOptions, p);
1185
1186 Ok(CreateUserStatement {
1187 user_name,
1188 with_options,
1189 })
1190 }
1191}
1192
1193impl fmt::Display for CreateUserStatement {
1194 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1195 let mut v: Vec<String> = vec![];
1196 impl_fmt_display!(user_name, v, self);
1197 impl_fmt_display!(with_options, v, self);
1198 v.iter().join(" ").fmt(f)
1199 }
1200}
1201
1202impl fmt::Display for AlterUserMode {
1203 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1204 match self {
1205 AlterUserMode::Options(options) => {
1206 write!(f, "{}", options)
1207 }
1208 AlterUserMode::Rename(new_name) => {
1209 write!(f, "RENAME TO {}", new_name)
1210 }
1211 }
1212 }
1213}
1214
1215impl fmt::Display for AlterUserStatement {
1216 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1217 let mut v: Vec<String> = vec![];
1218 impl_fmt_display!(user_name, v, self);
1219 impl_fmt_display!(mode, v, self);
1220 v.iter().join(" ").fmt(f)
1221 }
1222}
1223
1224impl ParseTo for AlterUserStatement {
1225 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1226 impl_parse_to!(user_name: ObjectName, p);
1227 impl_parse_to!(mode: AlterUserMode, p);
1228
1229 Ok(AlterUserStatement { user_name, mode })
1230 }
1231}
1232
1233impl ParseTo for AlterUserMode {
1234 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1235 if p.parse_keyword(Keyword::RENAME) {
1236 p.expect_keyword(Keyword::TO)?;
1237 impl_parse_to!(new_name: ObjectName, p);
1238 Ok(AlterUserMode::Rename(new_name))
1239 } else {
1240 impl_parse_to!(with_options: UserOptions, p);
1241 Ok(AlterUserMode::Options(with_options))
1242 }
1243 }
1244}
1245
1246#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1247#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
1248pub struct DropStatement {
1249 pub object_type: ObjectType,
1251 pub if_exists: bool,
1253 pub object_name: ObjectName,
1255 pub drop_mode: AstOption<DropMode>,
1258}
1259
1260impl ParseTo for DropStatement {
1267 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1268 impl_parse_to!(object_type: ObjectType, p);
1269 impl_parse_to!(if_exists => [Keyword::IF, Keyword::EXISTS], p);
1270 let object_name = p.parse_object_name()?;
1271 impl_parse_to!(drop_mode: AstOption<DropMode>, p);
1272 Ok(Self {
1273 object_type,
1274 if_exists,
1275 object_name,
1276 drop_mode,
1277 })
1278 }
1279}
1280
1281impl fmt::Display for DropStatement {
1282 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1283 let mut v: Vec<String> = vec![];
1284 impl_fmt_display!(object_type, v, self);
1285 impl_fmt_display!(if_exists => [Keyword::IF, Keyword::EXISTS], v, self);
1286 impl_fmt_display!(object_name, v, self);
1287 impl_fmt_display!(drop_mode, v, self);
1288 v.iter().join(" ").fmt(f)
1289 }
1290}
1291
1292#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1293#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
1294pub enum DropMode {
1295 Cascade,
1296 Restrict,
1297}
1298
1299impl ParseTo for DropMode {
1300 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
1301 let drop_mode = if parser.parse_keyword(Keyword::CASCADE) {
1302 DropMode::Cascade
1303 } else if parser.parse_keyword(Keyword::RESTRICT) {
1304 DropMode::Restrict
1305 } else {
1306 return parser.expected("CASCADE | RESTRICT");
1307 };
1308 Ok(drop_mode)
1309 }
1310}
1311
1312impl fmt::Display for DropMode {
1313 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1314 f.write_str(match self {
1315 DropMode::Cascade => "CASCADE",
1316 DropMode::Restrict => "RESTRICT",
1317 })
1318 }
1319}