1use std::fmt;
16use std::fmt::{Formatter, Write};
17
18use itertools::Itertools;
19use winnow::ModalResult;
20
21use super::ddl::SourceWatermark;
22use super::legacy_source::{CompatibleFormatEncode, parse_format_encode};
23use super::{EmitMode, Ident, ObjectType, Query, Value};
24use crate::ast::{
25 ColumnDef, ObjectName, REDACT_SQL_OPTION_KEYWORDS, SqlOption, TableConstraint,
26 display_comma_separated, display_separated,
27};
28use crate::keywords::Keyword;
29use crate::parser::{IncludeOption, IsOptional, Parser};
30use crate::parser_err;
31use crate::parser_v2::literal_u32;
32use crate::tokenizer::Token;
33
34pub trait ParseTo: Sized {
36 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self>;
37}
38
39#[macro_export]
40macro_rules! impl_parse_to {
41 () => {};
42 ($field:ident : $field_type:ty, $parser:ident) => {
43 let $field = <$field_type>::parse_to($parser)?;
44 };
45 ($field:ident => [$($arr:tt)+], $parser:ident) => {
46 let $field = $parser.parse_keywords(&[$($arr)+]);
47 };
48 ([$($arr:tt)+], $parser:ident) => {
49 $parser.expect_keywords(&[$($arr)+])?;
50 };
51}
52
53#[macro_export]
54macro_rules! impl_fmt_display {
55 () => {};
56 ($field:ident, $v:ident, $self:ident) => {{
57 let s = format!("{}", $self.$field);
58 if !s.is_empty() {
59 $v.push(s);
60 }
61 }};
62 ($field:ident => [$($arr:tt)+], $v:ident, $self:ident) => {
63 if $self.$field {
64 $v.push(format!("{}", display_separated(&[$($arr)+], " ")));
65 }
66 };
67 ([$($arr:tt)+], $v:ident) => {
68 $v.push(format!("{}", display_separated(&[$($arr)+], " ")));
69 };
70}
71
72#[derive(Debug, Clone, PartialEq, Eq, Hash)]
81pub struct CreateSourceStatement {
82 pub temporary: bool,
83 pub if_not_exists: bool,
84 pub columns: Vec<ColumnDef>,
85 pub wildcard_idx: Option<usize>,
87 pub constraints: Vec<TableConstraint>,
88 pub source_name: ObjectName,
89 pub with_properties: WithProperties,
90 pub format_encode: CompatibleFormatEncode,
91 pub source_watermarks: Vec<SourceWatermark>,
92 pub include_column_options: IncludeOption,
93}
94
95#[derive(Debug, Clone, PartialEq, Eq, Hash)]
99pub enum Format {
100 Native,
103 None,
105 Debezium,
107 DebeziumMongo,
109 Maxwell,
111 Canal,
113 Upsert,
115 Plain,
117}
118
119impl fmt::Display for Format {
121 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
122 write!(
123 f,
124 "{}",
125 match self {
126 Format::Native => "NATIVE",
127 Format::Debezium => "DEBEZIUM",
128 Format::DebeziumMongo => "DEBEZIUM_MONGO",
129 Format::Maxwell => "MAXWELL",
130 Format::Canal => "CANAL",
131 Format::Upsert => "UPSERT",
132 Format::Plain => "PLAIN",
133 Format::None => "NONE",
134 }
135 )
136 }
137}
138
139impl Format {
140 pub fn from_keyword(s: &str) -> ModalResult<Self> {
141 Ok(match s {
142 "DEBEZIUM" => Format::Debezium,
143 "DEBEZIUM_MONGO" => Format::DebeziumMongo,
144 "MAXWELL" => Format::Maxwell,
145 "CANAL" => Format::Canal,
146 "PLAIN" => Format::Plain,
147 "UPSERT" => Format::Upsert,
148 "NATIVE" => Format::Native,
149 "NONE" => Format::None,
150 _ => parser_err!("expected PROTOBUF | DEBEZIUM | PLAIN | NATIVE | NONE after FORMAT"),
151 })
152 }
153}
154
155#[derive(Debug, Clone, PartialEq, Eq, Hash)]
157pub enum Encode {
158 Avro, Csv, Protobuf, Json, Bytes, None,
165 Text, Native,
169 Template,
170 Parquet,
171}
172
173impl fmt::Display for Encode {
175 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
176 write!(
177 f,
178 "{}",
179 match self {
180 Encode::Avro => "AVRO",
181 Encode::Csv => "CSV",
182 Encode::Protobuf => "PROTOBUF",
183 Encode::Json => "JSON",
184 Encode::Bytes => "BYTES",
185 Encode::Native => "NATIVE",
186 Encode::Template => "TEMPLATE",
187 Encode::None => "NONE",
188 Encode::Parquet => "PARQUET",
189 Encode::Text => "TEXT",
190 }
191 )
192 }
193}
194
195impl Encode {
196 pub fn from_keyword(s: &str) -> ModalResult<Self> {
197 Ok(match s {
198 "AVRO" => Encode::Avro,
199 "TEXT" => Encode::Text,
200 "BYTES" => Encode::Bytes,
201 "CSV" => Encode::Csv,
202 "PROTOBUF" => Encode::Protobuf,
203 "JSON" => Encode::Json,
204 "TEMPLATE" => Encode::Template,
205 "PARQUET" => Encode::Parquet,
206 "NATIVE" => Encode::Native,
207 "NONE" => Encode::None,
208 _ => parser_err!(
209 "expected AVRO | BYTES | CSV | PROTOBUF | JSON | NATIVE | TEMPLATE | PARQUET | NONE after Encode"
210 ),
211 })
212 }
213}
214
215#[derive(Debug, Clone, PartialEq, Eq, Hash)]
217pub struct FormatEncodeOptions {
218 pub format: Format,
219 pub row_encode: Encode,
220 pub row_options: Vec<SqlOption>,
221
222 pub key_encode: Option<Encode>,
223}
224
225impl Parser<'_> {
226 fn peek_format_encode_format(&mut self) -> bool {
228 (self.peek_nth_any_of_keywords(0, &[Keyword::ROW])
229 && self.peek_nth_any_of_keywords(1, &[Keyword::FORMAT])) || self.peek_nth_any_of_keywords(0, &[Keyword::FORMAT]) }
232
233 pub fn parse_format_encode_with_connector(
235 &mut self,
236 connector: &str,
237 cdc_source_job: bool,
238 ) -> ModalResult<CompatibleFormatEncode> {
239 if connector.contains("-cdc") {
244 let expected = if cdc_source_job {
245 FormatEncodeOptions::plain_json()
246 } else if connector.contains("mongodb") {
247 FormatEncodeOptions::debezium_mongo_json()
248 } else {
249 FormatEncodeOptions::debezium_json()
250 };
251
252 if self.peek_format_encode_format() {
253 let schema = parse_format_encode(self)?.into_v2();
254 if schema != expected {
255 parser_err!(
256 "Row format for CDC connectors should be \
257 either omitted or set to `{expected}`",
258 );
259 }
260 }
261 Ok(expected.into())
262 } else if connector.contains("nexmark") {
263 let expected = FormatEncodeOptions::native();
264 if self.peek_format_encode_format() {
265 let schema = parse_format_encode(self)?.into_v2();
266 if schema != expected {
267 parser_err!(
268 "Row format for nexmark connectors should be \
269 either omitted or set to `{expected}`",
270 );
271 }
272 }
273 Ok(expected.into())
274 } else if connector.contains("datagen") {
275 Ok(if self.peek_format_encode_format() {
276 parse_format_encode(self)?
277 } else {
278 FormatEncodeOptions::native().into()
279 })
280 } else if connector.contains("iceberg") || connector.contains("adbc_snowflake") {
281 let expected = FormatEncodeOptions::none();
282 if self.peek_format_encode_format() {
283 let schema = parse_format_encode(self)?.into_v2();
284 if schema != expected {
285 parser_err!(
286 "Row format for iceberg connectors should be \
287 either omitted or set to `{expected}`",
288 );
289 }
290 }
291 Ok(expected.into())
292 } else if connector.contains("webhook") {
293 parser_err!(
294 "Source with webhook connector is not supported. \
295 Please use the `CREATE TABLE ... WITH ...` statement instead.",
296 );
297 } else {
298 Ok(parse_format_encode(self)?)
299 }
300 }
301
302 pub fn parse_schema(&mut self) -> ModalResult<Option<FormatEncodeOptions>> {
304 if !self.parse_keyword(Keyword::FORMAT) {
305 return Ok(None);
306 }
307
308 let id = self.parse_identifier()?;
309 let s = id.value.to_ascii_uppercase();
310 let format = Format::from_keyword(&s)?;
311 self.expect_keyword(Keyword::ENCODE)?;
312 let id = self.parse_identifier()?;
313 let s = id.value.to_ascii_uppercase();
314 let row_encode = Encode::from_keyword(&s)?;
315 let row_options = self.parse_options()?;
316
317 let key_encode = if self.parse_keywords(&[Keyword::KEY, Keyword::ENCODE]) {
318 Some(Encode::from_keyword(
319 self.parse_identifier()?.value.to_ascii_uppercase().as_str(),
320 )?)
321 } else {
322 None
323 };
324
325 Ok(Some(FormatEncodeOptions {
326 format,
327 row_encode,
328 row_options,
329 key_encode,
330 }))
331 }
332}
333
334impl FormatEncodeOptions {
335 pub const fn plain_json() -> Self {
336 FormatEncodeOptions {
337 format: Format::Plain,
338 row_encode: Encode::Json,
339 row_options: Vec::new(),
340 key_encode: None,
341 }
342 }
343
344 pub const fn debezium_json() -> Self {
346 FormatEncodeOptions {
347 format: Format::Debezium,
348 row_encode: Encode::Json,
349 row_options: Vec::new(),
350 key_encode: None,
351 }
352 }
353
354 pub const fn debezium_mongo_json() -> Self {
355 FormatEncodeOptions {
356 format: Format::DebeziumMongo,
357 row_encode: Encode::Json,
358 row_options: Vec::new(),
359 key_encode: None,
360 }
361 }
362
363 pub const fn native() -> Self {
365 FormatEncodeOptions {
366 format: Format::Native,
367 row_encode: Encode::Native,
368 row_options: Vec::new(),
369 key_encode: None,
370 }
371 }
372
373 pub const fn none() -> Self {
376 FormatEncodeOptions {
377 format: Format::None,
378 row_encode: Encode::None,
379 row_options: Vec::new(),
380 key_encode: None,
381 }
382 }
383
384 pub fn row_options(&self) -> &[SqlOption] {
385 self.row_options.as_ref()
386 }
387}
388
389impl fmt::Display for FormatEncodeOptions {
390 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
391 write!(f, "FORMAT {} ENCODE {}", self.format, self.row_encode)?;
392
393 if !self.row_options().is_empty() {
394 write!(f, " ({})", display_comma_separated(self.row_options()))?;
395 }
396
397 if let Some(key_encode) = &self.key_encode {
398 write!(f, " KEY ENCODE {}", key_encode)?;
399 }
400
401 Ok(())
402 }
403}
404
405pub(super) fn fmt_create_items(
406 columns: &[ColumnDef],
407 constraints: &[TableConstraint],
408 watermarks: &[SourceWatermark],
409 wildcard_idx: Option<usize>,
410) -> std::result::Result<String, fmt::Error> {
411 let mut items = String::new();
412 let has_items = !columns.is_empty()
413 || !constraints.is_empty()
414 || !watermarks.is_empty()
415 || wildcard_idx.is_some();
416 has_items.then(|| write!(&mut items, "("));
417
418 if let Some(wildcard_idx) = wildcard_idx {
419 let (columns_l, columns_r) = columns.split_at(wildcard_idx);
420 write!(&mut items, "{}", display_comma_separated(columns_l))?;
421 if !columns_l.is_empty() {
422 write!(&mut items, ", ")?;
423 }
424 write!(&mut items, "{}", Token::Mul)?;
425 if !columns_r.is_empty() {
426 write!(&mut items, ", ")?;
427 }
428 write!(&mut items, "{}", display_comma_separated(columns_r))?;
429 } else {
430 write!(&mut items, "{}", display_comma_separated(columns))?;
431 }
432 let mut leading_items = !columns.is_empty() || wildcard_idx.is_some();
433
434 if leading_items && !constraints.is_empty() {
435 write!(&mut items, ", ")?;
436 }
437 write!(&mut items, "{}", display_comma_separated(constraints))?;
438 leading_items |= !constraints.is_empty();
439
440 if leading_items && !watermarks.is_empty() {
441 write!(&mut items, ", ")?;
442 }
443 write!(&mut items, "{}", display_comma_separated(watermarks))?;
444 has_items.then(|| write!(&mut items, ")"));
448 Ok(items)
449}
450
451impl fmt::Display for CreateSourceStatement {
452 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
453 let mut v: Vec<String> = vec![];
454 impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
455 impl_fmt_display!(source_name, v, self);
456
457 let items = fmt_create_items(
458 &self.columns,
459 &self.constraints,
460 &self.source_watermarks,
461 self.wildcard_idx,
462 )?;
463 if !items.is_empty() {
464 v.push(items);
465 }
466
467 for item in &self.include_column_options {
468 v.push(format!("{}", item));
469 }
470
471 let is_cdc_source = self.with_properties.0.iter().any(|option| {
473 option.name.real_value().eq_ignore_ascii_case("connector")
474 && option.value.to_string().contains("cdc")
475 });
476
477 impl_fmt_display!(with_properties, v, self);
478 if !is_cdc_source {
479 impl_fmt_display!(format_encode, v, self);
480 }
481 v.iter().join(" ").fmt(f)
482 }
483}
484
485#[derive(Debug, Clone, PartialEq, Eq, Hash)]
486pub enum CreateSink {
487 From(ObjectName),
488 AsQuery(Box<Query>),
489}
490
491impl fmt::Display for CreateSink {
492 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
493 match self {
494 Self::From(mv) => write!(f, "FROM {}", mv),
495 Self::AsQuery(query) => write!(f, "AS {}", query),
496 }
497 }
498}
499#[derive(Debug, Clone, PartialEq, Eq, Hash)]
507pub struct CreateSinkStatement {
508 pub if_not_exists: bool,
509 pub sink_name: ObjectName,
510 pub with_properties: WithProperties,
511 pub sink_from: CreateSink,
512
513 pub columns: Vec<Ident>,
516 pub emit_mode: Option<EmitMode>,
517 pub sink_schema: Option<FormatEncodeOptions>,
518 pub into_table_name: Option<ObjectName>,
519}
520
521impl ParseTo for CreateSinkStatement {
522 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
523 impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
524 impl_parse_to!(sink_name: ObjectName, p);
525
526 let mut target_spec_columns = Vec::new();
527 let into_table_name = if p.parse_keyword(Keyword::INTO) {
528 impl_parse_to!(into_table_name: ObjectName, p);
529
530 target_spec_columns = p.parse_parenthesized_column_list(IsOptional::Optional)?;
532 Some(into_table_name)
533 } else {
534 None
535 };
536
537 let sink_from = if p.parse_keyword(Keyword::FROM) {
538 impl_parse_to!(from_name: ObjectName, p);
539 CreateSink::From(from_name)
540 } else if p.parse_keyword(Keyword::AS) {
541 let query = Box::new(p.parse_query()?);
542 CreateSink::AsQuery(query)
543 } else {
544 p.expected("FROM or AS after CREATE SINK sink_name")?
545 };
546
547 let emit_mode: Option<EmitMode> = p.parse_emit_mode()?;
548
549 if !p.peek_nth_any_of_keywords(0, &[Keyword::WITH]) && into_table_name.is_none() {
552 p.expected("WITH")?
553 }
554 impl_parse_to!(with_properties: WithProperties, p);
555
556 if with_properties.0.is_empty() && into_table_name.is_none() {
557 parser_err!("sink properties not provided");
558 }
559
560 let sink_schema = p.parse_schema()?;
561
562 Ok(Self {
563 if_not_exists,
564 sink_name,
565 with_properties,
566 sink_from,
567 columns: target_spec_columns,
568 emit_mode,
569 sink_schema,
570 into_table_name,
571 })
572 }
573}
574
575impl fmt::Display for CreateSinkStatement {
576 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
577 let mut v: Vec<String> = vec![];
578 impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
579 impl_fmt_display!(sink_name, v, self);
580 if let Some(into_table) = &self.into_table_name {
581 impl_fmt_display!([Keyword::INTO], v);
582 impl_fmt_display!([into_table], v);
583 if !self.columns.is_empty() {
584 v.push(format!("({})", display_comma_separated(&self.columns)));
585 }
586 }
587 impl_fmt_display!(sink_from, v, self);
588 if let Some(ref emit_mode) = self.emit_mode {
589 v.push(format!("EMIT {}", emit_mode));
590 }
591 impl_fmt_display!(with_properties, v, self);
592 if let Some(schema) = &self.sink_schema {
593 v.push(format!("{}", schema));
594 }
595 v.iter().join(" ").fmt(f)
596 }
597}
598
599#[derive(Debug, Clone, PartialEq, Eq, Hash)]
607pub struct CreateSubscriptionStatement {
608 pub if_not_exists: bool,
609 pub subscription_name: ObjectName,
610 pub with_properties: WithProperties,
611 pub subscription_from: ObjectName,
612 }
614
615impl ParseTo for CreateSubscriptionStatement {
616 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
617 impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
618 impl_parse_to!(subscription_name: ObjectName, p);
619
620 let subscription_from = if p.parse_keyword(Keyword::FROM) {
621 impl_parse_to!(from_name: ObjectName, p);
622 from_name
623 } else {
624 p.expected("FROM after CREATE SUBSCRIPTION subscription_name")?
625 };
626
627 if !p.peek_nth_any_of_keywords(0, &[Keyword::WITH]) {
632 p.expected("WITH")?
633 }
634 impl_parse_to!(with_properties: WithProperties, p);
635
636 if with_properties.0.is_empty() {
637 parser_err!("subscription properties not provided");
638 }
639
640 Ok(Self {
641 if_not_exists,
642 subscription_name,
643 with_properties,
644 subscription_from,
645 })
646 }
647}
648
649impl fmt::Display for CreateSubscriptionStatement {
650 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
651 let mut v: Vec<String> = vec![];
652 impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
653 impl_fmt_display!(subscription_name, v, self);
654 v.push(format!("FROM {}", self.subscription_from));
655 impl_fmt_display!(with_properties, v, self);
656 v.iter().join(" ").fmt(f)
657 }
658}
659
660#[derive(Debug, Clone, PartialEq, Eq, Hash)]
661pub enum DeclareCursor {
662 Query(Box<Query>),
663 Subscription(ObjectName, Since),
664}
665
666impl fmt::Display for DeclareCursor {
667 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
668 let mut v: Vec<String> = vec![];
669 match self {
670 DeclareCursor::Query(query) => v.push(format!("{}", query.as_ref())),
671 DeclareCursor::Subscription(name, since) => {
672 v.push(format!("{}", name));
673 v.push(format!("{:?}", since));
674 }
675 }
676 v.iter().join(" ").fmt(f)
677 }
678}
679
680#[derive(Debug, Clone, PartialEq, Eq, Hash)]
690pub struct DeclareCursorStatement {
691 pub cursor_name: Ident,
692 pub declare_cursor: DeclareCursor,
693}
694
695impl ParseTo for DeclareCursorStatement {
696 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
697 let cursor_name = p.parse_identifier_non_reserved()?;
698
699 let declare_cursor = if !p.parse_keyword(Keyword::SUBSCRIPTION) {
700 p.expect_keyword(Keyword::CURSOR)?;
701 p.expect_keyword(Keyword::FOR)?;
702 DeclareCursor::Query(Box::new(p.parse_query()?))
703 } else {
704 p.expect_keyword(Keyword::CURSOR)?;
705 p.expect_keyword(Keyword::FOR)?;
706 let cursor_for_name = p.parse_object_name()?;
707 let rw_timestamp = p.parse_since()?;
708 DeclareCursor::Subscription(cursor_for_name, rw_timestamp)
709 };
710
711 Ok(Self {
712 cursor_name,
713 declare_cursor,
714 })
715 }
716}
717
718impl fmt::Display for DeclareCursorStatement {
719 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
720 let mut v: Vec<String> = vec![];
721 impl_fmt_display!(cursor_name, v, self);
722 match &self.declare_cursor {
723 DeclareCursor::Query(_) => {
724 v.push("CURSOR FOR ".to_owned());
725 }
726 DeclareCursor::Subscription { .. } => {
727 v.push("SUBSCRIPTION CURSOR FOR ".to_owned());
728 }
729 }
730 impl_fmt_display!(declare_cursor, v, self);
731 v.iter().join(" ").fmt(f)
732 }
733}
734
735#[derive(Debug, Clone, PartialEq, Eq, Hash)]
739pub struct FetchCursorStatement {
740 pub cursor_name: Ident,
741 pub count: u32,
742 pub with_properties: WithProperties,
743}
744
745impl ParseTo for FetchCursorStatement {
746 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
747 let count = if p.parse_keyword(Keyword::NEXT) {
748 1
749 } else {
750 literal_u32(p)?
751 };
752 p.expect_keyword(Keyword::FROM)?;
753 let cursor_name = p.parse_identifier_non_reserved()?;
754 impl_parse_to!(with_properties: WithProperties, p);
755
756 Ok(Self {
757 cursor_name,
758 count,
759 with_properties,
760 })
761 }
762}
763
764impl fmt::Display for FetchCursorStatement {
765 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
766 let mut v: Vec<String> = vec![];
767 if self.count == 1 {
768 v.push("NEXT ".to_owned());
769 } else {
770 impl_fmt_display!(count, v, self);
771 }
772 v.push("FROM ".to_owned());
773 impl_fmt_display!(cursor_name, v, self);
774 v.iter().join(" ").fmt(f)
775 }
776}
777
778#[derive(Debug, Clone, PartialEq, Eq, Hash)]
782pub struct CloseCursorStatement {
783 pub cursor_name: Option<Ident>,
784}
785
786impl ParseTo for CloseCursorStatement {
787 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
788 let cursor_name = if p.parse_keyword(Keyword::ALL) {
789 None
790 } else {
791 Some(p.parse_identifier_non_reserved()?)
792 };
793
794 Ok(Self { cursor_name })
795 }
796}
797
798impl fmt::Display for CloseCursorStatement {
799 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
800 let mut v: Vec<String> = vec![];
801 if let Some(cursor_name) = &self.cursor_name {
802 v.push(format!("{}", cursor_name));
803 } else {
804 v.push("ALL".to_owned());
805 }
806 v.iter().join(" ").fmt(f)
807 }
808}
809
810#[derive(Debug, Clone, PartialEq, Eq, Hash)]
816pub struct CreateConnectionStatement {
817 pub if_not_exists: bool,
818 pub connection_name: ObjectName,
819 pub with_properties: WithProperties,
820}
821
822impl ParseTo for CreateConnectionStatement {
823 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
824 impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], p);
825 impl_parse_to!(connection_name: ObjectName, p);
826 impl_parse_to!(with_properties: WithProperties, p);
827 if with_properties.0.is_empty() {
828 parser_err!("connection properties not provided");
829 }
830
831 Ok(Self {
832 if_not_exists,
833 connection_name,
834 with_properties,
835 })
836 }
837}
838
839impl fmt::Display for CreateConnectionStatement {
840 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
841 let mut v: Vec<String> = vec![];
842 impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
843 impl_fmt_display!(connection_name, v, self);
844 impl_fmt_display!(with_properties, v, self);
845 v.iter().join(" ").fmt(f)
846 }
847}
848
849#[derive(Debug, Clone, PartialEq, Eq, Hash)]
850pub struct CreateSecretStatement {
851 pub if_not_exists: bool,
852 pub secret_name: ObjectName,
853 pub credential: Value,
854 pub with_properties: WithProperties,
855}
856
857impl ParseTo for CreateSecretStatement {
858 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
859 impl_parse_to!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], parser);
860 impl_parse_to!(secret_name: ObjectName, parser);
861 impl_parse_to!(with_properties: WithProperties, parser);
862 let mut credential = Value::Null;
863 if parser.parse_keyword(Keyword::AS) {
864 credential = parser.ensure_parse_value()?;
865 }
866 Ok(Self {
867 if_not_exists,
868 secret_name,
869 credential,
870 with_properties,
871 })
872 }
873}
874
875impl fmt::Display for CreateSecretStatement {
876 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
877 let mut v: Vec<String> = vec![];
878 impl_fmt_display!(if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], v, self);
879 impl_fmt_display!(secret_name, v, self);
880 impl_fmt_display!(with_properties, v, self);
881 if self.credential != Value::Null {
882 v.push("AS".to_owned());
883 impl_fmt_display!(credential, v, self);
884 }
885 v.iter().join(" ").fmt(f)
886 }
887}
888
889#[derive(Debug, Clone, PartialEq, Eq, Hash)]
890pub struct WithProperties(pub Vec<SqlOption>);
891
892impl ParseTo for WithProperties {
893 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
894 Ok(Self(
895 parser.parse_options_with_preceding_keyword(Keyword::WITH)?,
896 ))
897 }
898}
899
900impl fmt::Display for WithProperties {
901 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
902 if !self.0.is_empty() {
903 write!(f, "WITH ({})", display_comma_separated(self.0.as_slice()))
904 } else {
905 Ok(())
906 }
907 }
908}
909
910#[derive(Debug, Clone, PartialEq, Eq, Hash)]
911pub enum Since {
912 TimestampMsNum(u64),
913 ProcessTime,
914 Begin,
915 Full,
916}
917
918impl fmt::Display for Since {
919 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
920 use Since::*;
921 match self {
922 TimestampMsNum(ts) => write!(f, " SINCE {}", ts),
923 ProcessTime => write!(f, " SINCE PROCTIME()"),
924 Begin => write!(f, " SINCE BEGIN()"),
925 Full => write!(f, " FULL"),
926 }
927 }
928}
929
930#[derive(Debug, Clone, PartialEq, Eq, Hash)]
931pub struct RowSchemaLocation {
932 pub value: AstString,
933}
934
935impl ParseTo for RowSchemaLocation {
936 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
937 impl_parse_to!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], p);
938 impl_parse_to!(value: AstString, p);
939 Ok(Self { value })
940 }
941}
942
943impl fmt::Display for RowSchemaLocation {
944 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
945 let mut v = vec![];
946 impl_fmt_display!([Keyword::ROW, Keyword::SCHEMA, Keyword::LOCATION], v);
947 impl_fmt_display!(value, v, self);
948 v.iter().join(" ").fmt(f)
949 }
950}
951
952#[derive(Debug, Clone, PartialEq, Eq, Hash)]
955pub struct AstString(pub String);
956
957impl ParseTo for AstString {
958 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
959 Ok(Self(parser.parse_literal_string()?))
960 }
961}
962
963impl fmt::Display for AstString {
964 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
965 write!(f, "'{}'", self.0)
966 }
967}
968
969#[derive(Debug, Clone, PartialEq, Eq, Hash)]
972pub enum AstOption<T> {
973 None,
975 Some(T),
977}
978
979impl<T: ParseTo> ParseTo for AstOption<T> {
980 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
981 match T::parse_to(parser) {
982 Ok(t) => Ok(AstOption::Some(t)),
983 Err(_) => Ok(AstOption::None),
984 }
985 }
986}
987
988impl<T: fmt::Display> fmt::Display for AstOption<T> {
989 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
990 match &self {
991 AstOption::Some(t) => t.fmt(f),
992 AstOption::None => Ok(()),
993 }
994 }
995}
996
997impl<T> From<AstOption<T>> for Option<T> {
998 fn from(val: AstOption<T>) -> Self {
999 match val {
1000 AstOption::Some(t) => Some(t),
1001 AstOption::None => None,
1002 }
1003 }
1004}
1005
1006#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1007pub struct CreateUserStatement {
1008 pub user_name: ObjectName,
1009 pub with_options: UserOptions,
1010}
1011
1012#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1013pub struct AlterUserStatement {
1014 pub user_name: ObjectName,
1015 pub mode: AlterUserMode,
1016}
1017
1018#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1019pub enum AlterUserMode {
1020 Options(UserOptions),
1021 Rename(ObjectName),
1022}
1023
1024#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1025pub enum UserOption {
1026 SuperUser,
1027 NoSuperUser,
1028 CreateDB,
1029 NoCreateDB,
1030 CreateUser,
1031 NoCreateUser,
1032 Login,
1033 NoLogin,
1034 Admin,
1035 NoAdmin,
1036 EncryptedPassword(AstString),
1037 Password(Option<AstString>),
1038 OAuth(Vec<SqlOption>),
1039}
1040
1041impl fmt::Display for UserOption {
1042 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1043 match self {
1044 UserOption::SuperUser => write!(f, "SUPERUSER"),
1045 UserOption::NoSuperUser => write!(f, "NOSUPERUSER"),
1046 UserOption::CreateDB => write!(f, "CREATEDB"),
1047 UserOption::NoCreateDB => write!(f, "NOCREATEDB"),
1048 UserOption::CreateUser => write!(f, "CREATEUSER"),
1049 UserOption::NoCreateUser => write!(f, "NOCREATEUSER"),
1050 UserOption::Login => write!(f, "LOGIN"),
1051 UserOption::NoLogin => write!(f, "NOLOGIN"),
1052 UserOption::Admin => write!(f, "ADMIN"),
1053 UserOption::NoAdmin => write!(f, "NOADMIN"),
1054 UserOption::EncryptedPassword(p) => {
1055 if should_redact_user_password() {
1056 write!(f, "ENCRYPTED PASSWORD [REDACTED]")
1057 } else {
1058 write!(f, "ENCRYPTED PASSWORD {}", p)
1059 }
1060 }
1061 UserOption::Password(None) => write!(f, "PASSWORD NULL"),
1062 UserOption::Password(Some(p)) => {
1063 if should_redact_user_password() {
1064 write!(f, "PASSWORD [REDACTED]")
1065 } else {
1066 write!(f, "PASSWORD {}", p)
1067 }
1068 }
1069 UserOption::OAuth(options) => {
1070 write!(f, "({})", display_comma_separated(options.as_slice()))
1071 }
1072 }
1073 }
1074}
1075
1076#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1077pub struct UserOptions(pub Vec<UserOption>, pub bool);
1078
1079fn should_redact_user_password() -> bool {
1080 REDACT_SQL_OPTION_KEYWORDS.try_with(|_| ()).is_ok()
1081}
1082
1083#[derive(Default)]
1084struct UserOptionsBuilder {
1085 with_prefix: bool,
1086 super_user: Option<UserOption>,
1087 create_db: Option<UserOption>,
1088 create_user: Option<UserOption>,
1089 login: Option<UserOption>,
1090 admin: Option<UserOption>,
1091 password: Option<UserOption>,
1092}
1093
1094impl UserOptionsBuilder {
1095 fn build(self) -> UserOptions {
1096 let mut options = vec![];
1097 if let Some(option) = self.super_user {
1098 options.push(option);
1099 }
1100 if let Some(option) = self.create_db {
1101 options.push(option);
1102 }
1103 if let Some(option) = self.create_user {
1104 options.push(option);
1105 }
1106 if let Some(option) = self.login {
1107 options.push(option);
1108 }
1109 if let Some(option) = self.admin {
1110 options.push(option);
1111 }
1112 if let Some(option) = self.password {
1113 options.push(option);
1114 }
1115 UserOptions(options, self.with_prefix)
1116 }
1117}
1118
1119impl ParseTo for UserOptions {
1120 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
1121 let mut builder = UserOptionsBuilder {
1122 with_prefix: parser.parse_keyword(Keyword::WITH),
1123 ..Default::default()
1124 };
1125 let add_option = |item: &mut Option<UserOption>, user_option| {
1126 let old_value = item.replace(user_option);
1127 if old_value.is_some() {
1128 parser_err!("conflicting or redundant options");
1129 }
1130 Ok(())
1131 };
1132 loop {
1133 let token = parser.peek_token();
1134 if token == Token::EOF || token == Token::SemiColon {
1135 break;
1136 }
1137
1138 let option = parser.parse_identifier()?;
1139 let s = option.real_value();
1140 let (item_mut_ref, user_option) = match &s[..] {
1141 "superuser" => (&mut builder.super_user, UserOption::SuperUser),
1142 "nosuperuser" => (&mut builder.super_user, UserOption::NoSuperUser),
1143 "createdb" => (&mut builder.create_db, UserOption::CreateDB),
1144 "nocreatedb" => (&mut builder.create_db, UserOption::NoCreateDB),
1145 "createuser" => (&mut builder.create_user, UserOption::CreateUser),
1146 "nocreateuser" => (&mut builder.create_user, UserOption::NoCreateUser),
1147 "login" => (&mut builder.login, UserOption::Login),
1148 "nologin" => (&mut builder.login, UserOption::NoLogin),
1149 "admin" => (&mut builder.admin, UserOption::Admin),
1150 "noadmin" => (&mut builder.admin, UserOption::NoAdmin),
1151 "password" => {
1152 if parser.parse_keyword(Keyword::NULL) {
1153 (&mut builder.password, UserOption::Password(None))
1154 } else {
1155 (
1156 &mut builder.password,
1157 UserOption::Password(Some(AstString::parse_to(parser)?)),
1158 )
1159 }
1160 }
1161 "encrypted" => {
1162 let option = parser.parse_identifier()?;
1163 let s = option.real_value();
1164 if s != "password" {
1165 parser_err!("expected PASSWORD after ENCRYPTED, found {}", option);
1166 }
1167 (
1168 &mut builder.password,
1169 UserOption::EncryptedPassword(AstString::parse_to(parser)?),
1170 )
1171 }
1172 "oauth" => {
1173 let options = parser.parse_options()?;
1174 (&mut builder.password, UserOption::OAuth(options))
1175 }
1176 _ => {
1177 parser_err!("unexpected user option: {}", option);
1178 }
1179 };
1180 add_option(item_mut_ref, user_option)?;
1181 }
1182 Ok(builder.build())
1183 }
1184}
1185
1186impl fmt::Display for UserOptions {
1187 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1188 if !self.0.is_empty() {
1189 if self.1 {
1190 write!(f, "WITH ")?;
1191 }
1192 write!(f, "{}", display_separated(self.0.as_slice(), " "))
1193 } else {
1194 Ok(())
1195 }
1196 }
1197}
1198
1199impl ParseTo for CreateUserStatement {
1200 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1201 impl_parse_to!(user_name: ObjectName, p);
1202 impl_parse_to!(with_options: UserOptions, p);
1203
1204 Ok(CreateUserStatement {
1205 user_name,
1206 with_options,
1207 })
1208 }
1209}
1210
1211impl fmt::Display for CreateUserStatement {
1212 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1213 let mut v: Vec<String> = vec![];
1214 impl_fmt_display!(user_name, v, self);
1215 impl_fmt_display!(with_options, v, self);
1216 v.iter().join(" ").fmt(f)
1217 }
1218}
1219
1220impl fmt::Display for AlterUserMode {
1221 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1222 match self {
1223 AlterUserMode::Options(options) => {
1224 write!(f, "{}", options)
1225 }
1226 AlterUserMode::Rename(new_name) => {
1227 write!(f, "RENAME TO {}", new_name)
1228 }
1229 }
1230 }
1231}
1232
1233impl fmt::Display for AlterUserStatement {
1234 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1235 let mut v: Vec<String> = vec![];
1236 impl_fmt_display!(user_name, v, self);
1237 impl_fmt_display!(mode, v, self);
1238 v.iter().join(" ").fmt(f)
1239 }
1240}
1241
1242impl ParseTo for AlterUserStatement {
1243 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1244 impl_parse_to!(user_name: ObjectName, p);
1245 impl_parse_to!(mode: AlterUserMode, p);
1246
1247 Ok(AlterUserStatement { user_name, mode })
1248 }
1249}
1250
1251impl ParseTo for AlterUserMode {
1252 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1253 if p.parse_keyword(Keyword::RENAME) {
1254 p.expect_keyword(Keyword::TO)?;
1255 impl_parse_to!(new_name: ObjectName, p);
1256 Ok(AlterUserMode::Rename(new_name))
1257 } else {
1258 impl_parse_to!(with_options: UserOptions, p);
1259 Ok(AlterUserMode::Options(with_options))
1260 }
1261 }
1262}
1263
1264#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1265pub struct DropStatement {
1266 pub object_type: ObjectType,
1268 pub if_exists: bool,
1270 pub object_name: ObjectName,
1272 pub drop_mode: AstOption<DropMode>,
1275}
1276
1277impl ParseTo for DropStatement {
1284 fn parse_to(p: &mut Parser<'_>) -> ModalResult<Self> {
1285 impl_parse_to!(object_type: ObjectType, p);
1286 impl_parse_to!(if_exists => [Keyword::IF, Keyword::EXISTS], p);
1287 let object_name = p.parse_object_name()?;
1288 impl_parse_to!(drop_mode: AstOption<DropMode>, p);
1289 Ok(Self {
1290 object_type,
1291 if_exists,
1292 object_name,
1293 drop_mode,
1294 })
1295 }
1296}
1297
1298impl fmt::Display for DropStatement {
1299 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1300 let mut v: Vec<String> = vec![];
1301 impl_fmt_display!(object_type, v, self);
1302 impl_fmt_display!(if_exists => [Keyword::IF, Keyword::EXISTS], v, self);
1303 impl_fmt_display!(object_name, v, self);
1304 impl_fmt_display!(drop_mode, v, self);
1305 v.iter().join(" ").fmt(f)
1306 }
1307}
1308
1309#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1310pub enum DropMode {
1311 Cascade,
1312 Restrict,
1313}
1314
1315impl ParseTo for DropMode {
1316 fn parse_to(parser: &mut Parser<'_>) -> ModalResult<Self> {
1317 let drop_mode = if parser.parse_keyword(Keyword::CASCADE) {
1318 DropMode::Cascade
1319 } else if parser.parse_keyword(Keyword::RESTRICT) {
1320 DropMode::Restrict
1321 } else {
1322 return parser.expected("CASCADE | RESTRICT");
1323 };
1324 Ok(drop_mode)
1325 }
1326}
1327
1328impl fmt::Display for DropMode {
1329 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1330 f.write_str(match self {
1331 DropMode::Cascade => "CASCADE",
1332 DropMode::Restrict => "RESTRICT",
1333 })
1334 }
1335}