1use std::fmt;
17
18use super::{ConfigParam, FormatEncodeOptions, SqlOption};
19use crate::ast::{
20 DataType, Expr, Ident, ObjectName, Query, SecretRefValue, SetVariableValue, Value,
21 display_comma_separated, display_separated,
22};
23use crate::tokenizer::Token;
24
25#[derive(Debug, Clone, PartialEq, Eq, Hash)]
26pub enum AlterDatabaseOperation {
27 ChangeOwner {
28 new_owner_name: Ident,
29 },
30 RenameDatabase {
31 database_name: ObjectName,
32 },
33 SetParam(ConfigParam),
34 SetResourceGroup {
37 resource_group: Option<SetVariableValue>,
38 deferred: bool,
39 },
40}
41
42#[derive(Debug, Clone, PartialEq, Eq, Hash)]
43pub enum AlterSchemaOperation {
44 ChangeOwner { new_owner_name: Ident },
45 RenameSchema { schema_name: ObjectName },
46 SwapRenameSchema { target_schema: ObjectName },
47}
48
49#[derive(Debug, Clone, PartialEq, Eq, Hash)]
51pub enum AlterTableOperation {
52 AddConstraint(TableConstraint),
54 AddColumn {
56 column_def: ColumnDef,
57 },
58 DropConstraint {
60 name: Ident,
61 },
62 DropColumn {
64 column_name: Ident,
65 if_exists: bool,
66 cascade: bool,
67 },
68 RenameColumn {
70 old_column_name: Ident,
71 new_column_name: Ident,
72 },
73 RenameTable {
75 table_name: ObjectName,
76 },
77 ChangeColumn {
79 old_name: Ident,
80 new_name: Ident,
81 data_type: DataType,
82 options: Vec<ColumnOption>,
83 },
84 RenameConstraint {
88 old_name: Ident,
89 new_name: Ident,
90 },
91 AlterColumn {
93 column_name: Ident,
94 op: AlterColumnOperation,
95 },
96 AlterWatermark {
98 column_name: Ident,
99 expr: Expr,
100 with_ttl: bool,
101 },
102 ChangeOwner {
104 new_owner_name: Ident,
105 },
106 SetSchema {
108 new_schema_name: ObjectName,
109 },
110 SetParallelism {
112 parallelism: SetVariableValue,
113 deferred: bool,
114 },
115 SetBackfillParallelism {
117 parallelism: SetVariableValue,
118 deferred: bool,
119 },
120 SetConfig {
122 entries: Vec<SqlOption>,
123 },
124 ResetConfig {
126 keys: Vec<ObjectName>,
127 },
128 RefreshSchema,
129 SetSourceRateLimit {
131 rate_limit: i32,
132 },
133 SetBackfillRateLimit {
135 rate_limit: i32,
136 },
137 SetDmlRateLimit {
139 rate_limit: i32,
140 },
141 SwapRenameTable {
143 target_table: ObjectName,
144 },
145 DropConnector,
147
148 AlterConnectorProps {
150 alter_props: Vec<SqlOption>,
151 },
152}
153
154#[derive(Debug, Clone, PartialEq, Eq, Hash)]
155pub enum AlterIndexOperation {
156 RenameIndex {
157 index_name: ObjectName,
158 },
159 SetParallelism {
161 parallelism: SetVariableValue,
162 deferred: bool,
163 },
164 SetBackfillParallelism {
166 parallelism: SetVariableValue,
167 deferred: bool,
168 },
169 SetResourceGroup {
172 resource_group: Option<SetVariableValue>,
173 deferred: bool,
174 },
175 SetConfig {
177 entries: Vec<SqlOption>,
178 },
179 ResetConfig {
181 keys: Vec<ObjectName>,
182 },
183}
184
185#[derive(Debug, Clone, PartialEq, Eq, Hash)]
186pub enum AlterViewOperation {
187 RenameView {
188 view_name: ObjectName,
189 },
190 ChangeOwner {
191 new_owner_name: Ident,
192 },
193 SetSchema {
194 new_schema_name: ObjectName,
195 },
196 SetParallelism {
198 parallelism: SetVariableValue,
199 deferred: bool,
200 },
201 SetBackfillParallelism {
203 parallelism: SetVariableValue,
204 deferred: bool,
205 },
206 SetResourceGroup {
209 resource_group: Option<SetVariableValue>,
210 deferred: bool,
211 },
212 SetBackfillRateLimit {
214 rate_limit: i32,
215 },
216 SwapRenameView {
218 target_view: ObjectName,
219 },
220 SetStreamingEnableUnalignedJoin {
221 enable: bool,
222 },
223 AsQuery {
225 query: Box<Query>,
226 },
227 SetConfig {
229 entries: Vec<SqlOption>,
230 },
231 ResetConfig {
233 keys: Vec<ObjectName>,
234 },
235}
236
237#[derive(Debug, Clone, PartialEq, Eq, Hash)]
238pub enum AlterSinkOperation {
239 RenameSink {
240 sink_name: ObjectName,
241 },
242 ChangeOwner {
243 new_owner_name: Ident,
244 },
245 SetSchema {
246 new_schema_name: ObjectName,
247 },
248 SetParallelism {
250 parallelism: SetVariableValue,
251 deferred: bool,
252 },
253 SetBackfillParallelism {
255 parallelism: SetVariableValue,
256 deferred: bool,
257 },
258 SetResourceGroup {
261 resource_group: Option<SetVariableValue>,
262 deferred: bool,
263 },
264 SetConfig {
266 entries: Vec<SqlOption>,
267 },
268 ResetConfig {
270 keys: Vec<ObjectName>,
271 },
272 SwapRenameSink {
274 target_sink: ObjectName,
275 },
276 SetSinkRateLimit {
277 rate_limit: i32,
278 },
279 SetBackfillRateLimit {
280 rate_limit: i32,
281 },
282 AlterConnectorProps {
283 alter_props: Vec<SqlOption>,
284 },
285 SetStreamingEnableUnalignedJoin {
286 enable: bool,
287 },
288}
289
290#[derive(Debug, Clone, PartialEq, Eq, Hash)]
291pub enum AlterSubscriptionOperation {
292 RenameSubscription { subscription_name: ObjectName },
293 ChangeOwner { new_owner_name: Ident },
294 SetSchema { new_schema_name: ObjectName },
295 SetRetention { retention: Value },
296 SwapRenameSubscription { target_subscription: ObjectName },
297}
298
299#[derive(Debug, Clone, PartialEq, Eq, Hash)]
300pub enum AlterSourceOperation {
301 RenameSource {
302 source_name: ObjectName,
303 },
304 AddColumn {
305 column_def: ColumnDef,
306 },
307 ChangeOwner {
308 new_owner_name: Ident,
309 },
310 SetSchema {
311 new_schema_name: ObjectName,
312 },
313 FormatEncode {
314 format_encode: FormatEncodeOptions,
315 },
316 RefreshSchema,
317 SetSourceRateLimit {
318 rate_limit: i32,
319 },
320 SwapRenameSource {
321 target_source: ObjectName,
322 },
323 SetParallelism {
325 parallelism: SetVariableValue,
326 deferred: bool,
327 },
328 SetBackfillParallelism {
330 parallelism: SetVariableValue,
331 deferred: bool,
332 },
333 SetConfig {
335 entries: Vec<SqlOption>,
336 },
337 ResetConfig {
339 keys: Vec<ObjectName>,
340 },
341 ResetSource,
343 AlterConnectorProps {
344 alter_props: Vec<SqlOption>,
345 },
346}
347
348#[derive(Debug, Clone, PartialEq, Eq, Hash)]
349pub enum AlterFunctionOperation {
350 SetSchema { new_schema_name: ObjectName },
351 ChangeOwner { new_owner_name: Ident },
352}
353
354#[derive(Debug, Clone, PartialEq, Eq, Hash)]
355pub enum AlterConnectionOperation {
356 SetSchema { new_schema_name: ObjectName },
357 ChangeOwner { new_owner_name: Ident },
358 AlterConnectorProps { alter_props: Vec<SqlOption> },
359}
360
361#[derive(Debug, Clone, PartialEq, Eq, Hash)]
362pub enum AlterSecretOperation {
363 ChangeCredential {
364 with_options: Vec<SqlOption>,
365 new_credential: Value,
366 },
367 ChangeOwner {
368 new_owner_name: Ident,
369 },
370}
371
372#[derive(Debug, Clone, PartialEq, Eq, Hash)]
373pub enum AlterFragmentOperation {
374 AlterBackfillRateLimit { rate_limit: i32 },
375 SetParallelism { parallelism: SetVariableValue },
376}
377
378#[derive(Debug, Clone, PartialEq, Eq, Hash)]
379pub enum AlterCompactionGroupOperation {
380 Set { configs: Vec<ConfigParam> },
381}
382
383impl fmt::Display for AlterDatabaseOperation {
384 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
385 match self {
386 AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
387 write!(f, "OWNER TO {}", new_owner_name)
388 }
389 AlterDatabaseOperation::RenameDatabase { database_name } => {
390 write!(f, "RENAME TO {}", database_name)
391 }
392 AlterDatabaseOperation::SetParam(ConfigParam { param, value }) => {
393 write!(f, "SET {} TO {}", param, value)
394 }
395 AlterDatabaseOperation::SetResourceGroup {
396 resource_group,
397 deferred,
398 } => {
399 let deferred = if *deferred { " DEFERRED" } else { "" };
400
401 if let Some(resource_group) = resource_group {
402 write!(f, "SET RESOURCE_GROUP TO {}{}", resource_group, deferred)
403 } else {
404 write!(f, "RESET RESOURCE_GROUP{}", deferred)
405 }
406 }
407 }
408 }
409}
410
411impl fmt::Display for AlterSchemaOperation {
412 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
413 match self {
414 AlterSchemaOperation::ChangeOwner { new_owner_name } => {
415 write!(f, "OWNER TO {}", new_owner_name)
416 }
417 AlterSchemaOperation::RenameSchema { schema_name } => {
418 write!(f, "RENAME TO {}", schema_name)
419 }
420 AlterSchemaOperation::SwapRenameSchema { target_schema } => {
421 write!(f, "SWAP WITH {}", target_schema)
422 }
423 }
424 }
425}
426
427impl fmt::Display for AlterTableOperation {
428 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
429 match self {
430 AlterTableOperation::AddConstraint(c) => write!(f, "ADD {}", c),
431 AlterTableOperation::AddColumn { column_def } => {
432 write!(f, "ADD COLUMN {}", column_def)
433 }
434 AlterTableOperation::AlterColumn { column_name, op } => {
435 write!(f, "ALTER COLUMN {} {}", column_name, op)
436 }
437 AlterTableOperation::AlterWatermark {
438 column_name,
439 expr,
440 with_ttl,
441 } => {
442 write!(f, "ALTER WATERMARK FOR {} AS {}", column_name, expr)?;
443 if *with_ttl {
444 write!(f, " WITH TTL")?;
445 }
446 Ok(())
447 }
448 AlterTableOperation::DropConstraint { name } => write!(f, "DROP CONSTRAINT {}", name),
449 AlterTableOperation::DropColumn {
450 column_name,
451 if_exists,
452 cascade,
453 } => write!(
454 f,
455 "DROP COLUMN {}{}{}",
456 if *if_exists { "IF EXISTS " } else { "" },
457 column_name,
458 if *cascade { " CASCADE" } else { "" }
459 ),
460 AlterTableOperation::RenameColumn {
461 old_column_name,
462 new_column_name,
463 } => write!(
464 f,
465 "RENAME COLUMN {} TO {}",
466 old_column_name, new_column_name
467 ),
468 AlterTableOperation::RenameTable { table_name } => {
469 write!(f, "RENAME TO {}", table_name)
470 }
471 AlterTableOperation::ChangeColumn {
472 old_name,
473 new_name,
474 data_type,
475 options,
476 } => {
477 write!(f, "CHANGE COLUMN {} {} {}", old_name, new_name, data_type)?;
478 if options.is_empty() {
479 Ok(())
480 } else {
481 write!(f, " {}", display_separated(options, " "))
482 }
483 }
484 AlterTableOperation::RenameConstraint { old_name, new_name } => {
485 write!(f, "RENAME CONSTRAINT {} TO {}", old_name, new_name)
486 }
487 AlterTableOperation::ChangeOwner { new_owner_name } => {
488 write!(f, "OWNER TO {}", new_owner_name)
489 }
490 AlterTableOperation::SetSchema { new_schema_name } => {
491 write!(f, "SET SCHEMA {}", new_schema_name)
492 }
493 AlterTableOperation::SetParallelism {
494 parallelism,
495 deferred,
496 } => {
497 write!(
498 f,
499 "SET PARALLELISM TO {}{}",
500 parallelism,
501 if *deferred { " DEFERRED" } else { "" }
502 )
503 }
504 AlterTableOperation::SetBackfillParallelism {
505 parallelism,
506 deferred,
507 } => {
508 write!(
509 f,
510 "SET BACKFILL_PARALLELISM TO {}{}",
511 parallelism,
512 if *deferred { " DEFERRED" } else { "" }
513 )
514 }
515 AlterTableOperation::SetConfig { entries } => {
516 write!(f, "SET CONFIG ({})", display_comma_separated(entries))
517 }
518 AlterTableOperation::ResetConfig { keys } => {
519 write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
520 }
521 AlterTableOperation::RefreshSchema => {
522 write!(f, "REFRESH SCHEMA")
523 }
524 AlterTableOperation::SetSourceRateLimit { rate_limit } => {
525 write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
526 }
527 AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
528 write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
529 }
530 AlterTableOperation::SetDmlRateLimit { rate_limit } => {
531 write!(f, "SET DML_RATE_LIMIT TO {}", rate_limit)
532 }
533 AlterTableOperation::SwapRenameTable { target_table } => {
534 write!(f, "SWAP WITH {}", target_table)
535 }
536 AlterTableOperation::DropConnector => {
537 write!(f, "DROP CONNECTOR")
538 }
539 AlterTableOperation::AlterConnectorProps { alter_props } => {
540 write!(
541 f,
542 "CONNECTOR WITH ({})",
543 display_comma_separated(alter_props)
544 )
545 }
546 }
547 }
548}
549
550impl fmt::Display for AlterIndexOperation {
551 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
552 match self {
553 AlterIndexOperation::RenameIndex { index_name } => {
554 write!(f, "RENAME TO {index_name}")
555 }
556 AlterIndexOperation::SetParallelism {
557 parallelism,
558 deferred,
559 } => {
560 write!(
561 f,
562 "SET PARALLELISM TO {}{}",
563 parallelism,
564 if *deferred { " DEFERRED" } else { "" }
565 )
566 }
567 AlterIndexOperation::SetBackfillParallelism {
568 parallelism,
569 deferred,
570 } => {
571 write!(
572 f,
573 "SET BACKFILL_PARALLELISM TO {}{}",
574 parallelism,
575 if *deferred { " DEFERRED" } else { "" }
576 )
577 }
578 AlterIndexOperation::SetResourceGroup {
579 resource_group,
580 deferred,
581 } => {
582 let deferred = if *deferred { " DEFERRED" } else { "" };
583
584 if let Some(resource_group) = resource_group {
585 write!(f, "SET RESOURCE_GROUP TO {}{}", resource_group, deferred)
586 } else {
587 write!(f, "RESET RESOURCE_GROUP{}", deferred)
588 }
589 }
590 AlterIndexOperation::SetConfig { entries } => {
591 write!(f, "SET CONFIG ({})", display_comma_separated(entries))
592 }
593 AlterIndexOperation::ResetConfig { keys } => {
594 write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
595 }
596 }
597 }
598}
599
600impl fmt::Display for AlterViewOperation {
601 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
602 match self {
603 AlterViewOperation::RenameView { view_name } => {
604 write!(f, "RENAME TO {view_name}")
605 }
606 AlterViewOperation::ChangeOwner { new_owner_name } => {
607 write!(f, "OWNER TO {}", new_owner_name)
608 }
609 AlterViewOperation::SetSchema { new_schema_name } => {
610 write!(f, "SET SCHEMA {}", new_schema_name)
611 }
612 AlterViewOperation::SetParallelism {
613 parallelism,
614 deferred,
615 } => {
616 write!(
617 f,
618 "SET PARALLELISM TO {}{}",
619 parallelism,
620 if *deferred { " DEFERRED" } else { "" }
621 )
622 }
623 AlterViewOperation::SetBackfillParallelism {
624 parallelism,
625 deferred,
626 } => {
627 write!(
628 f,
629 "SET BACKFILL_PARALLELISM TO {}{}",
630 parallelism,
631 if *deferred { " DEFERRED" } else { "" }
632 )
633 }
634 AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
635 write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
636 }
637 AlterViewOperation::SwapRenameView { target_view } => {
638 write!(f, "SWAP WITH {}", target_view)
639 }
640 AlterViewOperation::SetResourceGroup {
641 resource_group,
642 deferred,
643 } => {
644 let deferred = if *deferred { " DEFERRED" } else { "" };
645
646 if let Some(resource_group) = resource_group {
647 write!(f, "SET RESOURCE_GROUP TO {}{}", resource_group, deferred)
648 } else {
649 write!(f, "RESET RESOURCE_GROUP{}", deferred)
650 }
651 }
652 AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
653 write!(f, "SET STREAMING_ENABLE_UNALIGNED_JOIN TO {}", enable)
654 }
655 AlterViewOperation::AsQuery { query } => {
656 write!(f, "AS {}", query)
657 }
658 AlterViewOperation::SetConfig { entries } => {
659 write!(f, "SET CONFIG ({})", display_comma_separated(entries))
660 }
661 AlterViewOperation::ResetConfig { keys } => {
662 write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
663 }
664 }
665 }
666}
667
668impl fmt::Display for AlterSinkOperation {
669 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
670 match self {
671 AlterSinkOperation::RenameSink { sink_name } => {
672 write!(f, "RENAME TO {sink_name}")
673 }
674 AlterSinkOperation::ChangeOwner { new_owner_name } => {
675 write!(f, "OWNER TO {}", new_owner_name)
676 }
677 AlterSinkOperation::SetSchema { new_schema_name } => {
678 write!(f, "SET SCHEMA {}", new_schema_name)
679 }
680 AlterSinkOperation::SetParallelism {
681 parallelism,
682 deferred,
683 } => {
684 write!(
685 f,
686 "SET PARALLELISM TO {}{}",
687 parallelism,
688 if *deferred { " DEFERRED" } else { "" }
689 )
690 }
691 AlterSinkOperation::SetBackfillParallelism {
692 parallelism,
693 deferred,
694 } => {
695 write!(
696 f,
697 "SET BACKFILL_PARALLELISM TO {}{}",
698 parallelism,
699 if *deferred { " DEFERRED" } else { "" }
700 )
701 }
702 AlterSinkOperation::SetResourceGroup {
703 resource_group,
704 deferred,
705 } => {
706 let deferred = if *deferred { " DEFERRED" } else { "" };
707
708 if let Some(resource_group) = resource_group {
709 write!(f, "SET RESOURCE_GROUP TO {}{}", resource_group, deferred)
710 } else {
711 write!(f, "RESET RESOURCE_GROUP{}", deferred)
712 }
713 }
714 AlterSinkOperation::SetConfig { entries } => {
715 write!(f, "SET CONFIG ({})", display_comma_separated(entries))
716 }
717 AlterSinkOperation::ResetConfig { keys } => {
718 write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
719 }
720 AlterSinkOperation::SwapRenameSink { target_sink } => {
721 write!(f, "SWAP WITH {}", target_sink)
722 }
723 AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
724 write!(f, "SET SINK_RATE_LIMIT TO {}", rate_limit)
725 }
726 AlterSinkOperation::SetBackfillRateLimit { rate_limit } => {
727 write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
728 }
729 AlterSinkOperation::AlterConnectorProps {
730 alter_props: changed_props,
731 } => {
732 write!(
733 f,
734 "CONNECTOR WITH ({})",
735 display_comma_separated(changed_props)
736 )
737 }
738 AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
739 write!(f, "SET STREAMING_ENABLE_UNALIGNED_JOIN TO {}", enable)
740 }
741 }
742 }
743}
744
745impl fmt::Display for AlterSubscriptionOperation {
746 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
747 match self {
748 AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
749 write!(f, "RENAME TO {subscription_name}")
750 }
751 AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
752 write!(f, "OWNER TO {}", new_owner_name)
753 }
754 AlterSubscriptionOperation::SetSchema { new_schema_name } => {
755 write!(f, "SET SCHEMA {}", new_schema_name)
756 }
757 AlterSubscriptionOperation::SetRetention { retention } => {
758 write!(f, "SET RETENTION TO {}", retention)
759 }
760 AlterSubscriptionOperation::SwapRenameSubscription {
761 target_subscription,
762 } => {
763 write!(f, "SWAP WITH {}", target_subscription)
764 }
765 }
766 }
767}
768
769impl fmt::Display for AlterSourceOperation {
770 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
771 match self {
772 AlterSourceOperation::RenameSource { source_name } => {
773 write!(f, "RENAME TO {source_name}")
774 }
775 AlterSourceOperation::AddColumn { column_def } => {
776 write!(f, "ADD COLUMN {column_def}")
777 }
778 AlterSourceOperation::ChangeOwner { new_owner_name } => {
779 write!(f, "OWNER TO {}", new_owner_name)
780 }
781 AlterSourceOperation::SetSchema { new_schema_name } => {
782 write!(f, "SET SCHEMA {}", new_schema_name)
783 }
784 AlterSourceOperation::FormatEncode { format_encode } => {
785 write!(f, "{format_encode}")
786 }
787 AlterSourceOperation::RefreshSchema => {
788 write!(f, "REFRESH SCHEMA")
789 }
790 AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
791 write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
792 }
793 AlterSourceOperation::SwapRenameSource { target_source } => {
794 write!(f, "SWAP WITH {}", target_source)
795 }
796 AlterSourceOperation::SetParallelism {
797 parallelism,
798 deferred,
799 } => {
800 write!(
801 f,
802 "SET PARALLELISM TO {}{}",
803 parallelism,
804 if *deferred { " DEFERRED" } else { "" }
805 )
806 }
807 AlterSourceOperation::SetBackfillParallelism {
808 parallelism,
809 deferred,
810 } => {
811 write!(
812 f,
813 "SET BACKFILL_PARALLELISM TO {}{}",
814 parallelism,
815 if *deferred { " DEFERRED" } else { "" }
816 )
817 }
818 AlterSourceOperation::SetConfig { entries } => {
819 write!(f, "SET CONFIG ({})", display_comma_separated(entries))
820 }
821 AlterSourceOperation::ResetConfig { keys } => {
822 write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
823 }
824 AlterSourceOperation::ResetSource => {
825 write!(f, "RESET")
826 }
827 AlterSourceOperation::AlterConnectorProps { alter_props } => {
828 write!(
829 f,
830 "CONNECTOR WITH ({})",
831 display_comma_separated(alter_props)
832 )
833 }
834 }
835 }
836}
837
838impl fmt::Display for AlterFunctionOperation {
839 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
840 match self {
841 AlterFunctionOperation::SetSchema { new_schema_name } => {
842 write!(f, "SET SCHEMA {new_schema_name}")
843 }
844 AlterFunctionOperation::ChangeOwner { new_owner_name } => {
845 write!(f, "OWNER TO {new_owner_name}")
846 }
847 }
848 }
849}
850
851impl fmt::Display for AlterConnectionOperation {
852 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
853 match self {
854 AlterConnectionOperation::SetSchema { new_schema_name } => {
855 write!(f, "SET SCHEMA {new_schema_name}")
856 }
857 AlterConnectionOperation::ChangeOwner { new_owner_name } => {
858 write!(f, "OWNER TO {new_owner_name}")
859 }
860 AlterConnectionOperation::AlterConnectorProps { alter_props } => {
861 write!(
862 f,
863 "CONNECTOR WITH ({})",
864 display_comma_separated(alter_props)
865 )
866 }
867 }
868 }
869}
870
871impl fmt::Display for AlterSecretOperation {
872 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
873 match self {
874 AlterSecretOperation::ChangeCredential {
875 new_credential,
876 with_options,
877 } => {
878 write!(
879 f,
880 "WITH ({}) AS {}",
881 display_comma_separated(with_options),
882 new_credential
883 )
884 }
885 AlterSecretOperation::ChangeOwner { new_owner_name } => {
886 write!(f, "OWNER TO {new_owner_name}")
887 }
888 }
889 }
890}
891
892#[derive(Debug, Clone, PartialEq, Eq, Hash)]
894pub enum AlterColumnOperation {
895 SetNotNull,
897 DropNotNull,
899 SetDefault { value: Expr },
901 DropDefault,
903 SetDataType {
905 data_type: DataType,
906 using: Option<Expr>,
908 },
909}
910
911impl fmt::Display for AlterColumnOperation {
912 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
913 match self {
914 AlterColumnOperation::SetNotNull => write!(f, "SET NOT NULL",),
915 AlterColumnOperation::DropNotNull => write!(f, "DROP NOT NULL",),
916 AlterColumnOperation::SetDefault { value } => {
917 write!(f, "SET DEFAULT {}", value)
918 }
919 AlterColumnOperation::DropDefault => {
920 write!(f, "DROP DEFAULT")
921 }
922 AlterColumnOperation::SetDataType { data_type, using } => {
923 if let Some(expr) = using {
924 write!(f, "SET DATA TYPE {} USING {}", data_type, expr)
925 } else {
926 write!(f, "SET DATA TYPE {}", data_type)
927 }
928 }
929 }
930 }
931}
932
933impl fmt::Display for AlterFragmentOperation {
934 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
935 match self {
936 AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
937 write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
938 }
939 AlterFragmentOperation::SetParallelism { parallelism } => {
940 write!(f, "SET PARALLELISM TO {}", parallelism)
941 }
942 }
943 }
944}
945
946impl fmt::Display for AlterCompactionGroupOperation {
947 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
948 match self {
949 AlterCompactionGroupOperation::Set { configs } => {
950 struct Assign<'a>(&'a ConfigParam);
951
952 impl fmt::Display for Assign<'_> {
953 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
954 write!(f, "{} = {}", self.0.param, self.0.value)
955 }
956 }
957
958 let assigns: Vec<Assign<'_>> = configs.iter().map(Assign).collect();
959 write!(f, "SET {}", display_comma_separated(&assigns))
960 }
961 }
962 }
963}
964
965#[derive(Debug, Clone, PartialEq, Eq, Hash)]
968pub struct SourceWatermark {
969 pub column: Ident,
970 pub expr: Expr,
971 pub with_ttl: bool,
973}
974
975impl fmt::Display for SourceWatermark {
976 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
977 write!(f, "WATERMARK FOR {} AS {}", self.column, self.expr,)?;
978 if self.with_ttl {
979 write!(f, " WITH TTL")?;
980 }
981 Ok(())
982 }
983}
984
985#[derive(Debug, Clone, PartialEq, Eq, Hash)]
988pub enum TableConstraint {
989 Unique {
991 name: Option<Ident>,
992 columns: Vec<Ident>,
993 is_primary: bool,
995 },
996 ForeignKey {
1002 name: Option<Ident>,
1003 columns: Vec<Ident>,
1004 foreign_table: ObjectName,
1005 referred_columns: Vec<Ident>,
1006 on_delete: Option<ReferentialAction>,
1007 on_update: Option<ReferentialAction>,
1008 },
1009 Check {
1011 name: Option<Ident>,
1012 expr: Box<Expr>,
1013 },
1014}
1015
1016impl fmt::Display for TableConstraint {
1017 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1018 match self {
1019 TableConstraint::Unique {
1020 name,
1021 columns,
1022 is_primary,
1023 } => write!(
1024 f,
1025 "{}{} ({})",
1026 display_constraint_name(name),
1027 if *is_primary { "PRIMARY KEY" } else { "UNIQUE" },
1028 display_comma_separated(columns)
1029 ),
1030 TableConstraint::ForeignKey {
1031 name,
1032 columns,
1033 foreign_table,
1034 referred_columns,
1035 on_delete,
1036 on_update,
1037 } => {
1038 write!(
1039 f,
1040 "{}FOREIGN KEY ({}) REFERENCES {}({})",
1041 display_constraint_name(name),
1042 display_comma_separated(columns),
1043 foreign_table,
1044 display_comma_separated(referred_columns),
1045 )?;
1046 if let Some(action) = on_delete {
1047 write!(f, " ON DELETE {}", action)?;
1048 }
1049 if let Some(action) = on_update {
1050 write!(f, " ON UPDATE {}", action)?;
1051 }
1052 Ok(())
1053 }
1054 TableConstraint::Check { name, expr } => {
1055 write!(f, "{}CHECK ({})", display_constraint_name(name), expr)
1056 }
1057 }
1058 }
1059}
1060
1061#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1063pub struct ColumnDef {
1064 pub name: Ident,
1065 pub data_type: Option<DataType>,
1066 pub collation: Option<ObjectName>,
1067 pub options: Vec<ColumnOptionDef>,
1068}
1069
1070impl ColumnDef {
1071 pub fn new(
1072 name: Ident,
1073 data_type: DataType,
1074 collation: Option<ObjectName>,
1075 options: Vec<ColumnOptionDef>,
1076 ) -> Self {
1077 ColumnDef {
1078 name,
1079 data_type: Some(data_type),
1080 collation,
1081 options,
1082 }
1083 }
1084
1085 pub fn is_generated(&self) -> bool {
1086 self.options
1087 .iter()
1088 .any(|option| matches!(option.option, ColumnOption::GeneratedColumns(_)))
1089 }
1090}
1091
1092impl fmt::Display for ColumnDef {
1093 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1094 write!(
1095 f,
1096 "{} {}",
1097 self.name,
1098 if let Some(data_type) = &self.data_type {
1099 data_type.to_string()
1100 } else {
1101 "None".to_owned()
1102 }
1103 )?;
1104 for option in &self.options {
1105 write!(f, " {}", option)?;
1106 }
1107 Ok(())
1108 }
1109}
1110
1111#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1128pub struct ColumnOptionDef {
1129 pub name: Option<Ident>,
1130 pub option: ColumnOption,
1131}
1132
1133impl fmt::Display for ColumnOptionDef {
1134 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1135 write!(f, "{}{}", display_constraint_name(&self.name), self.option)
1136 }
1137}
1138
1139#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1142pub enum ColumnOption {
1143 Null,
1145 NotNull,
1147 DefaultValue(Expr),
1149 DefaultValueInternal {
1152 persisted: Box<[u8]>,
1154 expr: Option<Expr>,
1158 },
1159 Unique { is_primary: bool },
1161 ForeignKey {
1167 foreign_table: ObjectName,
1168 referred_columns: Vec<Ident>,
1169 on_delete: Option<ReferentialAction>,
1170 on_update: Option<ReferentialAction>,
1171 },
1172 Check(Expr),
1174 DialectSpecific(Vec<Token>),
1178 GeneratedColumns(Expr),
1180}
1181
1182impl fmt::Display for ColumnOption {
1183 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1184 use ColumnOption::*;
1185 match self {
1186 Null => write!(f, "NULL"),
1187 NotNull => write!(f, "NOT NULL"),
1188 DefaultValue(expr) => write!(f, "DEFAULT {}", expr),
1189 DefaultValueInternal { persisted: _, expr } => {
1190 if let Some(expr) = expr {
1191 write!(f, "DEFAULT {}", expr)
1192 } else {
1193 write!(f, "DEFAULT INTERNAL")
1194 }
1195 }
1196 Unique { is_primary } => {
1197 write!(f, "{}", if *is_primary { "PRIMARY KEY" } else { "UNIQUE" })
1198 }
1199 ForeignKey {
1200 foreign_table,
1201 referred_columns,
1202 on_delete,
1203 on_update,
1204 } => {
1205 write!(f, "REFERENCES {}", foreign_table)?;
1206 if !referred_columns.is_empty() {
1207 write!(f, " ({})", display_comma_separated(referred_columns))?;
1208 }
1209 if let Some(action) = on_delete {
1210 write!(f, " ON DELETE {}", action)?;
1211 }
1212 if let Some(action) = on_update {
1213 write!(f, " ON UPDATE {}", action)?;
1214 }
1215 Ok(())
1216 }
1217 Check(expr) => write!(f, "CHECK ({})", expr),
1218 DialectSpecific(val) => write!(f, "{}", display_separated(val, " ")),
1219 GeneratedColumns(expr) => write!(f, "AS {}", expr),
1220 }
1221 }
1222}
1223
1224fn display_constraint_name(name: &'_ Option<Ident>) -> impl fmt::Display + '_ {
1225 struct ConstraintName<'a>(&'a Option<Ident>);
1226 impl fmt::Display for ConstraintName<'_> {
1227 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1228 if let Some(name) = self.0 {
1229 write!(f, "CONSTRAINT {} ", name)?;
1230 }
1231 Ok(())
1232 }
1233 }
1234 ConstraintName(name)
1235}
1236
1237#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1242pub enum ReferentialAction {
1243 Restrict,
1244 Cascade,
1245 SetNull,
1246 NoAction,
1247 SetDefault,
1248}
1249
1250impl fmt::Display for ReferentialAction {
1251 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1252 f.write_str(match self {
1253 ReferentialAction::Restrict => "RESTRICT",
1254 ReferentialAction::Cascade => "CASCADE",
1255 ReferentialAction::SetNull => "SET NULL",
1256 ReferentialAction::NoAction => "NO ACTION",
1257 ReferentialAction::SetDefault => "SET DEFAULT",
1258 })
1259 }
1260}
1261
1262#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1264pub struct WebhookSourceInfo {
1265 pub secret_ref: Option<SecretRefValue>,
1266 pub signature_expr: Option<Expr>,
1267 pub wait_for_persistence: bool,
1268 pub is_batched: bool,
1269}