1use std::fmt;
16use std::fmt::{Formatter, Write};
17
18use itertools::Itertools;
19use winnow::ModalResult;
20
21use super::ddl::SourceWatermark;
22use super::legacy_source::{CompatibleFormatEncode, parse_format_encode};
23use super::{EmitMode, Ident, ObjectType, Query, Value};
24use crate::ast::{
25 ColumnDef, ObjectName, REDACT_SQL_OPTION_KEYWORDS, SqlOption, TableConstraint,
26 display_comma_separated, display_separated,
27};
28use crate::keywords::Keyword;
29use crate::parser::{IncludeOption, IsOptional, Parser};
30use crate::parser_err;
31use crate::parser_v2::literal_u32;
32use crate::tokenizer::Token;
33
34pub trait ParseTo: Sized {
36 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self>;
37}
38
39#[macro_export]
40macro_rules! impl_parse_to {
41 () => {};
42 ($field:ident : $field_type:ty, $parser:ident) => {
43 let $field = <$field_type>::parse_to($parser)?;
44 };
45 ($field:ident => [$($arr:tt)+], $parser:ident) => {
46 let $field = $parser.parse_keywords(&[$($arr)+]);
47 };
48 ([$($arr:tt)+], $parser:ident) => {
49 $parser.expect_keywords(&[$($arr)+])?;
50 };
51}
52
53#[macro_export]
54macro_rules! impl_fmt_display {
55 () => {};
56 ($field:ident, $v:ident, $self:ident) => {{
57 let s = format!("{}", $self.$field);
58 if !s.is_empty() {
59 $v.push(s);
60 }
61 }};
62 ($field:ident => [$($arr:tt)+], $v:ident, $self:ident) => {
63 if $self.$field {
64 $v.push(format!("{}", display_separated(&[$($arr)+], " ")));
65 }
66 };
67 ([$($arr:tt)+], $v:ident) => {
68 $v.push(format!("{}", display_separated(&[$($arr)+], " ")));
69 };
70}
71
72#[derive(Debug, Clone, PartialEq, Eq, Hash)]
81pub struct CreateSourceStatement {
82 pub temporary: bool,
83 pub if_not_exists: bool,
84 pub columns: Vec<ColumnDef>,
85 pub wildcard_idx: Option<usize>,
87 pub constraints: Vec<TableConstraint>,
88 pub source_name: ObjectName,
89 pub with_properties: WithProperties,
90 pub format_encode: CompatibleFormatEncode,
91 pub source_watermarks: Vec<SourceWatermark>,
92 pub include_column_options: IncludeOption,
93}
94
95#[derive(Debug, Clone, PartialEq, Eq, Hash)]
99pub enum Format {
100 Native,
103 None,
105 Debezium,
107 DebeziumMongo,
109 Maxwell,
111 Canal,
113 Upsert,
115 Plain,
117}
118
119impl fmt::Display for Format {
121 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
122 write!(
123 f,
124 "{}",
125 match self {
126 Format::Native => "NATIVE",
127 Format::Debezium => "DEBEZIUM",
128 Format::DebeziumMongo => "DEBEZIUM_MONGO",
129 Format::Maxwell => "MAXWELL",
130 Format::Canal => "CANAL",
131 Format::Upsert => "UPSERT",
132 Format::Plain => "PLAIN",
133 Format::None => "NONE",
134 }
135 )
136 }
137}
138
139impl Format {
140 pub fn from_keyword(s: &str) -> ModalResult<Self> {
141 Ok(match s {
142 "DEBEZIUM" => Format::Debezium,
143 "DEBEZIUM_MONGO" => Format::DebeziumMongo,
144 "MAXWELL" => Format::Maxwell,
145 "CANAL" => Format::Canal,
146 "PLAIN" => Format::Plain,
147 "UPSERT" => Format::Upsert,
148 "NATIVE" => Format::Native,
149 "NONE" => Format::None,
150 _ => parser_err!("expected PROTOBUF | DEBEZIUM | PLAIN | NATIVE | NONE after FORMAT"),
151 })
152 }
153}
154
155#[derive(Debug, Clone, PartialEq, Eq, Hash)]
157pub enum Encode {
158 Avro, Csv, Protobuf, Json, Bytes, None,
165 Text, Native,
169 Template,
170 Parquet,
171}
172
173impl fmt::Display for Encode {
175 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
176 write!(
177 f,
178 "{}",
179 match self {
180 Encode::Avro => "AVRO",
181 Encode::Csv => "CSV",
182 Encode::Protobuf => "PROTOBUF",
183 Encode::Json => "JSON",
184 Encode::Bytes => "BYTES",
185 Encode::Native => "NATIVE",
186 Encode::Template => "TEMPLATE",
187 Encode::None => "NONE",
188 Encode::Parquet => "PARQUET",
189 Encode::Text => "TEXT",
190 }
191 )
192 }
193}
194
195impl Encode {
196 pub fn from_keyword(s: &str) -> ModalResult<Self> {
197 Ok(match s {
198 "AVRO" => Encode::Avro,
199 "TEXT" => Encode::Text,
200 "BYTES" => Encode::Bytes,
201 "CSV" => Encode::Csv,
202 "PROTOBUF" => Encode::Protobuf,
203 "JSON" => Encode::Json,
204 "TEMPLATE" => Encode::Template,
205 "PARQUET" => Encode::Parquet,
206 "NATIVE" => Encode::Native,
207 "NONE" => Encode::None,
208 _ => parser_err!(
209 "expected AVRO | BYTES | CSV | PROTOBUF | JSON | NATIVE | TEMPLATE | PARQUET | NONE after Encode"
210 ),
211 })
212 }
213}
214
215#[derive(Debug, Clone, PartialEq, Eq, Hash)]
217pub struct FormatEncodeOptions {
218 pub format: Format,
219 pub row_encode: Encode,
220 pub row_options: Vec<SqlOption>,
221
222 pub key_encode: Option<Encode>,
223}
224
225impl Parser<'_> {
226 fn peek_format_encode_format(&mut self) -> bool {
228 (self.peek_nth_any_of_keywords(0, &[Keyword::ROW])
229 && self.peek_nth_any_of_keywords(1, &[Keyword::FORMAT])) || self.peek_nth_any_of_keywords(0, &[Keyword::FORMAT]) }
232
233 pub fn parse_format_encode_with_connector(
235 &mut self,
236 connector: &str,
237 cdc_source_job: bool,
238 ) -> ModalResult<CompatibleFormatEncode> {
239 if connector.contains("-cdc") {
244 let expected = if cdc_source_job {
245 FormatEncodeOptions::plain_json()
246 } else if connector.contains("mongodb") {
247 FormatEncodeOptions::debezium_mongo_json()
248 } else {
249 FormatEncodeOptions::debezium_json()
250 };
251
252 if self.peek_format_encode_format() {
253 let schema = parse_format_encode(self)?.into_v2();
254 if schema != expected {
255 parser_err!(
256 "Row format for CDC connectors should be \
257 either omitted or set to `{expected}`",
258 );
259 }
260 }
261 Ok(expected.into())
262 } else if connector.contains("nexmark") {
263 let expected = FormatEncodeOptions::native();
264 if self.peek_format_encode_format() {
265 let schema = parse_format_encode(self)?.into_v2();
266 if schema != expected {
267 parser_err!(
268 "Row format for nexmark connectors should be \
269 either omitted or set to `{expected}`",
270 );
271 }
272 }
273 Ok(expected.into())
274 } else if connector.contains("datagen") {
275 Ok(if self.peek_format_encode_format() {
276 parse_format_encode(self)?
277 } else {
278 FormatEncodeOptions::native().into()
279 })
280 } else if connector.contains("iceberg") || connector.contains("adbc_snowflake") {
281 let expected = FormatEncodeOptions::none();
282 if self.peek_format_encode_format() {
283 let schema = parse_format_encode(self)?.into_v2();
284 if schema != expected {
285 parser_err!(
286 "Row format for iceberg connectors should be \
287 either omitted or set to `{expected}`",
288 );
289 }
290 }
291 Ok(expected.into())
292 } else if connector.contains("webhook") {
293 parser_err!(
294 "Source with webhook connector is not supported. \
295 Please use the `CREATE TABLE ... WITH ...` statement instead.",
296 );
297 } else {
298 Ok(parse_format_encode(self)?)
299 }
300 }
301
302 pub fn parse_schema(&mut self) -> ModalResult<Option<FormatEncodeOptions>> {
304 if !self.parse_keyword(Keyword::FORMAT) {
305 return Ok(None);
306 }
307
308 let id = self.parse_identifier()?;
309 let s = id.value.to_ascii_uppercase();
310 let format = Format::from_keyword(&s)?;
311 self.expect_keyword(Keyword::ENCODE)?;
312 let id = self.parse_identifier()?;
313 let s = id.value.to_ascii_uppercase();
314 let row_encode = Encode::from_keyword(&s)?;
315 let row_options = self.parse_options()?;
316
317 let key_encode = if self.parse_keywords(&[Keyword::KEY, Keyword::ENCODE]) {
318 Some(Encode::from_keyword(
319 self.parse_identifier()?.value.to_ascii_uppercase().as_str(),
320 )?)
321 } else {
322 None
323 };
324
325 Ok(Some(FormatEncodeOptions {
326 format,
327 row_encode,
328 row_options,
329 key_encode,
330 }))
331 }
332}
333
334impl FormatEncodeOptions {
335 pub const fn plain_json() -> Self {
336 FormatEncodeOptions {
337 format: Format::Plain,
338 row_encode: Encode::Json,
339 row_options: Vec::new(),
340 key_encode: None,
341 }
342 }
343
344 pub const fn debezium_json() -> Self {
346 FormatEncodeOptions {
347 format: Format::Debezium,
348 row_encode: Encode::Json,
349 row_options: Vec::new(),
350 key_encode: None,
351 }
352 }
353
354 pub const fn debezium_mongo_json() -> Self {
355 FormatEncodeOptions {
356 format: Format::DebeziumMongo,
357 row_encode: Encode::Json,
358 row_options: Vec::new(),
359 key_encode: None,
360 }
361 }
362
363 pub const fn native() -> Self {
365 FormatEncodeOptions {
366 format: Format::Native,
367 row_encode: Encode::Native,
368 row_options: Vec::new(),
369 key_encode: None,
370 }
371 }
372
373 pub const fn none() -> Self {
376 FormatEncodeOptions {
377 format: Format::None,
378 row_encode: Encode::None,
379 row_options: Vec::new(),
380 key_encode: None,
381 }
382 }
383
384 pub fn row_options(&self) -> &[SqlOption] {
385 self.row_options.as_ref()
386 }
387}
388
389impl fmt::Display for FormatEncodeOptions {
390 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
391 write!(f, "FORMAT {} ENCODE {}", self.format, self.row_encode)?;
392
393 if !self.row_options().is_empty() {
394 write!(f, " ({})", display_comma_separated(self.row_options()))?;
395 }
396
397 if let Some(key_encode) = &self.key_encode {
398 write!(f, " KEY ENCODE {}", key_encode)?;
399 }
400
401 Ok(())
402 }
403}
404
405pub(super) fn fmt_create_items(
406 columns: &[ColumnDef],
407 constraints: &[TableConstraint],
408 watermarks: &[SourceWatermark],
409 wildcard_idx: Option<usize>,
410) -> std::result::Result<String, fmt::Error> {
411 let mut items = String::new();
412 let has_items = !columns.is_empty()
413 || !constraints.is_empty()
414 || !watermarks.is_empty()
415 || wildcard_idx.is_some();
416 has_items.then(|| write!(&mut items, "("));
417
418 if let Some(wildcard_idx) = wildcard_idx {
419 let (columns_l, columns_r) = columns.split_at(wildcard_idx);
420 write!(&mut items, "{}", display_comma_separated(columns_l))?;
421 if !columns_l.is_empty() {
422 write!(&mut items, ", ")?;
423 }
424 write!(&mut items, "{}", Token::Mul)?;
425 if !columns_r.is_empty() {
426 write!(&mut items, ", ")?;
427 }
428 write!(&mut items, "{}", display_comma_separated(columns_r))?;
429 } else {
430 write!(&mut items, "{}", display_comma_separated(columns))?;
431 }
432 let mut leading_items = !columns.is_empty() || wildcard_idx.is_some();
433
434 if leading_items && !constraints.is_empty() {
435 write!(&mut items, ", ")?;
436 }
437 write!(&mut items, "{}", display_comma_separated(constraints))?;
438 leading_items |= !constraints.is_empty();
439
440 if leading_items && !watermarks.is_empty() {
441 write!(&mut items, ", ")?;
442 }
443 write!(&mut items, "{}", display_comma_separated(watermarks))?;
444 has_items.then(|| write!(&mut items, ")"));
448 Ok(items)
449}
450
451impl fmt::Display for CreateSourceStatement {
452 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
453 let mut v: Vec<String> = vec![];
454 impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
455 impl_fmt_display!(source_name, v, self);
456
457 let items = fmt_create_items(
458 &self.columns,
459 &self.constraints,
460 &self.source_watermarks,
461 self.wildcard_idx,
462 )?;
463 if !items.is_empty() {
464 v.push(items);
465 }
466
467 for item in &self.include_column_options {
468 v.push(format!("{}", item));
469 }
470
471 let is_cdc_source = self.with_properties.0.iter().any(|option| {
473 option.name.real_value().eq_ignore_ascii_case("connector")
474 && option.value.to_string().contains("cdc")
475 });
476
477 impl_fmt_display!(with_properties, v, self);
478 if !is_cdc_source {
479 impl_fmt_display!(format_encode, v, self);
480 }
481 v.iter().join(" ").fmt(f)
482 }
483}
484
485#[derive(Debug, Clone, PartialEq, Eq, Hash)]
486pub enum CreateSink {
487 From(ObjectName),
488 AsQuery(Box<Query>),
489}
490
491impl fmt::Display for CreateSink {
492 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
493 match self {
494 Self::From(mv) => write!(f, "FROM {}", mv),
495 Self::AsQuery(query) => write!(f, "AS {}", query),
496 }
497 }
498}
499#[derive(Debug, Clone, PartialEq, Eq, Hash)]
507pub struct CreateSinkStatement {
508 pub or_replace: bool,
509 pub if_not_exists: bool,
510 pub sink_name: ObjectName,
511 pub with_properties: WithProperties,
512 pub sink_from: CreateSink,
513
514 pub columns: Vec<Ident>,
517 pub emit_mode: Option<EmitMode>,
518 pub sink_schema: Option<FormatEncodeOptions>,
519 pub into_table_name: Option<ObjectName>,
520}
521
522impl ParseTo for CreateSinkStatement {
523 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
524 Self::parse_to_with_or_replace(p, false)
525 }
526}
527
528impl CreateSinkStatement {
529 pub fn parse_to_with_or_replace(p: &mut Parser<'_>, or_replace: bool) -> ModalResult<Self> {
530 impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
531 impl_parse_to!(sink_name: ObjectName, p);
532
533 let mut target_spec_columns = Vec::new();
534 let into_table_name = if p.parse_keyword(Keyword::INTO) {
535 impl_parse_to!(into_table_name: ObjectName, p);
536
537 target_spec_columns = p.parse_parenthesized_column_list(IsOptional::Optional)?;
539 Some(into_table_name)
540 } else {
541 None
542 };
543
544 let sink_from = if p.parse_keyword(Keyword::FROM) {
545 impl_parse_to!(from_name: ObjectName, p);
546 CreateSink::From(from_name)
547 } else if p.parse_keyword(Keyword::AS) {
548 let query = Box::new(p.parse_query()?);
549 CreateSink::AsQuery(query)
550 } else {
551 p.expected(if or_replace {
552 "FROM or AS after REPLACE SINK sink_name"
553 } else {
554 "FROM or AS after CREATE SINK sink_name"
555 })?
556 };
557
558 let emit_mode: Option<EmitMode> = p.parse_emit_mode()?;
559
560 if !p.peek_nth_any_of_keywords(0, &[Keyword::WITH]) && into_table_name.is_none() {
563 p.expected("WITH")?
564 }
565 impl_parse_to!(with_properties: WithProperties, p);
566
567 if with_properties.0.is_empty() && into_table_name.is_none() {
568 parser_err!("sink properties not provided");
569 }
570
571 let sink_schema = p.parse_schema()?;
572
573 Ok(Self {
574 or_replace,
575 if_not_exists,
576 sink_name,
577 with_properties,
578 sink_from,
579 columns: target_spec_columns,
580 emit_mode,
581 sink_schema,
582 into_table_name,
583 })
584 }
585}
586
587impl fmt::Display for CreateSinkStatement {
588 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
589 let mut v: Vec<String> = vec![];
590 impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
591 impl_fmt_display!(sink_name, v, self);
592 if let Some(into_table) = &self.into_table_name {
593 impl_fmt_display!([Keyword::INTO], v);
594 impl_fmt_display!([into_table], v);
595 if !self.columns.is_empty() {
596 v.push(format!("({})", display_comma_separated(&self.columns)));
597 }
598 }
599 impl_fmt_display!(sink_from, v, self);
600 if let Some(ref emit_mode) = self.emit_mode {
601 v.push(format!("EMIT {}", emit_mode));
602 }
603 impl_fmt_display!(with_properties, v, self);
604 if let Some(schema) = &self.sink_schema {
605 v.push(format!("{}", schema));
606 }
607 v.iter().join(" ").fmt(f)
608 }
609}
610
611#[derive(Debug, Clone, PartialEq, Eq, Hash)]
619pub struct CreateSubscriptionStatement {
620 pub if_not_exists: bool,
621 pub subscription_name: ObjectName,
622 pub with_properties: WithProperties,
623 pub subscription_from: ObjectName,
624 }
626
627impl ParseTo for CreateSubscriptionStatement {
628 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
629 impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
630 impl_parse_to!(subscription_name: ObjectName, p);
631
632 let subscription_from = if p.parse_keyword(Keyword::FROM) {
633 impl_parse_to!(from_name: ObjectName, p);
634 from_name
635 } else {
636 p.expected("FROM after CREATE SUBSCRIPTION subscription_name")?
637 };
638
639 if !p.peek_nth_any_of_keywords(0, &[Keyword::WITH]) {
644 p.expected("WITH")?
645 }
646 impl_parse_to!(with_properties: WithProperties, p);
647
648 if with_properties.0.is_empty() {
649 parser_err!("subscription properties not provided");
650 }
651
652 Ok(Self {
653 if_not_exists,
654 subscription_name,
655 with_properties,
656 subscription_from,
657 })
658 }
659}
660
661impl fmt::Display for CreateSubscriptionStatement {
662 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
663 let mut v: Vec<String> = vec![];
664 impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
665 impl_fmt_display!(subscription_name, v, self);
666 v.push(format!("FROM {}", self.subscription_from));
667 impl_fmt_display!(with_properties, v, self);
668 v.iter().join(" ").fmt(f)
669 }
670}
671
672#[derive(Debug, Clone, PartialEq, Eq, Hash)]
673pub enum DeclareCursor {
674 Query(Box<Query>),
675 Subscription(ObjectName, Since),
676}
677
678impl fmt::Display for DeclareCursor {
679 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
680 let mut v: Vec<String> = vec![];
681 match self {
682 DeclareCursor::Query(query) => v.push(format!("{}", query.as_ref())),
683 DeclareCursor::Subscription(name, since) => {
684 v.push(format!("{}", name));
685 v.push(format!("{:?}", since));
686 }
687 }
688 v.iter().join(" ").fmt(f)
689 }
690}
691
692#[derive(Debug, Clone, PartialEq, Eq, Hash)]
702pub struct DeclareCursorStatement {
703 pub cursor_name: Ident,
704 pub declare_cursor: DeclareCursor,
705}
706
707impl ParseTo for DeclareCursorStatement {
708 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
709 let cursor_name = p.parse_identifier_non_reserved()?;
710
711 let declare_cursor = if !p.parse_keyword(Keyword::SUBSCRIPTION) {
712 p.expect_keyword(Keyword::CURSOR)?;
713 p.expect_keyword(Keyword::FOR)?;
714 DeclareCursor::Query(Box::new(p.parse_query()?))
715 } else {
716 p.expect_keyword(Keyword::CURSOR)?;
717 p.expect_keyword(Keyword::FOR)?;
718 let cursor_for_name = p.parse_object_name()?;
719 let rw_timestamp = p.parse_since()?;
720 DeclareCursor::Subscription(cursor_for_name, rw_timestamp)
721 };
722
723 Ok(Self {
724 cursor_name,
725 declare_cursor,
726 })
727 }
728}
729
730impl fmt::Display for DeclareCursorStatement {
731 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
732 let mut v: Vec<String> = vec![];
733 impl_fmt_display!(cursor_name, v, self);
734 match &self.declare_cursor {
735 DeclareCursor::Query(_) => {
736 v.push("CURSOR FOR ".to_owned());
737 }
738 DeclareCursor::Subscription { .. } => {
739 v.push("SUBSCRIPTION CURSOR FOR ".to_owned());
740 }
741 }
742 impl_fmt_display!(declare_cursor, v, self);
743 v.iter().join(" ").fmt(f)
744 }
745}
746
747#[derive(Debug, Clone, PartialEq, Eq, Hash)]
751pub struct FetchCursorStatement {
752 pub cursor_name: Ident,
753 pub count: u32,
754 pub with_properties: WithProperties,
755}
756
757impl ParseTo for FetchCursorStatement {
758 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
759 let count = if p.parse_keyword(Keyword::NEXT) {
760 1
761 } else {
762 literal_u32(p)?
763 };
764 p.expect_keyword(Keyword::FROM)?;
765 let cursor_name = p.parse_identifier_non_reserved()?;
766 impl_parse_to!(with_properties: WithProperties, p);
767
768 Ok(Self {
769 cursor_name,
770 count,
771 with_properties,
772 })
773 }
774}
775
776impl fmt::Display for FetchCursorStatement {
777 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
778 let mut v: Vec<String> = vec![];
779 if self.count == 1 {
780 v.push("NEXT ".to_owned());
781 } else {
782 impl_fmt_display!(count, v, self);
783 }
784 v.push("FROM ".to_owned());
785 impl_fmt_display!(cursor_name, v, self);
786 v.iter().join(" ").fmt(f)
787 }
788}
789
790#[derive(Debug, Clone, PartialEq, Eq, Hash)]
794pub struct CloseCursorStatement {
795 pub cursor_name: Option<Ident>,
796}
797
798impl ParseTo for CloseCursorStatement {
799 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
800 let cursor_name = if p.parse_keyword(Keyword::ALL) {
801 None
802 } else {
803 Some(p.parse_identifier_non_reserved()?)
804 };
805
806 Ok(Self { cursor_name })
807 }
808}
809
810impl fmt::Display for CloseCursorStatement {
811 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
812 let mut v: Vec<String> = vec![];
813 if let Some(cursor_name) = &self.cursor_name {
814 v.push(format!("{}", cursor_name));
815 } else {
816 v.push("ALL".to_owned());
817 }
818 v.iter().join(" ").fmt(f)
819 }
820}
821
822#[derive(Debug, Clone, PartialEq, Eq, Hash)]
828pub struct CreateConnectionStatement {
829 pub if_not_exists: bool,
830 pub connection_name: ObjectName,
831 pub with_properties: WithProperties,
832}
833
834impl ParseTo for CreateConnectionStatement {
835 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
836 impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
837 impl_parse_to!(connection_name: ObjectName, p);
838 impl_parse_to!(with_properties: WithProperties, p);
839 if with_properties.0.is_empty() {
840 parser_err!("connection properties not provided");
841 }
842
843 Ok(Self {
844 if_not_exists,
845 connection_name,
846 with_properties,
847 })
848 }
849}
850
851impl fmt::Display for CreateConnectionStatement {
852 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
853 let mut v: Vec<String> = vec![];
854 impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
855 impl_fmt_display!(connection_name, v, self);
856 impl_fmt_display!(with_properties, v, self);
857 v.iter().join(" ").fmt(f)
858 }
859}
860
861#[derive(Debug, Clone, PartialEq, Eq, Hash)]
862pub struct CreateSecretStatement {
863 pub if_not_exists: bool,
864 pub secret_name: ObjectName,
865 pub credential: Value,
866 pub with_properties: WithProperties,
867}
868
869impl ParseTo for CreateSecretStatement {
870 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
871 impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], parser);
872 impl_parse_to!(secret_name: ObjectName, parser);
873 impl_parse_to!(with_properties: WithProperties, parser);
874 let mut credential = Value::Null;
875 if parser.parse_keyword(Keyword::AS) {
876 credential = parser.ensure_parse_value()?;
877 }
878 Ok(Self {
879 if_not_exists,
880 secret_name,
881 credential,
882 with_properties,
883 })
884 }
885}
886
887impl fmt::Display for CreateSecretStatement {
888 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
889 let mut v: Vec<String> = vec![];
890 impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
891 impl_fmt_display!(secret_name, v, self);
892 impl_fmt_display!(with_properties, v, self);
893 if self.credential != Value::Null {
894 v.push("AS".to_owned());
895 impl_fmt_display!(credential, v, self);
896 }
897 v.iter().join(" ").fmt(f)
898 }
899}
900
901#[derive(Debug, Clone, PartialEq, Eq, Hash)]
902pub struct WithProperties(pub Vec<SqlOption>);
903
904impl ParseTo for WithProperties {
905 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
906 Ok(Self(
907 parser.parse_options_with_preceding_keyword(Keyword::WITH)?,
908 ))
909 }
910}
911
912impl fmt::Display for WithProperties {
913 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
914 if !self.0.is_empty() {
915 write!(f, "WITH ({})", display_comma_separated(self.0.as_slice()))
916 } else {
917 Ok(())
918 }
919 }
920}
921
922#[derive(Debug, Clone, PartialEq, Eq, Hash)]
923pub enum Since {
924 TimestampMsNum(u64),
925 ProcessTime,
926 Begin,
927 Full,
928}
929
930impl fmt::Display for Since {
931 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
932 use Since::*;
933 match self {
934 TimestampMsNum(ts) => write!(f, " SINCE {}", ts),
935 ProcessTime => write!(f, " SINCE PROCTIME()"),
936 Begin => write!(f, " SINCE BEGIN()"),
937 Full => write!(f, " FULL"),
938 }
939 }
940}
941
942#[derive(Debug, Clone, PartialEq, Eq, Hash)]
943pub struct RowSchemaLocation {
944 pub value: AstString,
945}
946
947impl ParseTo for RowSchemaLocation {
948 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
949 impl_parse_to!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], p);
950 impl_parse_to!(value: AstString, p);
951 Ok(Self { value })
952 }
953}
954
955impl fmt::Display for RowSchemaLocation {
956 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
957 let mut v = vec![];
958 impl_fmt_display!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], v);
959 impl_fmt_display!(value, v, self);
960 v.iter().join(" ").fmt(f)
961 }
962}
963
964#[derive(Debug, Clone, PartialEq, Eq, Hash)]
967pub struct AstString(pub String);
968
969impl ParseTo for AstString {
970 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
971 Ok(Self(parser.parse_literal_string()?))
972 }
973}
974
975impl fmt::Display for AstString {
976 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
977 write!(f, "'{}'", self.0)
978 }
979}
980
981#[derive(Debug, Clone, PartialEq, Eq, Hash)]
984pub enum AstOption<T> {
985 None,
987 Some(T),
989}
990
991impl<T: ParseTo> ParseTo for AstOption<T> {
992 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
993 match T::parse_to(parser) {
994 Ok(t) => Ok(AstOption::Some(t)),
995 Err(_) => Ok(AstOption::None),
996 }
997 }
998}
999
1000impl<T: fmt::Display> fmt::Display for AstOption<T> {
1001 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1002 match &self {
1003 AstOption::Some(t) => t.fmt(f),
1004 AstOption::None => Ok(()),
1005 }
1006 }
1007}
1008
1009impl<T> From<AstOption<T>> for Option<T> {
1010 fn from(val: AstOption<T>) -> Self {
1011 match val {
1012 AstOption::Some(t) => Some(t),
1013 AstOption::None => None,
1014 }
1015 }
1016}
1017
1018#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1019pub struct CreateUserStatement {
1020 pub user_name: ObjectName,
1021 pub with_options: UserOptions,
1022}
1023
1024#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1025pub struct AlterUserStatement {
1026 pub user_name: ObjectName,
1027 pub mode: AlterUserMode,
1028}
1029
1030#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1031pub enum AlterUserMode {
1032 Options(UserOptions),
1033 Rename(ObjectName),
1034}
1035
1036#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1037pub enum UserOption {
1038 SuperUser,
1039 NoSuperUser,
1040 CreateDB,
1041 NoCreateDB,
1042 CreateUser,
1043 NoCreateUser,
1044 Login,
1045 NoLogin,
1046 Admin,
1047 NoAdmin,
1048 EncryptedPassword(AstString),
1049 Password(Option<AstString>),
1050 OAuth(Vec<SqlOption>),
1051}
1052
1053impl fmt::Display for UserOption {
1054 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1055 match self {
1056 UserOption::SuperUser => write!(f, "SUPERUSER"),
1057 UserOption::NoSuperUser => write!(f, "NOSUPERUSER"),
1058 UserOption::CreateDB => write!(f, "CREATEDB"),
1059 UserOption::NoCreateDB => write!(f, "NOCREATEDB"),
1060 UserOption::CreateUser => write!(f, "CREATEUSER"),
1061 UserOption::NoCreateUser => write!(f, "NOCREATEUSER"),
1062 UserOption::Login => write!(f, "LOGIN"),
1063 UserOption::NoLogin => write!(f, "NOLOGIN"),
1064 UserOption::Admin => write!(f, "ADMIN"),
1065 UserOption::NoAdmin => write!(f, "NOADMIN"),
1066 UserOption::EncryptedPassword(p) => {
1067 if should_redact_user_password() {
1068 write!(f, "ENCRYPTED PASSWORD [REDACTED]")
1069 } else {
1070 write!(f, "ENCRYPTED PASSWORD {}", p)
1071 }
1072 }
1073 UserOption::Password(None) => write!(f, "PASSWORD NULL"),
1074 UserOption::Password(Some(p)) => {
1075 if should_redact_user_password() {
1076 write!(f, "PASSWORD [REDACTED]")
1077 } else {
1078 write!(f, "PASSWORD {}", p)
1079 }
1080 }
1081 UserOption::OAuth(options) => {
1082 write!(f, "({})", display_comma_separated(options.as_slice()))
1083 }
1084 }
1085 }
1086}
1087
1088#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1089pub struct UserOptions(pub Vec<UserOption>, pub bool);
1090
1091fn should_redact_user_password() -> bool {
1092 REDACT_SQL_OPTION_KEYWORDS.try_with(|_| ()).is_ok()
1093}
1094
1095#[derive(Default)]
1096struct UserOptionsBuilder {
1097 with_prefix: bool,
1098 super_user: Option<UserOption>,
1099 create_db: Option<UserOption>,
1100 create_user: Option<UserOption>,
1101 login: Option<UserOption>,
1102 admin: Option<UserOption>,
1103 password: Option<UserOption>,
1104}
1105
1106impl UserOptionsBuilder {
1107 fn build(self) -> UserOptions {
1108 let mut options = vec![];
1109 if let Some(option) = self.super_user {
1110 options.push(option);
1111 }
1112 if let Some(option) = self.create_db {
1113 options.push(option);
1114 }
1115 if let Some(option) = self.create_user {
1116 options.push(option);
1117 }
1118 if let Some(option) = self.login {
1119 options.push(option);
1120 }
1121 if let Some(option) = self.admin {
1122 options.push(option);
1123 }
1124 if let Some(option) = self.password {
1125 options.push(option);
1126 }
1127 UserOptions(options, self.with_prefix)
1128 }
1129}
1130
1131impl ParseTo for UserOptions {
1132 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
1133 let mut builder = UserOptionsBuilder {
1134 with_prefix: parser.parse_keyword(Keyword::WITH),
1135 ..Default::default()
1136 };
1137 let add_option = |item: &mut Option<UserOption>, user_option| {
1138 let old_value = item.replace(user_option);
1139 if old_value.is_some() {
1140 parser_err!("conflicting or redundant options");
1141 }
1142 Ok(())
1143 };
1144 loop {
1145 let token = parser.peek_token();
1146 if token == Token::EOF || token == Token::SemiColon {
1147 break;
1148 }
1149
1150 let option = parser.parse_identifier()?;
1151 let s = option.real_value();
1152 let (item_mut_ref, user_option) = match &s[..] {
1153 "superuser" => (&mut builder.super_user, UserOption::SuperUser),
1154 "nosuperuser" => (&mut builder.super_user, UserOption::NoSuperUser),
1155 "createdb" => (&mut builder.create_db, UserOption::CreateDB),
1156 "nocreatedb" => (&mut builder.create_db, UserOption::NoCreateDB),
1157 "createuser" => (&mut builder.create_user, UserOption::CreateUser),
1158 "nocreateuser" => (&mut builder.create_user, UserOption::NoCreateUser),
1159 "login" => (&mut builder.login, UserOption::Login),
1160 "nologin" => (&mut builder.login, UserOption::NoLogin),
1161 "admin" => (&mut builder.admin, UserOption::Admin),
1162 "noadmin" => (&mut builder.admin, UserOption::NoAdmin),
1163 "password" => {
1164 if parser.parse_keyword(Keyword::NULL) {
1165 (&mut builder.password, UserOption::Password(None))
1166 } else {
1167 (
1168 &mut builder.password,
1169 UserOption::Password(Some(AstString::parse_to(parser)?)),
1170 )
1171 }
1172 }
1173 "encrypted" => {
1174 let option = parser.parse_identifier()?;
1175 let s = option.real_value();
1176 if s != "password" {
1177 parser_err!("expected PASSWORD after ENCRYPTED, found {}", option);
1178 }
1179 (
1180 &mut builder.password,
1181 UserOption::EncryptedPassword(AstString::parse_to(parser)?),
1182 )
1183 }
1184 "oauth" => {
1185 let options = parser.parse_options()?;
1186 (&mut builder.password, UserOption::OAuth(options))
1187 }
1188 _ => {
1189 parser_err!("unexpected user option: {}", option);
1190 }
1191 };
1192 add_option(item_mut_ref, user_option)?;
1193 }
1194 Ok(builder.build())
1195 }
1196}
1197
1198impl fmt::Display for UserOptions {
1199 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1200 if !self.0.is_empty() {
1201 if self.1 {
1202 write!(f, "WITH ")?;
1203 }
1204 write!(f, "{}", display_separated(self.0.as_slice(), " "))
1205 } else {
1206 Ok(())
1207 }
1208 }
1209}
1210
1211impl ParseTo for CreateUserStatement {
1212 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1213 impl_parse_to!(user_name: ObjectName, p);
1214 impl_parse_to!(with_options: UserOptions, p);
1215
1216 Ok(CreateUserStatement {
1217 user_name,
1218 with_options,
1219 })
1220 }
1221}
1222
1223impl fmt::Display for CreateUserStatement {
1224 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1225 let mut v: Vec<String> = vec![];
1226 impl_fmt_display!(user_name, v, self);
1227 impl_fmt_display!(with_options, v, self);
1228 v.iter().join(" ").fmt(f)
1229 }
1230}
1231
1232impl fmt::Display for AlterUserMode {
1233 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1234 match self {
1235 AlterUserMode::Options(options) => {
1236 write!(f, "{}", options)
1237 }
1238 AlterUserMode::Rename(new_name) => {
1239 write!(f, "RENAME TO {}", new_name)
1240 }
1241 }
1242 }
1243}
1244
1245impl fmt::Display for AlterUserStatement {
1246 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1247 let mut v: Vec<String> = vec![];
1248 impl_fmt_display!(user_name, v, self);
1249 impl_fmt_display!(mode, v, self);
1250 v.iter().join(" ").fmt(f)
1251 }
1252}
1253
1254impl ParseTo for AlterUserStatement {
1255 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1256 impl_parse_to!(user_name: ObjectName, p);
1257 impl_parse_to!(mode: AlterUserMode, p);
1258
1259 Ok(AlterUserStatement { user_name, mode })
1260 }
1261}
1262
1263impl ParseTo for AlterUserMode {
1264 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1265 if p.parse_keyword(Keyword::RENAME) {
1266 p.expect_keyword(Keyword::TO)?;
1267 impl_parse_to!(new_name: ObjectName, p);
1268 Ok(AlterUserMode::Rename(new_name))
1269 } else {
1270 impl_parse_to!(with_options: UserOptions, p);
1271 Ok(AlterUserMode::Options(with_options))
1272 }
1273 }
1274}
1275
1276#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1277pub struct DropStatement {
1278 pub object_type: ObjectType,
1280 pub if_exists: bool,
1282 pub object_name: ObjectName,
1284 pub drop_mode: AstOption<DropMode>,
1287}
1288
1289impl ParseTo for DropStatement {
1296 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1297 impl_parse_to!(object_type: ObjectType, p);
1298 impl_parse_to!(if_exists => [Keyword::IF, Keyword::EXISTS], p);
1299 let object_name = p.parse_object_name()?;
1300 impl_parse_to!(drop_mode: AstOption<DropMode>, p);
1301 Ok(Self {
1302 object_type,
1303 if_exists,
1304 object_name,
1305 drop_mode,
1306 })
1307 }
1308}
1309
1310impl fmt::Display for DropStatement {
1311 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1312 let mut v: Vec<String> = vec![];
1313 impl_fmt_display!(object_type, v, self);
1314 impl_fmt_display!(if_exists => [Keyword::IF, Keyword::EXISTS], v, self);
1315 impl_fmt_display!(object_name, v, self);
1316 impl_fmt_display!(drop_mode, v, self);
1317 v.iter().join(" ").fmt(f)
1318 }
1319}
1320
1321#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1322pub enum DropMode {
1323 Cascade,
1324 Restrict,
1325}
1326
1327impl ParseTo for DropMode {
1328 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
1329 let drop_mode = if parser.parse_keyword(Keyword::CASCADE) {
1330 DropMode::Cascade
1331 } else if parser.parse_keyword(Keyword::RESTRICT) {
1332 DropMode::Restrict
1333 } else {
1334 return parser.expected("CASCADE | RESTRICT");
1335 };
1336 Ok(drop_mode)
1337 }
1338}
1339
1340impl fmt::Display for DropMode {
1341 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1342 f.write_str(match self {
1343 DropMode::Cascade => "CASCADE",
1344 DropMode::Restrict => "RESTRICT",
1345 })
1346 }
1347}