1use core::fmt;
16use core::fmt::Formatter;
17use std::fmt::Write;
18
19use itertools::Itertools;
20#[cfg(feature = "serde")]
21use serde::{Deserialize, Serialize};
22use winnow::ModalResult;
23
24use super::ddl::SourceWatermark;
25use super::legacy_source::{CompatibleFormatEncode, parse_format_encode};
26use super::{EmitMode, Ident, ObjectType, Query, Value};
27use crate::ast::{
28 ColumnDef, ObjectName, SqlOption, TableConstraint, display_comma_separated, display_separated,
29};
30use crate::keywords::Keyword;
31use crate::parser::{IncludeOption, IsOptional, Parser};
32use crate::parser_err;
33use crate::parser_v2::literal_u32;
34use crate::tokenizer::Token;
35
/// A syntax element that can be parsed directly from a [`Parser`].
pub trait ParseTo: Sized {
    /// Parses `Self` from `parser`, returning the parse error on failure.
    fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self>;
}
40
/// Expands to the parsing statement for one piece of a [`ParseTo`] implementation.
///
/// Supported forms:
/// - `impl_parse_to!(field: Type, parser)` binds `field` to `<Type>::parse_to(parser)?`.
/// - `impl_parse_to!(field => [KEYWORDS...], parser)` binds `field` to the result of
///   `parser.parse_keywords(&[KEYWORDS...])`.
/// - `impl_parse_to!([KEYWORDS...], parser)` requires the keywords through
///   `parser.expect_keywords(&[KEYWORDS...])?` and binds nothing.
#[macro_export]
macro_rules! impl_parse_to {
    // No input: expands to nothing.
    () => {};
    // Typed field: delegate to the type's `parse_to` and propagate errors.
    ($field:ident : $field_type:ty, $parser:ident) => {
        let $field = <$field_type>::parse_to($parser)?;
    };
    // Keyword-sequence flag: store whatever `parse_keywords` reports.
    ($field:ident => [$($arr:tt)+], $parser:ident) => {
        let $field = $parser.parse_keywords(&[$($arr)+]);
    };
    // Mandatory keyword sequence: propagate the error if it is missing.
    ([$($arr:tt)+], $parser:ident) => {
        $parser.expect_keywords(&[$($arr)+])?;
    };
}
54
/// Expands to the statement that appends one rendered piece of a statement to a
/// `Vec<String>` of display fragments.
///
/// Supported forms:
/// - `impl_fmt_display!(field, v, self)` formats `self.field` and pushes it onto `v`
///   unless the formatted text is empty.
/// - `impl_fmt_display!(field => [ITEMS...], v, self)` pushes the space-separated
///   `ITEMS` onto `v` when `self.field` is `true`.
/// - `impl_fmt_display!([ITEMS...], v)` unconditionally pushes the space-separated
///   `ITEMS` onto `v`.
#[macro_export]
macro_rules! impl_fmt_display {
    // No input: expands to nothing.
    () => {};
    // Field with a `Display` impl: skip it when it renders as the empty string.
    ($field:ident, $v:ident, $self:ident) => {{
        let s = format!("{}", $self.$field);
        if !s.is_empty() {
            $v.push(s);
        }
    }};
    // Boolean flag: emit the given items only when the flag is set.
    ($field:ident => [$($arr:tt)+], $v:ident, $self:ident) => {
        if $self.$field {
            $v.push(format!("{}", display_separated(&[$($arr)+], " ")));
        }
    };
    // Fixed items: always emitted.
    ([$($arr:tt)+], $v:ident) => {
        $v.push(format!("{}", display_separated(&[$($arr)+], " ")));
    };
}
73
/// AST of a `CREATE SOURCE` statement body.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct CreateSourceStatement {
    /// Temporary flag. NOTE(review): it is not rendered by the `Display` impl in this
    /// file — confirm where it is consumed.
    pub temporary: bool,
    /// Rendered as `IF NOT EXISTS` when set.
    pub if_not_exists: bool,
    /// Column definitions of the source.
    pub columns: Vec<ColumnDef>,
    /// When set, the index in `columns` at which a `*` is rendered.
    pub wildcard_idx: Option<usize>,
    /// Table constraints, rendered after the columns.
    pub constraints: Vec<TableConstraint>,
    /// Name of the source.
    pub source_name: ObjectName,
    /// The `WITH (...)` properties.
    pub with_properties: WithProperties,
    /// The format/encode clause.
    pub format_encode: CompatibleFormatEncode,
    /// Watermark definitions, rendered after the constraints.
    pub source_watermarks: Vec<SourceWatermark>,
    /// Include-column options, each rendered as its own fragment.
    pub include_column_options: IncludeOption,
}
97
/// The value following the `FORMAT` keyword of a format/encode clause.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum Format {
    /// Keyword `NATIVE`.
    Native,
    /// Keyword `NONE`.
    None,
    /// Keyword `DEBEZIUM`.
    Debezium,
    /// Keyword `DEBEZIUM_MONGO`.
    DebeziumMongo,
    /// Keyword `MAXWELL`.
    Maxwell,
    /// Keyword `CANAL`.
    Canal,
    /// Keyword `UPSERT`.
    Upsert,
    /// Keyword `PLAIN`.
    Plain,
}
122
123impl fmt::Display for Format {
125 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
126 write!(
127 f,
128 "{}",
129 match self {
130 Format::Native => "NATIVE",
131 Format::Debezium => "DEBEZIUM",
132 Format::DebeziumMongo => "DEBEZIUM_MONGO",
133 Format::Maxwell => "MAXWELL",
134 Format::Canal => "CANAL",
135 Format::Upsert => "UPSERT",
136 Format::Plain => "PLAIN",
137 Format::None => "NONE",
138 }
139 )
140 }
141}
142
143impl Format {
144 pub fn from_keyword(s: &str) -> ModalResult<Self> {
145 Ok(match s {
146 "DEBEZIUM" => Format::Debezium,
147 "DEBEZIUM_MONGO" => Format::DebeziumMongo,
148 "MAXWELL" => Format::Maxwell,
149 "CANAL" => Format::Canal,
150 "PLAIN" => Format::Plain,
151 "UPSERT" => Format::Upsert,
152 "NATIVE" => Format::Native,
153 "NONE" => Format::None,
154 _ => parser_err!(
155 "expected CANAL | PROTOBUF | DEBEZIUM | MAXWELL | PLAIN | NATIVE | NONE after FORMAT"
156 ),
157 })
158 }
159}
160
/// The value following the `ENCODE` keyword of a format/encode clause.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum Encode {
    /// Keyword `AVRO`.
    Avro,
    /// Keyword `CSV`.
    Csv,
    /// Keyword `PROTOBUF`.
    Protobuf,
    /// Keyword `JSON`.
    Json,
    /// Keyword `BYTES`.
    Bytes,
    /// Keyword `NONE`.
    None,
    /// Keyword `TEXT`.
    Text,
    /// Keyword `NATIVE`.
    Native,
    /// Keyword `TEMPLATE`.
    Template,
    /// Keyword `PARQUET`.
    Parquet,
}
179
180impl fmt::Display for Encode {
182 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
183 write!(
184 f,
185 "{}",
186 match self {
187 Encode::Avro => "AVRO",
188 Encode::Csv => "CSV",
189 Encode::Protobuf => "PROTOBUF",
190 Encode::Json => "JSON",
191 Encode::Bytes => "BYTES",
192 Encode::Native => "NATIVE",
193 Encode::Template => "TEMPLATE",
194 Encode::None => "NONE",
195 Encode::Parquet => "PARQUET",
196 Encode::Text => "TEXT",
197 }
198 )
199 }
200}
201
202impl Encode {
203 pub fn from_keyword(s: &str) -> ModalResult<Self> {
204 Ok(match s {
205 "AVRO" => Encode::Avro,
206 "TEXT" => Encode::Text,
207 "BYTES" => Encode::Bytes,
208 "CSV" => Encode::Csv,
209 "PROTOBUF" => Encode::Protobuf,
210 "JSON" => Encode::Json,
211 "TEMPLATE" => Encode::Template,
212 "PARQUET" => Encode::Parquet,
213 "NATIVE" => Encode::Native,
214 "NONE" => Encode::None,
215 _ => parser_err!(
216 "expected AVRO | BYTES | CSV | PROTOBUF | JSON | NATIVE | TEMPLATE | PARQUET | NONE after Encode"
217 ),
218 })
219 }
220}
221
/// A parsed `FORMAT ... ENCODE ... [(options)] [KEY ENCODE ...]` clause.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct FormatEncodeOptions {
    /// The value after `FORMAT`.
    pub format: Format,
    /// The value after `ENCODE`.
    pub row_encode: Encode,
    /// Options following the encode, rendered in parentheses when non-empty.
    pub row_options: Vec<SqlOption>,

    /// The value after `KEY ENCODE`, if that clause is present.
    pub key_encode: Option<Encode>,
}
232
233impl Parser<'_> {
234 fn peek_format_encode_format(&mut self) -> bool {
236 (self.peek_nth_any_of_keywords(0, &[Keyword::ROW])
237 && self.peek_nth_any_of_keywords(1, &[Keyword::FORMAT])) || self.peek_nth_any_of_keywords(0, &[Keyword::FORMAT]) }
240
241 pub fn parse_format_encode_with_connector(
243 &mut self,
244 connector: &str,
245 cdc_source_job: bool,
246 ) -> ModalResult<CompatibleFormatEncode> {
247 if connector.contains("-cdc") {
252 let expected = if cdc_source_job {
253 FormatEncodeOptions::plain_json()
254 } else if connector.contains("mongodb") {
255 FormatEncodeOptions::debezium_mongo_json()
256 } else {
257 FormatEncodeOptions::debezium_json()
258 };
259
260 if self.peek_format_encode_format() {
261 let schema = parse_format_encode(self)?.into_v2();
262 if schema != expected {
263 parser_err!(
264 "Row format for CDC connectors should be \
265 either omitted or set to `{expected}`",
266 );
267 }
268 }
269 Ok(expected.into())
270 } else if connector.contains("nexmark") {
271 let expected = FormatEncodeOptions::native();
272 if self.peek_format_encode_format() {
273 let schema = parse_format_encode(self)?.into_v2();
274 if schema != expected {
275 parser_err!(
276 "Row format for nexmark connectors should be \
277 either omitted or set to `{expected}`",
278 );
279 }
280 }
281 Ok(expected.into())
282 } else if connector.contains("datagen") {
283 Ok(if self.peek_format_encode_format() {
284 parse_format_encode(self)?
285 } else {
286 FormatEncodeOptions::native().into()
287 })
288 } else if connector.contains("iceberg") {
289 let expected = FormatEncodeOptions::none();
290 if self.peek_format_encode_format() {
291 let schema = parse_format_encode(self)?.into_v2();
292 if schema != expected {
293 parser_err!(
294 "Row format for iceberg connectors should be \
295 either omitted or set to `{expected}`",
296 );
297 }
298 }
299 Ok(expected.into())
300 } else if connector.contains("webhook") {
301 parser_err!(
302 "Source with webhook connector is not supported. \
303 Please use the `CREATE TABLE ... WITH ...` statement instead.",
304 );
305 } else {
306 Ok(parse_format_encode(self)?)
307 }
308 }
309
310 pub fn parse_schema(&mut self) -> ModalResult<Option<FormatEncodeOptions>> {
312 if !self.parse_keyword(Keyword::FORMAT) {
313 return Ok(None);
314 }
315
316 let id = self.parse_identifier()?;
317 let s = id.value.to_ascii_uppercase();
318 let format = Format::from_keyword(&s)?;
319 self.expect_keyword(Keyword::ENCODE)?;
320 let id = self.parse_identifier()?;
321 let s = id.value.to_ascii_uppercase();
322 let row_encode = Encode::from_keyword(&s)?;
323 let row_options = self.parse_options()?;
324
325 let key_encode = if self.parse_keywords(&[Keyword::KEY, Keyword::ENCODE]) {
326 Some(Encode::from_keyword(
327 self.parse_identifier()?.value.to_ascii_uppercase().as_str(),
328 )?)
329 } else {
330 None
331 };
332
333 Ok(Some(FormatEncodeOptions {
334 format,
335 row_encode,
336 row_options,
337 key_encode,
338 }))
339 }
340}
341
342impl FormatEncodeOptions {
343 pub const fn plain_json() -> Self {
344 FormatEncodeOptions {
345 format: Format::Plain,
346 row_encode: Encode::Json,
347 row_options: Vec::new(),
348 key_encode: None,
349 }
350 }
351
352 pub const fn debezium_json() -> Self {
354 FormatEncodeOptions {
355 format: Format::Debezium,
356 row_encode: Encode::Json,
357 row_options: Vec::new(),
358 key_encode: None,
359 }
360 }
361
362 pub const fn debezium_mongo_json() -> Self {
363 FormatEncodeOptions {
364 format: Format::DebeziumMongo,
365 row_encode: Encode::Json,
366 row_options: Vec::new(),
367 key_encode: None,
368 }
369 }
370
371 pub const fn native() -> Self {
373 FormatEncodeOptions {
374 format: Format::Native,
375 row_encode: Encode::Native,
376 row_options: Vec::new(),
377 key_encode: None,
378 }
379 }
380
381 pub const fn none() -> Self {
384 FormatEncodeOptions {
385 format: Format::None,
386 row_encode: Encode::None,
387 row_options: Vec::new(),
388 key_encode: None,
389 }
390 }
391
392 pub fn row_options(&self) -> &[SqlOption] {
393 self.row_options.as_ref()
394 }
395}
396
397impl fmt::Display for FormatEncodeOptions {
398 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
399 write!(f, "FORMAT {} ENCODE {}", self.format, self.row_encode)?;
400
401 if !self.row_options().is_empty() {
402 write!(f, " ({})", display_comma_separated(self.row_options()))?;
403 }
404
405 if let Some(key_encode) = &self.key_encode {
406 write!(f, " KEY ENCODE {}", key_encode)?;
407 }
408
409 Ok(())
410 }
411}
412
413pub(super) fn fmt_create_items(
414 columns: &[ColumnDef],
415 constraints: &[TableConstraint],
416 watermarks: &[SourceWatermark],
417 wildcard_idx: Option<usize>,
418) -> std::result::Result<String, fmt::Error> {
419 let mut items = String::new();
420 let has_items = !columns.is_empty()
421 || !constraints.is_empty()
422 || !watermarks.is_empty()
423 || wildcard_idx.is_some();
424 has_items.then(|| write!(&mut items, "("));
425
426 if let Some(wildcard_idx) = wildcard_idx {
427 let (columns_l, columns_r) = columns.split_at(wildcard_idx);
428 write!(&mut items, "{}", display_comma_separated(columns_l))?;
429 if !columns_l.is_empty() {
430 write!(&mut items, ", ")?;
431 }
432 write!(&mut items, "{}", Token::Mul)?;
433 if !columns_r.is_empty() {
434 write!(&mut items, ", ")?;
435 }
436 write!(&mut items, "{}", display_comma_separated(columns_r))?;
437 } else {
438 write!(&mut items, "{}", display_comma_separated(columns))?;
439 }
440 let mut leading_items = !columns.is_empty() || wildcard_idx.is_some();
441
442 if leading_items && !constraints.is_empty() {
443 write!(&mut items, ", ")?;
444 }
445 write!(&mut items, "{}", display_comma_separated(constraints))?;
446 leading_items |= !constraints.is_empty();
447
448 if leading_items && !watermarks.is_empty() {
449 write!(&mut items, ", ")?;
450 }
451 write!(&mut items, "{}", display_comma_separated(watermarks))?;
452 has_items.then(|| write!(&mut items, ")"));
456 Ok(items)
457}
458
459impl fmt::Display for CreateSourceStatement {
460 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
461 let mut v: Vec<String> = vec![];
462 impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
463 impl_fmt_display!(source_name, v, self);
464
465 let items = fmt_create_items(
466 &self.columns,
467 &self.constraints,
468 &self.source_watermarks,
469 self.wildcard_idx,
470 )?;
471 if !items.is_empty() {
472 v.push(items);
473 }
474
475 for item in &self.include_column_options {
476 v.push(format!("{}", item));
477 }
478 impl_fmt_display!(with_properties, v, self);
479 impl_fmt_display!(format_encode, v, self);
480 v.iter().join(" ").fmt(f)
481 }
482}
483
/// Where a sink takes its data from.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum CreateSink {
    /// `FROM <name>`: an existing named object.
    From(ObjectName),
    /// `AS <query>`: the result of a query.
    AsQuery(Box<Query>),
}
490
491impl fmt::Display for CreateSink {
492 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
493 match self {
494 Self::From(mv) => write!(f, "FROM {}", mv),
495 Self::AsQuery(query) => write!(f, "AS {}", query),
496 }
497 }
498}
/// AST of a `CREATE SINK` statement body.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct CreateSinkStatement {
    /// Set when `IF NOT EXISTS` was given.
    pub if_not_exists: bool,
    /// Name of the sink.
    pub sink_name: ObjectName,
    /// The `WITH (...)` properties.
    pub with_properties: WithProperties,
    /// The `FROM <name>` or `AS <query>` part.
    pub sink_from: CreateSink,

    /// Parenthesized column list that may follow the `INTO` table name; empty otherwise.
    pub columns: Vec<Ident>,
    /// The emit mode, rendered as `EMIT <mode>` when present.
    pub emit_mode: Option<EmitMode>,
    /// The optional `FORMAT ... ENCODE ...` clause.
    pub sink_schema: Option<FormatEncodeOptions>,
    /// Target table of an `INTO <table>` clause, if any.
    pub into_table_name: Option<ObjectName>,
}
521
522impl ParseTo for CreateSinkStatement {
523 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
524 impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
525 impl_parse_to!(sink_name: ObjectName, p);
526
527 let mut target_spec_columns = Vec::new();
528 let into_table_name = if p.parse_keyword(Keyword::INTO) {
529 impl_parse_to!(into_table_name: ObjectName, p);
530
531 target_spec_columns = p.parse_parenthesized_column_list(IsOptional::Optional)?;
533 Some(into_table_name)
534 } else {
535 None
536 };
537
538 let sink_from = if p.parse_keyword(Keyword::FROM) {
539 impl_parse_to!(from_name: ObjectName, p);
540 CreateSink::From(from_name)
541 } else if p.parse_keyword(Keyword::AS) {
542 let query = Box::new(p.parse_query()?);
543 CreateSink::AsQuery(query)
544 } else {
545 p.expected("FROM or AS after CREATE SINK sink_name")?
546 };
547
548 let emit_mode: Option<EmitMode> = p.parse_emit_mode()?;
549
550 if !p.peek_nth_any_of_keywords(0, &[Keyword::WITH]) && into_table_name.is_none() {
553 p.expected("WITH")?
554 }
555 impl_parse_to!(with_properties: WithProperties, p);
556
557 if with_properties.0.is_empty() && into_table_name.is_none() {
558 parser_err!("sink properties not provided");
559 }
560
561 let sink_schema = p.parse_schema()?;
562
563 Ok(Self {
564 if_not_exists,
565 sink_name,
566 with_properties,
567 sink_from,
568 columns: target_spec_columns,
569 emit_mode,
570 sink_schema,
571 into_table_name,
572 })
573 }
574}
575
576impl fmt::Display for CreateSinkStatement {
577 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
578 let mut v: Vec<String> = vec![];
579 impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
580 impl_fmt_display!(sink_name, v, self);
581 if let Some(into_table) = &self.into_table_name {
582 impl_fmt_display!([Keyword::INTO], v);
583 impl_fmt_display!([into_table], v);
584 if !self.columns.is_empty() {
585 v.push(format!("({})", display_comma_separated(&self.columns)));
586 }
587 }
588 impl_fmt_display!(sink_from, v, self);
589 if let Some(ref emit_mode) = self.emit_mode {
590 v.push(format!("EMIT {}", emit_mode));
591 }
592 impl_fmt_display!(with_properties, v, self);
593 if let Some(schema) = &self.sink_schema {
594 v.push(format!("{}", schema));
595 }
596 v.iter().join(" ").fmt(f)
597 }
598}
599
/// AST of a `CREATE SUBSCRIPTION` statement body.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct CreateSubscriptionStatement {
    /// Set when `IF NOT EXISTS` was given.
    pub if_not_exists: bool,
    /// Name of the subscription.
    pub subscription_name: ObjectName,
    /// The mandatory, non-empty `WITH (...)` properties.
    pub with_properties: WithProperties,
    /// The object named after `FROM`.
    pub subscription_from: ObjectName,
}
616
617impl ParseTo for CreateSubscriptionStatement {
618 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
619 impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
620 impl_parse_to!(subscription_name: ObjectName, p);
621
622 let subscription_from = if p.parse_keyword(Keyword::FROM) {
623 impl_parse_to!(from_name: ObjectName, p);
624 from_name
625 } else {
626 p.expected("FROM after CREATE SUBSCRIPTION subscription_name")?
627 };
628
629 if !p.peek_nth_any_of_keywords(0, &[Keyword::WITH]) {
634 p.expected("WITH")?
635 }
636 impl_parse_to!(with_properties: WithProperties, p);
637
638 if with_properties.0.is_empty() {
639 parser_err!("subscription properties not provided");
640 }
641
642 Ok(Self {
643 if_not_exists,
644 subscription_name,
645 with_properties,
646 subscription_from,
647 })
648 }
649}
650
651impl fmt::Display for CreateSubscriptionStatement {
652 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
653 let mut v: Vec<String> = vec![];
654 impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
655 impl_fmt_display!(subscription_name, v, self);
656 v.push(format!("FROM {}", self.subscription_from));
657 impl_fmt_display!(with_properties, v, self);
658 v.iter().join(" ").fmt(f)
659 }
660}
661
/// What a declared cursor iterates over.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum DeclareCursor {
    /// A cursor over the result of a query.
    Query(Box<Query>),
    /// A cursor over the named subscription, starting at the given position.
    Subscription(ObjectName, Since),
}
668
669impl fmt::Display for DeclareCursor {
670 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
671 let mut v: Vec<String> = vec![];
672 match self {
673 DeclareCursor::Query(query) => v.push(format!("{}", query.as_ref())),
674 DeclareCursor::Subscription(name, since) => {
675 v.push(format!("{}", name));
676 v.push(format!("{:?}", since));
677 }
678 }
679 v.iter().join(" ").fmt(f)
680 }
681}
682
/// AST of a `DECLARE` cursor statement body.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct DeclareCursorStatement {
    /// Name of the cursor being declared.
    pub cursor_name: Ident,
    /// What the cursor iterates over.
    pub declare_cursor: DeclareCursor,
}
698
699impl ParseTo for DeclareCursorStatement {
700 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
701 let cursor_name = p.parse_identifier_non_reserved()?;
702
703 let declare_cursor = if !p.parse_keyword(Keyword::SUBSCRIPTION) {
704 p.expect_keyword(Keyword::CURSOR)?;
705 p.expect_keyword(Keyword::FOR)?;
706 DeclareCursor::Query(Box::new(p.parse_query()?))
707 } else {
708 p.expect_keyword(Keyword::CURSOR)?;
709 p.expect_keyword(Keyword::FOR)?;
710 let cursor_for_name = p.parse_object_name()?;
711 let rw_timestamp = p.parse_since()?;
712 DeclareCursor::Subscription(cursor_for_name, rw_timestamp)
713 };
714
715 Ok(Self {
716 cursor_name,
717 declare_cursor,
718 })
719 }
720}
721
722impl fmt::Display for DeclareCursorStatement {
723 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
724 let mut v: Vec<String> = vec![];
725 impl_fmt_display!(cursor_name, v, self);
726 match &self.declare_cursor {
727 DeclareCursor::Query(_) => {
728 v.push("CURSOR FOR ".to_owned());
729 }
730 DeclareCursor::Subscription { .. } => {
731 v.push("SUBSCRIPTION CURSOR FOR ".to_owned());
732 }
733 }
734 impl_fmt_display!(declare_cursor, v, self);
735 v.iter().join(" ").fmt(f)
736 }
737}
738
/// AST of a `FETCH` cursor statement body.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct FetchCursorStatement {
    /// Name of the cursor to fetch from.
    pub cursor_name: Ident,
    /// Number of rows to fetch; `NEXT` is parsed as `1`.
    pub count: u32,
    /// The optional `WITH (...)` properties following the cursor name.
    pub with_properties: WithProperties,
}
749
750impl ParseTo for FetchCursorStatement {
751 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
752 let count = if p.parse_keyword(Keyword::NEXT) {
753 1
754 } else {
755 literal_u32(p)?
756 };
757 p.expect_keyword(Keyword::FROM)?;
758 let cursor_name = p.parse_identifier_non_reserved()?;
759 impl_parse_to!(with_properties: WithProperties, p);
760
761 Ok(Self {
762 cursor_name,
763 count,
764 with_properties,
765 })
766 }
767}
768
769impl fmt::Display for FetchCursorStatement {
770 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
771 let mut v: Vec<String> = vec![];
772 if self.count == 1 {
773 v.push("NEXT ".to_owned());
774 } else {
775 impl_fmt_display!(count, v, self);
776 }
777 v.push("FROM ".to_owned());
778 impl_fmt_display!(cursor_name, v, self);
779 v.iter().join(" ").fmt(f)
780 }
781}
782
/// AST of a `CLOSE` cursor statement body.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct CloseCursorStatement {
    /// The cursor to close; `None` stands for `ALL`.
    pub cursor_name: Option<Ident>,
}
791
792impl ParseTo for CloseCursorStatement {
793 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
794 let cursor_name = if p.parse_keyword(Keyword::ALL) {
795 None
796 } else {
797 Some(p.parse_identifier_non_reserved()?)
798 };
799
800 Ok(Self { cursor_name })
801 }
802}
803
804impl fmt::Display for CloseCursorStatement {
805 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
806 let mut v: Vec<String> = vec![];
807 if let Some(cursor_name) = &self.cursor_name {
808 v.push(format!("{}", cursor_name));
809 } else {
810 v.push("ALL".to_owned());
811 }
812 v.iter().join(" ").fmt(f)
813 }
814}
815
/// AST of a `CREATE CONNECTION` statement body.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct CreateConnectionStatement {
    /// Set when `IF NOT EXISTS` was given.
    pub if_not_exists: bool,
    /// Name of the connection.
    pub connection_name: ObjectName,
    /// The mandatory, non-empty `WITH (...)` properties.
    pub with_properties: WithProperties,
}
828
829impl ParseTo for CreateConnectionStatement {
830 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
831 impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
832 impl_parse_to!(connection_name: ObjectName, p);
833 impl_parse_to!(with_properties: WithProperties, p);
834 if with_properties.0.is_empty() {
835 parser_err!("connection properties not provided");
836 }
837
838 Ok(Self {
839 if_not_exists,
840 connection_name,
841 with_properties,
842 })
843 }
844}
845
846impl fmt::Display for CreateConnectionStatement {
847 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
848 let mut v: Vec<String> = vec![];
849 impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
850 impl_fmt_display!(connection_name, v, self);
851 impl_fmt_display!(with_properties, v, self);
852 v.iter().join(" ").fmt(f)
853 }
854}
855
/// AST of a `CREATE SECRET` statement body.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct CreateSecretStatement {
    /// Set when `IF NOT EXISTS` was given.
    pub if_not_exists: bool,
    /// Name of the secret.
    pub secret_name: ObjectName,
    /// The value given after `AS`; `Value::Null` when there is no `AS` clause.
    pub credential: Value,
    /// The optional `WITH (...)` properties.
    pub with_properties: WithProperties,
}
864
865impl ParseTo for CreateSecretStatement {
866 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
867 impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], parser);
868 impl_parse_to!(secret_name: ObjectName, parser);
869 impl_parse_to!(with_properties: WithProperties, parser);
870 let mut credential = Value::Null;
871 if parser.parse_keyword(Keyword::AS) {
872 credential = parser.ensure_parse_value()?;
873 }
874 Ok(Self {
875 if_not_exists,
876 secret_name,
877 credential,
878 with_properties,
879 })
880 }
881}
882
883impl fmt::Display for CreateSecretStatement {
884 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
885 let mut v: Vec<String> = vec![];
886 impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
887 impl_fmt_display!(secret_name, v, self);
888 impl_fmt_display!(with_properties, v, self);
889 if self.credential != Value::Null {
890 v.push("AS".to_owned());
891 impl_fmt_display!(credential, v, self);
892 }
893 v.iter().join(" ").fmt(f)
894 }
895}
896
/// The options of a `WITH (...)` clause; empty when the clause is absent.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct WithProperties(pub Vec<SqlOption>);
900
901impl ParseTo for WithProperties {
902 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
903 Ok(Self(
904 parser.parse_options_with_preceding_keyword(Keyword::WITH)?,
905 ))
906 }
907}
908
909impl fmt::Display for WithProperties {
910 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
911 if !self.0.is_empty() {
912 write!(f, "WITH ({})", display_comma_separated(self.0.as_slice()))
913 } else {
914 Ok(())
915 }
916 }
917}
918
/// Starting position of a subscription cursor.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum Since {
    /// `SINCE <number>`. NOTE(review): the name suggests a millisecond timestamp —
    /// confirm against the parser.
    TimestampMsNum(u64),
    /// `SINCE PROCTIME()`.
    ProcessTime,
    /// `SINCE BEGIN()`.
    Begin,
    /// `FULL`.
    Full,
}
927
928impl fmt::Display for Since {
929 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
930 use Since::*;
931 match self {
932 TimestampMsNum(ts) => write!(f, " SINCE {}", ts),
933 ProcessTime => write!(f, " SINCE PROCTIME()"),
934 Begin => write!(f, " SINCE BEGIN()"),
935 Full => write!(f, " FULL"),
936 }
937 }
938}
939
/// A `ROW SCHEMA LOCATION '<value>'` clause.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct RowSchemaLocation {
    /// The string literal following the keywords.
    pub value: AstString,
}
945
946impl ParseTo for RowSchemaLocation {
947 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
948 impl_parse_to!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], p);
949 impl_parse_to!(value: AstString, p);
950 Ok(Self { value })
951 }
952}
953
954impl fmt::Display for RowSchemaLocation {
955 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
956 let mut v = vec![];
957 impl_fmt_display!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], v);
958 impl_fmt_display!(value, v, self);
959 v.iter().join(" ").fmt(f)
960 }
961}
962
/// A string literal in the AST; holds the literal's content without the surrounding
/// single quotes, which the `Display` impl adds back.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct AstString(pub String);
968
969impl ParseTo for AstString {
970 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
971 Ok(Self(parser.parse_literal_string()?))
972 }
973}
974
975impl fmt::Display for AstString {
976 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
977 write!(f, "'{}'", self.0)
978 }
979}
980
/// An optional AST element: parsing a missing element yields `None` rather than an
/// error, and `None` renders as nothing.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum AstOption<T> {
    /// The element is absent.
    None,
    /// The element is present.
    Some(T),
}
991
992impl<T: ParseTo> ParseTo for AstOption<T> {
993 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
994 match T::parse_to(parser) {
995 Ok(t) => Ok(AstOption::Some(t)),
996 Err(_) => Ok(AstOption::None),
997 }
998 }
999}
1000
1001impl<T: fmt::Display> fmt::Display for AstOption<T> {
1002 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1003 match &self {
1004 AstOption::Some(t) => t.fmt(f),
1005 AstOption::None => Ok(()),
1006 }
1007 }
1008}
1009
1010impl<T> From<AstOption<T>> for Option<T> {
1011 fn from(val: AstOption<T>) -> Self {
1012 match val {
1013 AstOption::Some(t) => Some(t),
1014 AstOption::None => None,
1015 }
1016 }
1017}
1018
/// AST of a `CREATE USER` statement body.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct CreateUserStatement {
    /// Name of the user to create.
    pub user_name: ObjectName,
    /// The user options; may be empty.
    pub with_options: UserOptions,
}
1025
/// AST of an `ALTER USER` statement body.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct AlterUserStatement {
    /// Name of the user to alter.
    pub user_name: ObjectName,
    /// The requested change.
    pub mode: AlterUserMode,
}
1032
/// The kind of change made by an `ALTER USER` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum AlterUserMode {
    /// Change the user's options.
    Options(UserOptions),
    /// `RENAME TO <new name>`.
    Rename(ObjectName),
}
1039
/// A single option of a `CREATE USER` / `ALTER USER` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum UserOption {
    /// `SUPERUSER`.
    SuperUser,
    /// `NOSUPERUSER`.
    NoSuperUser,
    /// `CREATEDB`.
    CreateDB,
    /// `NOCREATEDB`.
    NoCreateDB,
    /// `CREATEUSER`.
    CreateUser,
    /// `NOCREATEUSER`.
    NoCreateUser,
    /// `LOGIN`.
    Login,
    /// `NOLOGIN`.
    NoLogin,
    /// `ENCRYPTED PASSWORD '<string>'`.
    EncryptedPassword(AstString),
    /// `PASSWORD '<string>'`, or `PASSWORD NULL` when `None`.
    Password(Option<AstString>),
    /// `OAUTH` followed by a list of options.
    OAuth(Vec<SqlOption>),
}
1055
1056impl fmt::Display for UserOption {
1057 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1058 match self {
1059 UserOption::SuperUser => write!(f, "SUPERUSER"),
1060 UserOption::NoSuperUser => write!(f, "NOSUPERUSER"),
1061 UserOption::CreateDB => write!(f, "CREATEDB"),
1062 UserOption::NoCreateDB => write!(f, "NOCREATEDB"),
1063 UserOption::CreateUser => write!(f, "CREATEUSER"),
1064 UserOption::NoCreateUser => write!(f, "NOCREATEUSER"),
1065 UserOption::Login => write!(f, "LOGIN"),
1066 UserOption::NoLogin => write!(f, "NOLOGIN"),
1067 UserOption::EncryptedPassword(p) => write!(f, "ENCRYPTED PASSWORD {}", p),
1068 UserOption::Password(None) => write!(f, "PASSWORD NULL"),
1069 UserOption::Password(Some(p)) => write!(f, "PASSWORD {}", p),
1070 UserOption::OAuth(options) => {
1071 write!(f, "({})", display_comma_separated(options.as_slice()))
1072 }
1073 }
1074 }
1075}
1076
/// The list of options of a `CREATE USER` / `ALTER USER` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct UserOptions(pub Vec<UserOption>);
1080
/// Accumulates user options while parsing, with one slot per group of mutually
/// exclusive options so that a repeated or conflicting option can be detected.
#[derive(Default)]
struct UserOptionsBuilder {
    /// `SUPERUSER` or `NOSUPERUSER`.
    super_user: Option<UserOption>,
    /// `CREATEDB` or `NOCREATEDB`.
    create_db: Option<UserOption>,
    /// `CREATEUSER` or `NOCREATEUSER`.
    create_user: Option<UserOption>,
    /// `LOGIN` or `NOLOGIN`.
    login: Option<UserOption>,
    /// `PASSWORD`, `ENCRYPTED PASSWORD` or `OAUTH` (they share this slot).
    password: Option<UserOption>,
}
1089
1090impl UserOptionsBuilder {
1091 fn build(self) -> UserOptions {
1092 let mut options = vec![];
1093 if let Some(option) = self.super_user {
1094 options.push(option);
1095 }
1096 if let Some(option) = self.create_db {
1097 options.push(option);
1098 }
1099 if let Some(option) = self.create_user {
1100 options.push(option);
1101 }
1102 if let Some(option) = self.login {
1103 options.push(option);
1104 }
1105 if let Some(option) = self.password {
1106 options.push(option);
1107 }
1108 UserOptions(options)
1109 }
1110}
1111
impl ParseTo for UserOptions {
    /// Parses an optional `WITH` followed by user options until end of input or `;`.
    ///
    /// Each option is stored in its slot of a `UserOptionsBuilder`; filling a slot
    /// twice is reported as "conflicting or redundant options". The resulting options
    /// are returned in the builder's fixed order, not in input order.
    fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
        let mut builder = UserOptionsBuilder::default();
        // Stores `user_option` in `item`, failing if the slot was already occupied.
        let add_option = |item: &mut Option<UserOption>, user_option| {
            let old_value = item.replace(user_option);
            if old_value.is_some() {
                parser_err!("conflicting or redundant options");
            }
            Ok(())
        };
        // `WITH` is optional, so the result is deliberately ignored.
        let _ = parser.parse_keyword(Keyword::WITH);
        loop {
            let token = parser.peek_token();
            if token == Token::EOF || token == Token::SemiColon {
                break;
            }

            if let Token::Word(ref w) = token.token {
                // Parser state from before the word is consumed; handed to
                // `expected_at` if the word turns out not to be a known option.
                let checkpoint = *parser;
                parser.next_token();
                let (item_mut_ref, user_option) = match w.keyword {
                    Keyword::SUPERUSER => (&mut builder.super_user, UserOption::SuperUser),
                    Keyword::NOSUPERUSER => (&mut builder.super_user, UserOption::NoSuperUser),
                    Keyword::CREATEDB => (&mut builder.create_db, UserOption::CreateDB),
                    Keyword::NOCREATEDB => (&mut builder.create_db, UserOption::NoCreateDB),
                    Keyword::CREATEUSER => (&mut builder.create_user, UserOption::CreateUser),
                    Keyword::NOCREATEUSER => (&mut builder.create_user, UserOption::NoCreateUser),
                    Keyword::LOGIN => (&mut builder.login, UserOption::Login),
                    Keyword::NOLOGIN => (&mut builder.login, UserOption::NoLogin),
                    Keyword::PASSWORD => {
                        // `PASSWORD NULL` is stored as `Password(None)`.
                        if parser.parse_keyword(Keyword::NULL) {
                            (&mut builder.password, UserOption::Password(None))
                        } else {
                            (
                                &mut builder.password,
                                UserOption::Password(Some(AstString::parse_to(parser)?)),
                            )
                        }
                    }
                    Keyword::ENCRYPTED => {
                        parser.expect_keyword(Keyword::PASSWORD)?;
                        (
                            &mut builder.password,
                            UserOption::EncryptedPassword(AstString::parse_to(parser)?),
                        )
                    }
                    // OAuth shares the password slot, so it conflicts with a password.
                    Keyword::OAUTH => {
                        let options = parser.parse_options()?;
                        (&mut builder.password, UserOption::OAuth(options))
                    }
                    _ => {
                        parser.expected_at(
                            checkpoint,
                            "SUPERUSER | NOSUPERUSER | CREATEDB | NOCREATEDB | LOGIN \
                            | NOLOGIN | CREATEUSER | NOCREATEUSER | [ENCRYPTED] PASSWORD | NULL | OAUTH",
                        )?;
                        // Presumably `expected_at` always returns an error, so the `?`
                        // above always returns — TODO confirm.
                        unreachable!()
                    }
                };
                add_option(item_mut_ref, user_option)?;
            } else {
                parser.expected(
                    "SUPERUSER | NOSUPERUSER | CREATEDB | NOCREATEDB | LOGIN | NOLOGIN \
                        | CREATEUSER | NOCREATEUSER | [ENCRYPTED] PASSWORD | NULL | OAUTH",
                )?
            }
        }
        Ok(builder.build())
    }
}
1182
1183impl fmt::Display for UserOptions {
1184 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1185 if !self.0.is_empty() {
1186 write!(f, "WITH {}", display_separated(self.0.as_slice(), " "))
1187 } else {
1188 Ok(())
1189 }
1190 }
1191}
1192
1193impl ParseTo for CreateUserStatement {
1194 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1195 impl_parse_to!(user_name: ObjectName, p);
1196 impl_parse_to!(with_options: UserOptions, p);
1197
1198 Ok(CreateUserStatement {
1199 user_name,
1200 with_options,
1201 })
1202 }
1203}
1204
1205impl fmt::Display for CreateUserStatement {
1206 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1207 let mut v: Vec<String> = vec![];
1208 impl_fmt_display!(user_name, v, self);
1209 impl_fmt_display!(with_options, v, self);
1210 v.iter().join(" ").fmt(f)
1211 }
1212}
1213
1214impl fmt::Display for AlterUserMode {
1215 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1216 match self {
1217 AlterUserMode::Options(options) => {
1218 write!(f, "{}", options)
1219 }
1220 AlterUserMode::Rename(new_name) => {
1221 write!(f, "RENAME TO {}", new_name)
1222 }
1223 }
1224 }
1225}
1226
1227impl fmt::Display for AlterUserStatement {
1228 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1229 let mut v: Vec<String> = vec![];
1230 impl_fmt_display!(user_name, v, self);
1231 impl_fmt_display!(mode, v, self);
1232 v.iter().join(" ").fmt(f)
1233 }
1234}
1235
1236impl ParseTo for AlterUserStatement {
1237 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1238 impl_parse_to!(user_name: ObjectName, p);
1239 impl_parse_to!(mode: AlterUserMode, p);
1240
1241 Ok(AlterUserStatement { user_name, mode })
1242 }
1243}
1244
1245impl ParseTo for AlterUserMode {
1246 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1247 if p.parse_keyword(Keyword::RENAME) {
1248 p.expect_keyword(Keyword::TO)?;
1249 impl_parse_to!(new_name: ObjectName, p);
1250 Ok(AlterUserMode::Rename(new_name))
1251 } else {
1252 impl_parse_to!(with_options: UserOptions, p);
1253 Ok(AlterUserMode::Options(with_options))
1254 }
1255 }
1256}
1257
/// AST of a `DROP` statement body.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct DropStatement {
    /// The kind of object being dropped.
    pub object_type: ObjectType,
    /// Set when `IF EXISTS` was given.
    pub if_exists: bool,
    /// Name of the object being dropped.
    pub object_name: ObjectName,
    /// The optional trailing `CASCADE` or `RESTRICT`.
    pub drop_mode: AstOption<DropMode>,
}
1271
1272impl ParseTo for DropStatement {
1279 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1280 impl_parse_to!(object_type: ObjectType, p);
1281 impl_parse_to!(if_exists => [Keyword::IF, Keyword::EXISTS], p);
1282 let object_name = p.parse_object_name()?;
1283 impl_parse_to!(drop_mode: AstOption<DropMode>, p);
1284 Ok(Self {
1285 object_type,
1286 if_exists,
1287 object_name,
1288 drop_mode,
1289 })
1290 }
1291}
1292
1293impl fmt::Display for DropStatement {
1294 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1295 let mut v: Vec<String> = vec![];
1296 impl_fmt_display!(object_type, v, self);
1297 impl_fmt_display!(if_exists => [Keyword::IF, Keyword::EXISTS], v, self);
1298 impl_fmt_display!(object_name, v, self);
1299 impl_fmt_display!(drop_mode, v, self);
1300 v.iter().join(" ").fmt(f)
1301 }
1302}
1303
/// The trailing mode keyword of a `DROP` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum DropMode {
    /// `CASCADE`.
    Cascade,
    /// `RESTRICT`.
    Restrict,
}
1310
1311impl ParseTo for DropMode {
1312 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
1313 let drop_mode = if parser.parse_keyword(Keyword::CASCADE) {
1314 DropMode::Cascade
1315 } else if parser.parse_keyword(Keyword::RESTRICT) {
1316 DropMode::Restrict
1317 } else {
1318 return parser.expected("CASCADE | RESTRICT");
1319 };
1320 Ok(drop_mode)
1321 }
1322}
1323
1324impl fmt::Display for DropMode {
1325 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1326 f.write_str(match self {
1327 DropMode::Cascade => "CASCADE",
1328 DropMode::Restrict => "RESTRICT",
1329 })
1330 }
1331}