1use std::fmt;
17
18use super::{ConfigParam, FormatEncodeOptions, SqlOption};
19use crate::ast::{
20 DataType, Expr, Ident, ObjectName, Query, SecretRefValue, SetVariableValue, Value,
21 display_comma_separated, display_separated,
22};
23use crate::tokenizer::Token;
24
/// Operation part of an `ALTER DATABASE` statement (statement kind taken from
/// the type name; the enclosing statement is defined elsewhere).
///
/// Each variant's doc shows the SQL fragment its `Display` impl emits.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterDatabaseOperation {
    /// `OWNER TO <new_owner_name>`
    ChangeOwner { new_owner_name: Ident },
    /// `RENAME TO <database_name>`
    RenameDatabase { database_name: ObjectName },
    /// `SET <param> TO <value>`
    SetParam(ConfigParam),
}
31
/// Operation part of an `ALTER SCHEMA` statement (statement kind taken from
/// the type name; the enclosing statement is defined elsewhere).
///
/// Each variant's doc shows the SQL fragment its `Display` impl emits.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterSchemaOperation {
    /// `OWNER TO <new_owner_name>`
    ChangeOwner { new_owner_name: Ident },
    /// `RENAME TO <schema_name>`
    RenameSchema { schema_name: ObjectName },
    /// `SWAP WITH <target_schema>`
    SwapRenameSchema { target_schema: ObjectName },
}
38
/// Operation part of an `ALTER TABLE` statement (statement kind taken from the
/// type name; the enclosing statement is defined elsewhere).
///
/// Each variant's doc shows the SQL fragment its `Display` impl emits.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterTableOperation {
    /// `ADD <table_constraint>`
    AddConstraint(TableConstraint),
    /// `ADD COLUMN <column_def>`
    AddColumn {
        column_def: ColumnDef,
    },
    /// `DROP CONSTRAINT <name>`
    DropConstraint {
        name: Ident,
    },
    /// `DROP COLUMN [IF EXISTS] <column_name> [CASCADE]`
    DropColumn {
        column_name: Ident,
        // Emits `IF EXISTS ` before the column name when true.
        if_exists: bool,
        // Emits ` CASCADE` after the column name when true.
        cascade: bool,
    },
    /// `RENAME COLUMN <old_column_name> TO <new_column_name>`
    RenameColumn {
        old_column_name: Ident,
        new_column_name: Ident,
    },
    /// `RENAME TO <table_name>`
    RenameTable {
        table_name: ObjectName,
    },
    /// `CHANGE COLUMN <old_name> <new_name> <data_type> [<options>...]`
    ChangeColumn {
        old_name: Ident,
        new_name: Ident,
        data_type: DataType,
        // Rendered space-separated after the data type; omitted when empty.
        options: Vec<ColumnOption>,
    },
    /// `RENAME CONSTRAINT <old_name> TO <new_name>`
    RenameConstraint {
        old_name: Ident,
        new_name: Ident,
    },
    /// `ALTER COLUMN <column_name> <op>`
    AlterColumn {
        column_name: Ident,
        op: AlterColumnOperation,
    },
    /// `ALTER WATERMARK FOR <column_name> AS <expr> [WITH TTL]`
    AlterWatermark {
        column_name: Ident,
        expr: Expr,
        // Emits ` WITH TTL` when true.
        with_ttl: bool,
    },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
    /// `SET SCHEMA <new_schema_name>`
    SetSchema {
        new_schema_name: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [DEFERRED]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET BACKFILL_PARALLELISM TO <parallelism> [DEFERRED]`
    SetBackfillParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET CONFIG (<entries>)`
    SetConfig {
        entries: Vec<SqlOption>,
    },
    /// `RESET CONFIG (<keys>)`
    ResetConfig {
        keys: Vec<ObjectName>,
    },
    /// `REFRESH SCHEMA`
    RefreshSchema,
    /// `SET SOURCE_RATE_LIMIT TO <rate_limit>`
    SetSourceRateLimit {
        rate_limit: i32,
    },
    /// `SET BACKFILL_RATE_LIMIT TO <rate_limit>`
    SetBackfillRateLimit {
        rate_limit: i32,
    },
    /// `SET DML_RATE_LIMIT TO <rate_limit>`
    SetDmlRateLimit {
        rate_limit: i32,
    },
    /// `SWAP WITH <target_table>`
    SwapRenameTable {
        target_table: ObjectName,
    },
    /// `DROP CONNECTOR`
    DropConnector,

    /// `CONNECTOR WITH (<alter_props>)`
    AlterConnectorProps {
        alter_props: Vec<SqlOption>,
    },
}
143
/// Operation part of an `ALTER INDEX` statement (statement kind taken from the
/// type name; the enclosing statement is defined elsewhere).
///
/// Each variant's doc shows the SQL fragment its `Display` impl emits.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterIndexOperation {
    /// `RENAME TO <index_name>`
    RenameIndex {
        index_name: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [DEFERRED]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET BACKFILL_PARALLELISM TO <parallelism> [DEFERRED]`
    SetBackfillParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET CONFIG (<entries>)`
    SetConfig {
        entries: Vec<SqlOption>,
    },
    /// `RESET CONFIG (<keys>)`
    ResetConfig {
        keys: Vec<ObjectName>,
    },
}
168
/// Operation part of an `ALTER ... VIEW` statement (statement kind taken from
/// the type name; the enclosing statement is defined elsewhere).
///
/// Each variant's doc shows the SQL fragment its `Display` impl emits.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterViewOperation {
    /// `RENAME TO <view_name>`
    RenameView {
        view_name: ObjectName,
    },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
    /// `SET SCHEMA <new_schema_name>`
    SetSchema {
        new_schema_name: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [DEFERRED]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET BACKFILL_PARALLELISM TO <parallelism> [DEFERRED]`
    SetBackfillParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET RESOURCE_GROUP TO <resource_group>` when `resource_group` is
    /// `Some`, `RESET RESOURCE_GROUP` when it is `None`; `DEFERRED` is
    /// appended when `deferred` is true.
    SetResourceGroup {
        resource_group: Option<SetVariableValue>,
        deferred: bool,
    },
    /// `SET BACKFILL_RATE_LIMIT TO <rate_limit>`
    SetBackfillRateLimit {
        rate_limit: i32,
    },
    /// `SWAP WITH <target_view>`
    SwapRenameView {
        target_view: ObjectName,
    },
    /// `SET STREAMING_ENABLE_UNALIGNED_JOIN TO <enable>`
    SetStreamingEnableUnalignedJoin {
        enable: bool,
    },
    /// `AS <query>`
    AsQuery {
        query: Box<Query>,
    },
    /// `SET CONFIG (<entries>)`
    SetConfig {
        entries: Vec<SqlOption>,
    },
    /// `RESET CONFIG (<keys>)`
    ResetConfig {
        keys: Vec<ObjectName>,
    },
}
220
/// Operation part of an `ALTER SINK` statement (statement kind taken from the
/// type name; the enclosing statement is defined elsewhere).
///
/// Each variant's doc shows the SQL fragment its `Display` impl emits.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterSinkOperation {
    /// `RENAME TO <sink_name>`
    RenameSink {
        sink_name: ObjectName,
    },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
    /// `SET SCHEMA <new_schema_name>`
    SetSchema {
        new_schema_name: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [DEFERRED]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET BACKFILL_PARALLELISM TO <parallelism> [DEFERRED]`
    SetBackfillParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET CONFIG (<entries>)`
    SetConfig {
        entries: Vec<SqlOption>,
    },
    /// `RESET CONFIG (<keys>)`
    ResetConfig {
        keys: Vec<ObjectName>,
    },
    /// `SWAP WITH <target_sink>`
    SwapRenameSink {
        target_sink: ObjectName,
    },
    /// `SET SINK_RATE_LIMIT TO <rate_limit>`
    SetSinkRateLimit {
        rate_limit: i32,
    },
    /// `SET BACKFILL_RATE_LIMIT TO <rate_limit>`
    SetBackfillRateLimit {
        rate_limit: i32,
    },
    /// `CONNECTOR WITH (<alter_props>)`
    AlterConnectorProps {
        alter_props: Vec<SqlOption>,
    },
    /// `SET STREAMING_ENABLE_UNALIGNED_JOIN TO <enable>`
    SetStreamingEnableUnalignedJoin {
        enable: bool,
    },
}
267
/// Operation part of an `ALTER SUBSCRIPTION` statement (statement kind taken
/// from the type name; the enclosing statement is defined elsewhere).
///
/// Each variant's doc shows the SQL fragment its `Display` impl emits.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterSubscriptionOperation {
    /// `RENAME TO <subscription_name>`
    RenameSubscription { subscription_name: ObjectName },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner { new_owner_name: Ident },
    /// `SET SCHEMA <new_schema_name>`
    SetSchema { new_schema_name: ObjectName },
    /// `SET RETENTION TO <retention>`
    SetRetention { retention: Value },
    /// `SWAP WITH <target_subscription>`
    SwapRenameSubscription { target_subscription: ObjectName },
}
276
/// Operation part of an `ALTER SOURCE` statement (statement kind taken from
/// the type name; the enclosing statement is defined elsewhere).
///
/// Each variant's doc shows the SQL fragment its `Display` impl emits.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterSourceOperation {
    /// `RENAME TO <source_name>`
    RenameSource {
        source_name: ObjectName,
    },
    /// `ADD COLUMN <column_def>`
    AddColumn {
        column_def: ColumnDef,
    },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
    /// `SET SCHEMA <new_schema_name>`
    SetSchema {
        new_schema_name: ObjectName,
    },
    /// Rendered as the `Display` output of `format_encode`, with no extra
    /// keywords added here.
    FormatEncode {
        format_encode: FormatEncodeOptions,
    },
    /// `REFRESH SCHEMA`
    RefreshSchema,
    /// `SET SOURCE_RATE_LIMIT TO <rate_limit>`
    SetSourceRateLimit {
        rate_limit: i32,
    },
    /// `SWAP WITH <target_source>`
    SwapRenameSource {
        target_source: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [DEFERRED]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET BACKFILL_PARALLELISM TO <parallelism> [DEFERRED]`
    SetBackfillParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET CONFIG (<entries>)`
    SetConfig {
        entries: Vec<SqlOption>,
    },
    /// `RESET CONFIG (<keys>)`
    ResetConfig {
        keys: Vec<ObjectName>,
    },
    /// `RESET`
    ResetSource,
    /// `CONNECTOR WITH (<alter_props>)`
    AlterConnectorProps {
        alter_props: Vec<SqlOption>,
    },
}
325
/// Operation part of an `ALTER FUNCTION` statement (statement kind taken from
/// the type name; the enclosing statement is defined elsewhere).
///
/// Each variant's doc shows the SQL fragment its `Display` impl emits.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterFunctionOperation {
    /// `SET SCHEMA <new_schema_name>`
    SetSchema { new_schema_name: ObjectName },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner { new_owner_name: Ident },
}
331
/// Operation part of an `ALTER CONNECTION` statement (statement kind taken
/// from the type name; the enclosing statement is defined elsewhere).
///
/// Each variant's doc shows the SQL fragment its `Display` impl emits.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterConnectionOperation {
    /// `SET SCHEMA <new_schema_name>`
    SetSchema { new_schema_name: ObjectName },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner { new_owner_name: Ident },
    /// `CONNECTOR WITH (<alter_props>)`
    AlterConnectorProps { alter_props: Vec<SqlOption> },
}
338
/// Operation part of an `ALTER SECRET` statement (statement kind taken from
/// the type name; the enclosing statement is defined elsewhere).
///
/// Each variant's doc shows the SQL fragment its `Display` impl emits.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterSecretOperation {
    /// `WITH (<with_options>) AS <new_credential>`
    ChangeCredential {
        with_options: Vec<SqlOption>,
        new_credential: Value,
    },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
}
349
/// Operation part of an `ALTER FRAGMENT` statement (statement kind taken from
/// the type name; the enclosing statement is defined elsewhere).
///
/// Each variant's doc shows the SQL fragment its `Display` impl emits.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterFragmentOperation {
    /// `SET BACKFILL_RATE_LIMIT TO <rate_limit>`
    AlterBackfillRateLimit { rate_limit: i32 },
    /// `SET PARALLELISM TO <parallelism>`
    SetParallelism { parallelism: SetVariableValue },
}
355
/// Operation part of an `ALTER COMPACTION GROUP` statement (statement kind
/// taken from the type name; the enclosing statement is defined elsewhere).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterCompactionGroupOperation {
    /// `SET <param> = <value>[, ...]`, one assignment per entry in `configs`.
    Set { configs: Vec<ConfigParam> },
}
360
361impl fmt::Display for AlterDatabaseOperation {
362 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
363 match self {
364 AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
365 write!(f, "OWNER TO {}", new_owner_name)
366 }
367 AlterDatabaseOperation::RenameDatabase { database_name } => {
368 write!(f, "RENAME TO {}", database_name)
369 }
370 AlterDatabaseOperation::SetParam(ConfigParam { param, value }) => {
371 write!(f, "SET {} TO {}", param, value)
372 }
373 }
374 }
375}
376
377impl fmt::Display for AlterSchemaOperation {
378 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
379 match self {
380 AlterSchemaOperation::ChangeOwner { new_owner_name } => {
381 write!(f, "OWNER TO {}", new_owner_name)
382 }
383 AlterSchemaOperation::RenameSchema { schema_name } => {
384 write!(f, "RENAME TO {}", schema_name)
385 }
386 AlterSchemaOperation::SwapRenameSchema { target_schema } => {
387 write!(f, "SWAP WITH {}", target_schema)
388 }
389 }
390 }
391}
392
393impl fmt::Display for AlterTableOperation {
394 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
395 match self {
396 AlterTableOperation::AddConstraint(c) => write!(f, "ADD {}", c),
397 AlterTableOperation::AddColumn { column_def } => {
398 write!(f, "ADD COLUMN {}", column_def)
399 }
400 AlterTableOperation::AlterColumn { column_name, op } => {
401 write!(f, "ALTER COLUMN {} {}", column_name, op)
402 }
403 AlterTableOperation::AlterWatermark {
404 column_name,
405 expr,
406 with_ttl,
407 } => {
408 write!(f, "ALTER WATERMARK FOR {} AS {}", column_name, expr)?;
409 if *with_ttl {
410 write!(f, " WITH TTL")?;
411 }
412 Ok(())
413 }
414 AlterTableOperation::DropConstraint { name } => write!(f, "DROP CONSTRAINT {}", name),
415 AlterTableOperation::DropColumn {
416 column_name,
417 if_exists,
418 cascade,
419 } => write!(
420 f,
421 "DROP COLUMN {}{}{}",
422 if *if_exists { "IF EXISTS " } else { "" },
423 column_name,
424 if *cascade { " CASCADE" } else { "" }
425 ),
426 AlterTableOperation::RenameColumn {
427 old_column_name,
428 new_column_name,
429 } => write!(
430 f,
431 "RENAME COLUMN {} TO {}",
432 old_column_name, new_column_name
433 ),
434 AlterTableOperation::RenameTable { table_name } => {
435 write!(f, "RENAME TO {}", table_name)
436 }
437 AlterTableOperation::ChangeColumn {
438 old_name,
439 new_name,
440 data_type,
441 options,
442 } => {
443 write!(f, "CHANGE COLUMN {} {} {}", old_name, new_name, data_type)?;
444 if options.is_empty() {
445 Ok(())
446 } else {
447 write!(f, " {}", display_separated(options, " "))
448 }
449 }
450 AlterTableOperation::RenameConstraint { old_name, new_name } => {
451 write!(f, "RENAME CONSTRAINT {} TO {}", old_name, new_name)
452 }
453 AlterTableOperation::ChangeOwner { new_owner_name } => {
454 write!(f, "OWNER TO {}", new_owner_name)
455 }
456 AlterTableOperation::SetSchema { new_schema_name } => {
457 write!(f, "SET SCHEMA {}", new_schema_name)
458 }
459 AlterTableOperation::SetParallelism {
460 parallelism,
461 deferred,
462 } => {
463 write!(
464 f,
465 "SET PARALLELISM TO {}{}",
466 parallelism,
467 if *deferred { " DEFERRED" } else { "" }
468 )
469 }
470 AlterTableOperation::SetBackfillParallelism {
471 parallelism,
472 deferred,
473 } => {
474 write!(
475 f,
476 "SET BACKFILL_PARALLELISM TO {}{}",
477 parallelism,
478 if *deferred { " DEFERRED" } else { "" }
479 )
480 }
481 AlterTableOperation::SetConfig { entries } => {
482 write!(f, "SET CONFIG ({})", display_comma_separated(entries))
483 }
484 AlterTableOperation::ResetConfig { keys } => {
485 write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
486 }
487 AlterTableOperation::RefreshSchema => {
488 write!(f, "REFRESH SCHEMA")
489 }
490 AlterTableOperation::SetSourceRateLimit { rate_limit } => {
491 write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
492 }
493 AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
494 write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
495 }
496 AlterTableOperation::SetDmlRateLimit { rate_limit } => {
497 write!(f, "SET DML_RATE_LIMIT TO {}", rate_limit)
498 }
499 AlterTableOperation::SwapRenameTable { target_table } => {
500 write!(f, "SWAP WITH {}", target_table)
501 }
502 AlterTableOperation::DropConnector => {
503 write!(f, "DROP CONNECTOR")
504 }
505 AlterTableOperation::AlterConnectorProps { alter_props } => {
506 write!(
507 f,
508 "CONNECTOR WITH ({})",
509 display_comma_separated(alter_props)
510 )
511 }
512 }
513 }
514}
515
516impl fmt::Display for AlterIndexOperation {
517 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
518 match self {
519 AlterIndexOperation::RenameIndex { index_name } => {
520 write!(f, "RENAME TO {index_name}")
521 }
522 AlterIndexOperation::SetParallelism {
523 parallelism,
524 deferred,
525 } => {
526 write!(
527 f,
528 "SET PARALLELISM TO {}{}",
529 parallelism,
530 if *deferred { " DEFERRED" } else { "" }
531 )
532 }
533 AlterIndexOperation::SetBackfillParallelism {
534 parallelism,
535 deferred,
536 } => {
537 write!(
538 f,
539 "SET BACKFILL_PARALLELISM TO {}{}",
540 parallelism,
541 if *deferred { " DEFERRED" } else { "" }
542 )
543 }
544 AlterIndexOperation::SetConfig { entries } => {
545 write!(f, "SET CONFIG ({})", display_comma_separated(entries))
546 }
547 AlterIndexOperation::ResetConfig { keys } => {
548 write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
549 }
550 }
551 }
552}
553
554impl fmt::Display for AlterViewOperation {
555 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
556 match self {
557 AlterViewOperation::RenameView { view_name } => {
558 write!(f, "RENAME TO {view_name}")
559 }
560 AlterViewOperation::ChangeOwner { new_owner_name } => {
561 write!(f, "OWNER TO {}", new_owner_name)
562 }
563 AlterViewOperation::SetSchema { new_schema_name } => {
564 write!(f, "SET SCHEMA {}", new_schema_name)
565 }
566 AlterViewOperation::SetParallelism {
567 parallelism,
568 deferred,
569 } => {
570 write!(
571 f,
572 "SET PARALLELISM TO {}{}",
573 parallelism,
574 if *deferred { " DEFERRED" } else { "" }
575 )
576 }
577 AlterViewOperation::SetBackfillParallelism {
578 parallelism,
579 deferred,
580 } => {
581 write!(
582 f,
583 "SET BACKFILL_PARALLELISM TO {}{}",
584 parallelism,
585 if *deferred { " DEFERRED" } else { "" }
586 )
587 }
588 AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
589 write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
590 }
591 AlterViewOperation::SwapRenameView { target_view } => {
592 write!(f, "SWAP WITH {}", target_view)
593 }
594 AlterViewOperation::SetResourceGroup {
595 resource_group,
596 deferred,
597 } => {
598 let deferred = if *deferred { " DEFERRED" } else { "" };
599
600 if let Some(resource_group) = resource_group {
601 write!(f, "SET RESOURCE_GROUP TO {} {}", resource_group, deferred)
602 } else {
603 write!(f, "RESET RESOURCE_GROUP {}", deferred)
604 }
605 }
606 AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
607 write!(f, "SET STREAMING_ENABLE_UNALIGNED_JOIN TO {}", enable)
608 }
609 AlterViewOperation::AsQuery { query } => {
610 write!(f, "AS {}", query)
611 }
612 AlterViewOperation::SetConfig { entries } => {
613 write!(f, "SET CONFIG ({})", display_comma_separated(entries))
614 }
615 AlterViewOperation::ResetConfig { keys } => {
616 write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
617 }
618 }
619 }
620}
621
622impl fmt::Display for AlterSinkOperation {
623 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
624 match self {
625 AlterSinkOperation::RenameSink { sink_name } => {
626 write!(f, "RENAME TO {sink_name}")
627 }
628 AlterSinkOperation::ChangeOwner { new_owner_name } => {
629 write!(f, "OWNER TO {}", new_owner_name)
630 }
631 AlterSinkOperation::SetSchema { new_schema_name } => {
632 write!(f, "SET SCHEMA {}", new_schema_name)
633 }
634 AlterSinkOperation::SetParallelism {
635 parallelism,
636 deferred,
637 } => {
638 write!(
639 f,
640 "SET PARALLELISM TO {}{}",
641 parallelism,
642 if *deferred { " DEFERRED" } else { "" }
643 )
644 }
645 AlterSinkOperation::SetBackfillParallelism {
646 parallelism,
647 deferred,
648 } => {
649 write!(
650 f,
651 "SET BACKFILL_PARALLELISM TO {}{}",
652 parallelism,
653 if *deferred { " DEFERRED" } else { "" }
654 )
655 }
656 AlterSinkOperation::SetConfig { entries } => {
657 write!(f, "SET CONFIG ({})", display_comma_separated(entries))
658 }
659 AlterSinkOperation::ResetConfig { keys } => {
660 write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
661 }
662 AlterSinkOperation::SwapRenameSink { target_sink } => {
663 write!(f, "SWAP WITH {}", target_sink)
664 }
665 AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
666 write!(f, "SET SINK_RATE_LIMIT TO {}", rate_limit)
667 }
668 AlterSinkOperation::SetBackfillRateLimit { rate_limit } => {
669 write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
670 }
671 AlterSinkOperation::AlterConnectorProps {
672 alter_props: changed_props,
673 } => {
674 write!(
675 f,
676 "CONNECTOR WITH ({})",
677 display_comma_separated(changed_props)
678 )
679 }
680 AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
681 write!(f, "SET STREAMING_ENABLE_UNALIGNED_JOIN TO {}", enable)
682 }
683 }
684 }
685}
686
687impl fmt::Display for AlterSubscriptionOperation {
688 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
689 match self {
690 AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
691 write!(f, "RENAME TO {subscription_name}")
692 }
693 AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
694 write!(f, "OWNER TO {}", new_owner_name)
695 }
696 AlterSubscriptionOperation::SetSchema { new_schema_name } => {
697 write!(f, "SET SCHEMA {}", new_schema_name)
698 }
699 AlterSubscriptionOperation::SetRetention { retention } => {
700 write!(f, "SET RETENTION TO {}", retention)
701 }
702 AlterSubscriptionOperation::SwapRenameSubscription {
703 target_subscription,
704 } => {
705 write!(f, "SWAP WITH {}", target_subscription)
706 }
707 }
708 }
709}
710
711impl fmt::Display for AlterSourceOperation {
712 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
713 match self {
714 AlterSourceOperation::RenameSource { source_name } => {
715 write!(f, "RENAME TO {source_name}")
716 }
717 AlterSourceOperation::AddColumn { column_def } => {
718 write!(f, "ADD COLUMN {column_def}")
719 }
720 AlterSourceOperation::ChangeOwner { new_owner_name } => {
721 write!(f, "OWNER TO {}", new_owner_name)
722 }
723 AlterSourceOperation::SetSchema { new_schema_name } => {
724 write!(f, "SET SCHEMA {}", new_schema_name)
725 }
726 AlterSourceOperation::FormatEncode { format_encode } => {
727 write!(f, "{format_encode}")
728 }
729 AlterSourceOperation::RefreshSchema => {
730 write!(f, "REFRESH SCHEMA")
731 }
732 AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
733 write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
734 }
735 AlterSourceOperation::SwapRenameSource { target_source } => {
736 write!(f, "SWAP WITH {}", target_source)
737 }
738 AlterSourceOperation::SetParallelism {
739 parallelism,
740 deferred,
741 } => {
742 write!(
743 f,
744 "SET PARALLELISM TO {}{}",
745 parallelism,
746 if *deferred { " DEFERRED" } else { "" }
747 )
748 }
749 AlterSourceOperation::SetBackfillParallelism {
750 parallelism,
751 deferred,
752 } => {
753 write!(
754 f,
755 "SET BACKFILL_PARALLELISM TO {}{}",
756 parallelism,
757 if *deferred { " DEFERRED" } else { "" }
758 )
759 }
760 AlterSourceOperation::SetConfig { entries } => {
761 write!(f, "SET CONFIG ({})", display_comma_separated(entries))
762 }
763 AlterSourceOperation::ResetConfig { keys } => {
764 write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
765 }
766 AlterSourceOperation::ResetSource => {
767 write!(f, "RESET")
768 }
769 AlterSourceOperation::AlterConnectorProps { alter_props } => {
770 write!(
771 f,
772 "CONNECTOR WITH ({})",
773 display_comma_separated(alter_props)
774 )
775 }
776 }
777 }
778}
779
780impl fmt::Display for AlterFunctionOperation {
781 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
782 match self {
783 AlterFunctionOperation::SetSchema { new_schema_name } => {
784 write!(f, "SET SCHEMA {new_schema_name}")
785 }
786 AlterFunctionOperation::ChangeOwner { new_owner_name } => {
787 write!(f, "OWNER TO {new_owner_name}")
788 }
789 }
790 }
791}
792
793impl fmt::Display for AlterConnectionOperation {
794 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
795 match self {
796 AlterConnectionOperation::SetSchema { new_schema_name } => {
797 write!(f, "SET SCHEMA {new_schema_name}")
798 }
799 AlterConnectionOperation::ChangeOwner { new_owner_name } => {
800 write!(f, "OWNER TO {new_owner_name}")
801 }
802 AlterConnectionOperation::AlterConnectorProps { alter_props } => {
803 write!(
804 f,
805 "CONNECTOR WITH ({})",
806 display_comma_separated(alter_props)
807 )
808 }
809 }
810 }
811}
812
813impl fmt::Display for AlterSecretOperation {
814 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
815 match self {
816 AlterSecretOperation::ChangeCredential {
817 new_credential,
818 with_options,
819 } => {
820 write!(
821 f,
822 "WITH ({}) AS {}",
823 display_comma_separated(with_options),
824 new_credential
825 )
826 }
827 AlterSecretOperation::ChangeOwner { new_owner_name } => {
828 write!(f, "OWNER TO {new_owner_name}")
829 }
830 }
831 }
832}
833
/// Operation applied to a single column; rendered after
/// `ALTER COLUMN <column_name>` by `AlterTableOperation::AlterColumn`.
///
/// Each variant's doc shows the SQL fragment its `Display` impl emits.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterColumnOperation {
    /// `SET NOT NULL`
    SetNotNull,
    /// `DROP NOT NULL`
    DropNotNull,
    /// `SET DEFAULT <value>`
    SetDefault { value: Expr },
    /// `DROP DEFAULT`
    DropDefault,
    /// `SET DATA TYPE <data_type> [USING <using>]`
    SetDataType {
        data_type: DataType,
        // When `Some`, rendered as a trailing ` USING <expr>`.
        using: Option<Expr>,
    },
}
852
853impl fmt::Display for AlterColumnOperation {
854 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
855 match self {
856 AlterColumnOperation::SetNotNull => write!(f, "SET NOT NULL",),
857 AlterColumnOperation::DropNotNull => write!(f, "DROP NOT NULL",),
858 AlterColumnOperation::SetDefault { value } => {
859 write!(f, "SET DEFAULT {}", value)
860 }
861 AlterColumnOperation::DropDefault => {
862 write!(f, "DROP DEFAULT")
863 }
864 AlterColumnOperation::SetDataType { data_type, using } => {
865 if let Some(expr) = using {
866 write!(f, "SET DATA TYPE {} USING {}", data_type, expr)
867 } else {
868 write!(f, "SET DATA TYPE {}", data_type)
869 }
870 }
871 }
872 }
873}
874
875impl fmt::Display for AlterFragmentOperation {
876 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
877 match self {
878 AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
879 write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
880 }
881 AlterFragmentOperation::SetParallelism { parallelism } => {
882 write!(f, "SET PARALLELISM TO {}", parallelism)
883 }
884 }
885 }
886}
887
888impl fmt::Display for AlterCompactionGroupOperation {
889 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
890 match self {
891 AlterCompactionGroupOperation::Set { configs } => {
892 struct Assign<'a>(&'a ConfigParam);
893
894 impl fmt::Display for Assign<'_> {
895 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
896 write!(f, "{} = {}", self.0.param, self.0.value)
897 }
898 }
899
900 let assigns: Vec<Assign<'_>> = configs.iter().map(Assign).collect();
901 write!(f, "SET {}", display_comma_separated(&assigns))
902 }
903 }
904 }
905}
906
/// A watermark definition, rendered by `Display` as
/// `WATERMARK FOR <column> AS <expr> [WITH TTL]`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SourceWatermark {
    /// Column named after `WATERMARK FOR`.
    pub column: Ident,
    /// Expression rendered after `AS`.
    pub expr: Expr,
    /// When true, ` WITH TTL` is appended.
    pub with_ttl: bool,
}
916
917impl fmt::Display for SourceWatermark {
918 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
919 write!(f, "WATERMARK FOR {} AS {}", self.column, self.expr,)?;
920 if self.with_ttl {
921 write!(f, " WITH TTL")?;
922 }
923 Ok(())
924 }
925}
926
/// A table-level constraint. Each variant's doc shows the SQL its `Display`
/// impl emits; the `CONSTRAINT <name>` prefix appears only when `name` is
/// `Some`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum TableConstraint {
    /// `[CONSTRAINT <name>] { PRIMARY KEY | UNIQUE } (<columns>)`
    Unique {
        name: Option<Ident>,
        columns: Vec<Ident>,
        // Selects the `PRIMARY KEY` keyword instead of `UNIQUE`.
        is_primary: bool,
    },
    /// `[CONSTRAINT <name>] FOREIGN KEY (<columns>)
    /// REFERENCES <foreign_table>(<referred_columns>)
    /// [ON DELETE <action>] [ON UPDATE <action>]`
    ForeignKey {
        name: Option<Ident>,
        columns: Vec<Ident>,
        foreign_table: ObjectName,
        referred_columns: Vec<Ident>,
        on_delete: Option<ReferentialAction>,
        on_update: Option<ReferentialAction>,
    },
    /// `[CONSTRAINT <name>] CHECK (<expr>)`
    Check {
        name: Option<Ident>,
        expr: Box<Expr>,
    },
}
957
958impl fmt::Display for TableConstraint {
959 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
960 match self {
961 TableConstraint::Unique {
962 name,
963 columns,
964 is_primary,
965 } => write!(
966 f,
967 "{}{} ({})",
968 display_constraint_name(name),
969 if *is_primary { "PRIMARY KEY" } else { "UNIQUE" },
970 display_comma_separated(columns)
971 ),
972 TableConstraint::ForeignKey {
973 name,
974 columns,
975 foreign_table,
976 referred_columns,
977 on_delete,
978 on_update,
979 } => {
980 write!(
981 f,
982 "{}FOREIGN KEY ({}) REFERENCES {}({})",
983 display_constraint_name(name),
984 display_comma_separated(columns),
985 foreign_table,
986 display_comma_separated(referred_columns),
987 )?;
988 if let Some(action) = on_delete {
989 write!(f, " ON DELETE {}", action)?;
990 }
991 if let Some(action) = on_update {
992 write!(f, " ON UPDATE {}", action)?;
993 }
994 Ok(())
995 }
996 TableConstraint::Check { name, expr } => {
997 write!(f, "{}CHECK ({})", display_constraint_name(name), expr)
998 }
999 }
1000 }
1001}
1002
/// A column definition: name, optional data type, optional collation and a
/// list of column options.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnDef {
    /// Column name.
    pub name: Ident,
    /// Column data type; `ColumnDef::new` always sets this to `Some`.
    pub data_type: Option<DataType>,
    /// Optional collation name.
    pub collation: Option<ObjectName>,
    /// Column options, rendered space-separated after the data type.
    pub options: Vec<ColumnOptionDef>,
}
1011
1012impl ColumnDef {
1013 pub fn new(
1014 name: Ident,
1015 data_type: DataType,
1016 collation: Option<ObjectName>,
1017 options: Vec<ColumnOptionDef>,
1018 ) -> Self {
1019 ColumnDef {
1020 name,
1021 data_type: Some(data_type),
1022 collation,
1023 options,
1024 }
1025 }
1026
1027 pub fn is_generated(&self) -> bool {
1028 self.options
1029 .iter()
1030 .any(|option| matches!(option.option, ColumnOption::GeneratedColumns(_)))
1031 }
1032}
1033
1034impl fmt::Display for ColumnDef {
1035 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1036 write!(
1037 f,
1038 "{} {}",
1039 self.name,
1040 if let Some(data_type) = &self.data_type {
1041 data_type.to_string()
1042 } else {
1043 "None".to_owned()
1044 }
1045 )?;
1046 for option in &self.options {
1047 write!(f, " {}", option)?;
1048 }
1049 Ok(())
1050 }
1051}
1052
/// A column option with an optional constraint name, rendered by `Display` as
/// `[CONSTRAINT <name>] <option>`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnOptionDef {
    /// Optional constraint name; when `Some`, emitted as a `CONSTRAINT <name> ` prefix.
    pub name: Option<Ident>,
    /// The option itself.
    pub option: ColumnOption,
}
1074
1075impl fmt::Display for ColumnOptionDef {
1076 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1077 write!(f, "{}{}", display_constraint_name(&self.name), self.option)
1078 }
1079}
1080
/// A single option attached to a column definition. Each variant's doc shows
/// the SQL its `Display` impl emits.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ColumnOption {
    /// `NULL`
    Null,
    /// `NOT NULL`
    NotNull,
    /// `DEFAULT <expr>`
    DefaultValue(Expr),
    /// `DEFAULT <expr>` when `expr` is `Some`, otherwise `DEFAULT INTERNAL`.
    DefaultValueInternal {
        // Opaque bytes; never rendered by `Display`.
        // NOTE(review): presumably a serialized form of the default value; confirm with the producer.
        persisted: Box<[u8]>,
        expr: Option<Expr>,
    },
    /// `PRIMARY KEY` when `is_primary` is true, otherwise `UNIQUE`.
    Unique { is_primary: bool },
    /// `REFERENCES <foreign_table> [(<referred_columns>)]
    /// [ON DELETE <action>] [ON UPDATE <action>]`
    ForeignKey {
        foreign_table: ObjectName,
        // The parenthesised list is omitted when this is empty.
        referred_columns: Vec<Ident>,
        on_delete: Option<ReferentialAction>,
        on_update: Option<ReferentialAction>,
    },
    /// `CHECK (<expr>)`
    Check(Expr),
    /// Raw tokens, rendered separated by single spaces.
    DialectSpecific(Vec<Token>),
    /// `AS <expr>`
    GeneratedColumns(Expr),
}
1123
1124impl fmt::Display for ColumnOption {
1125 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1126 use ColumnOption::*;
1127 match self {
1128 Null => write!(f, "NULL"),
1129 NotNull => write!(f, "NOT NULL"),
1130 DefaultValue(expr) => write!(f, "DEFAULT {}", expr),
1131 DefaultValueInternal { persisted: _, expr } => {
1132 if let Some(expr) = expr {
1133 write!(f, "DEFAULT {}", expr)
1134 } else {
1135 write!(f, "DEFAULT INTERNAL")
1136 }
1137 }
1138 Unique { is_primary } => {
1139 write!(f, "{}", if *is_primary { "PRIMARY KEY" } else { "UNIQUE" })
1140 }
1141 ForeignKey {
1142 foreign_table,
1143 referred_columns,
1144 on_delete,
1145 on_update,
1146 } => {
1147 write!(f, "REFERENCES {}", foreign_table)?;
1148 if !referred_columns.is_empty() {
1149 write!(f, " ({})", display_comma_separated(referred_columns))?;
1150 }
1151 if let Some(action) = on_delete {
1152 write!(f, " ON DELETE {}", action)?;
1153 }
1154 if let Some(action) = on_update {
1155 write!(f, " ON UPDATE {}", action)?;
1156 }
1157 Ok(())
1158 }
1159 Check(expr) => write!(f, "CHECK ({})", expr),
1160 DialectSpecific(val) => write!(f, "{}", display_separated(val, " ")),
1161 GeneratedColumns(expr) => write!(f, "AS {}", expr),
1162 }
1163 }
1164}
1165
1166fn display_constraint_name(name: &'_ Option<Ident>) -> impl fmt::Display + '_ {
1167 struct ConstraintName<'a>(&'a Option<Ident>);
1168 impl fmt::Display for ConstraintName<'_> {
1169 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1170 if let Some(name) = self.0 {
1171 write!(f, "CONSTRAINT {} ", name)?;
1172 }
1173 Ok(())
1174 }
1175 }
1176 ConstraintName(name)
1177}
1178
/// Action rendered after `ON DELETE` / `ON UPDATE` in foreign-key clauses.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ReferentialAction {
    /// `RESTRICT`
    Restrict,
    /// `CASCADE`
    Cascade,
    /// `SET NULL`
    SetNull,
    /// `NO ACTION`
    NoAction,
    /// `SET DEFAULT`
    SetDefault,
}

impl fmt::Display for ReferentialAction {
    /// Writes the SQL keyword(s) for this action.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keyword = match self {
            Self::Restrict => "RESTRICT",
            Self::Cascade => "CASCADE",
            Self::SetNull => "SET NULL",
            Self::NoAction => "NO ACTION",
            Self::SetDefault => "SET DEFAULT",
        };
        f.write_str(keyword)
    }
}
1203
/// Settings attached to a webhook source.
///
/// NOTE(review): nothing in this part of the file reads these fields, so the
/// field notes below are inferred from names and types only; confirm against
/// the parser and consumers.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct WebhookSourceInfo {
    /// Optional secret reference (presumably used to validate requests; confirm).
    pub secret_ref: Option<SecretRefValue>,
    /// Optional signature expression (presumably evaluated per request; confirm).
    pub signature_expr: Option<Expr>,
    /// Presumably whether a request waits until data is persisted; confirm.
    pub wait_for_persistence: bool,
    /// Presumably whether a request may carry multiple records; confirm.
    pub is_batched: bool,
}