risingwave_sqlparser/ast/
ddl.rs

1// Licensed under the Apache License, Version 2.0 (the "License");
2// you may not use this file except in compliance with the License.
3// You may obtain a copy of the License at
4//
5//     http://www.apache.org/licenses/LICENSE-2.0
6//
7// Unless required by applicable law or agreed to in writing, software
8// distributed under the License is distributed on an "AS IS" BASIS,
9// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10// See the License for the specific language governing permissions and
11// limitations under the License.
12
13//! AST types specific to CREATE/ALTER variants of [`crate::ast::Statement`]
14//! (commonly referred to as Data Definition Language, or DDL)
15
16use std::fmt;
17
18use super::{ConfigParam, FormatEncodeOptions, SqlOption};
19use crate::ast::{
20    DataType, Expr, Ident, ObjectName, Query, SecretRefValue, SetVariableValue, Value,
21    display_comma_separated, display_separated,
22};
23use crate::tokenizer::Token;
24
25#[derive(Debug, Clone, PartialEq, Eq, Hash)]
26pub enum AlterDatabaseOperation {
27    ChangeOwner { new_owner_name: Ident },
28    RenameDatabase { database_name: ObjectName },
29    SetParam(ConfigParam),
30}
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub enum AlterSchemaOperation {
34    ChangeOwner { new_owner_name: Ident },
35    RenameSchema { schema_name: ObjectName },
36    SwapRenameSchema { target_schema: ObjectName },
37}
38
39/// An `ALTER TABLE` (`Statement::AlterTable`) operation
40#[derive(Debug, Clone, PartialEq, Eq, Hash)]
41pub enum AlterTableOperation {
42    /// `ADD <table_constraint>`
43    AddConstraint(TableConstraint),
44    /// `ADD [ COLUMN ] <column_def>`
45    AddColumn {
46        column_def: ColumnDef,
47    },
48    /// TODO: implement `DROP CONSTRAINT <name>`
49    DropConstraint {
50        name: Ident,
51    },
52    /// `DROP [ COLUMN ] [ IF EXISTS ] <column_name> [ CASCADE ]`
53    DropColumn {
54        column_name: Ident,
55        if_exists: bool,
56        cascade: bool,
57    },
58    /// `RENAME [ COLUMN ] <old_column_name> TO <new_column_name>`
59    RenameColumn {
60        old_column_name: Ident,
61        new_column_name: Ident,
62    },
63    /// `RENAME TO <table_name>`
64    RenameTable {
65        table_name: ObjectName,
66    },
67    // CHANGE [ COLUMN ] <old_name> <new_name> <data_type> [ <options> ]
68    ChangeColumn {
69        old_name: Ident,
70        new_name: Ident,
71        data_type: DataType,
72        options: Vec<ColumnOption>,
73    },
74    /// `RENAME CONSTRAINT <old_constraint_name> TO <new_constraint_name>`
75    ///
76    /// Note: this is a PostgreSQL-specific operation.
77    RenameConstraint {
78        old_name: Ident,
79        new_name: Ident,
80    },
81    /// `ALTER [ COLUMN ]`
82    AlterColumn {
83        column_name: Ident,
84        op: AlterColumnOperation,
85    },
86    /// `OWNER TO <owner_name>`
87    ChangeOwner {
88        new_owner_name: Ident,
89    },
90    /// `SET SCHEMA <schema_name>`
91    SetSchema {
92        new_schema_name: ObjectName,
93    },
94    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
95    SetParallelism {
96        parallelism: SetVariableValue,
97        deferred: bool,
98    },
99    /// `SET CONFIG (key = value, ...)`
100    SetConfig {
101        entries: Vec<SqlOption>,
102    },
103    /// `RESET CONFIG (key, ...)`
104    ResetConfig {
105        keys: Vec<ObjectName>,
106    },
107    RefreshSchema,
108    /// `SET SOURCE_RATE_LIMIT TO <rate_limit>`
109    SetSourceRateLimit {
110        rate_limit: i32,
111    },
112    /// SET BACKFILL_RATE_LIMIT TO <rate_limit>
113    SetBackfillRateLimit {
114        rate_limit: i32,
115    },
116    /// `SET DML_RATE_LIMIT TO <rate_limit>`
117    SetDmlRateLimit {
118        rate_limit: i32,
119    },
120    /// `SWAP WITH <table_name>`
121    SwapRenameTable {
122        target_table: ObjectName,
123    },
124    /// `DROP CONNECTOR`
125    DropConnector,
126
127    /// `ALTER CONNECTOR WITH (<connector_props>)`
128    AlterConnectorProps {
129        alter_props: Vec<SqlOption>,
130    },
131}
132
133#[derive(Debug, Clone, PartialEq, Eq, Hash)]
134pub enum AlterIndexOperation {
135    RenameIndex {
136        index_name: ObjectName,
137    },
138    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
139    SetParallelism {
140        parallelism: SetVariableValue,
141        deferred: bool,
142    },
143    /// `SET CONFIG (key = value, ...)`
144    SetConfig {
145        entries: Vec<SqlOption>,
146    },
147    /// `RESET CONFIG (key, ...)`
148    ResetConfig {
149        keys: Vec<ObjectName>,
150    },
151}
152
153#[derive(Debug, Clone, PartialEq, Eq, Hash)]
154pub enum AlterViewOperation {
155    RenameView {
156        view_name: ObjectName,
157    },
158    ChangeOwner {
159        new_owner_name: Ident,
160    },
161    SetSchema {
162        new_schema_name: ObjectName,
163    },
164    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
165    SetParallelism {
166        parallelism: SetVariableValue,
167        deferred: bool,
168    },
169    /// `SET RESOURCE_GROUP TO 'RESOURCE GROUP' [ DEFERRED ]`
170    /// `RESET RESOURCE_GROUP [ DEFERRED ]`
171    SetResourceGroup {
172        resource_group: Option<SetVariableValue>,
173        deferred: bool,
174    },
175    /// `SET BACKFILL_RATE_LIMIT TO <rate_limit>`
176    SetBackfillRateLimit {
177        rate_limit: i32,
178    },
179    /// `SWAP WITH <view_name>`
180    SwapRenameView {
181        target_view: ObjectName,
182    },
183    SetStreamingEnableUnalignedJoin {
184        enable: bool,
185    },
186    /// `AS <query>`
187    AsQuery {
188        query: Box<Query>,
189    },
190    /// `SET CONFIG ( streaming.some_config_key = <some_config_value>, .. )`
191    SetConfig {
192        entries: Vec<SqlOption>,
193    },
194    /// `RESET CONFIG ( streaming.some_config_key, .. )`
195    ResetConfig {
196        keys: Vec<ObjectName>,
197    },
198}
199
200#[derive(Debug, Clone, PartialEq, Eq, Hash)]
201pub enum AlterSinkOperation {
202    RenameSink {
203        sink_name: ObjectName,
204    },
205    ChangeOwner {
206        new_owner_name: Ident,
207    },
208    SetSchema {
209        new_schema_name: ObjectName,
210    },
211    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
212    SetParallelism {
213        parallelism: SetVariableValue,
214        deferred: bool,
215    },
216    /// `SET CONFIG (key = value, ...)`
217    SetConfig {
218        entries: Vec<SqlOption>,
219    },
220    /// `RESET CONFIG (key, ...)`
221    ResetConfig {
222        keys: Vec<ObjectName>,
223    },
224    /// `SWAP WITH <sink_name>`
225    SwapRenameSink {
226        target_sink: ObjectName,
227    },
228    SetSinkRateLimit {
229        rate_limit: i32,
230    },
231    SetBackfillRateLimit {
232        rate_limit: i32,
233    },
234    AlterConnectorProps {
235        alter_props: Vec<SqlOption>,
236    },
237    SetStreamingEnableUnalignedJoin {
238        enable: bool,
239    },
240}
241
242#[derive(Debug, Clone, PartialEq, Eq, Hash)]
243pub enum AlterSubscriptionOperation {
244    RenameSubscription { subscription_name: ObjectName },
245    ChangeOwner { new_owner_name: Ident },
246    SetSchema { new_schema_name: ObjectName },
247    SwapRenameSubscription { target_subscription: ObjectName },
248}
249
250#[derive(Debug, Clone, PartialEq, Eq, Hash)]
251pub enum AlterSourceOperation {
252    RenameSource {
253        source_name: ObjectName,
254    },
255    AddColumn {
256        column_def: ColumnDef,
257    },
258    ChangeOwner {
259        new_owner_name: Ident,
260    },
261    SetSchema {
262        new_schema_name: ObjectName,
263    },
264    FormatEncode {
265        format_encode: FormatEncodeOptions,
266    },
267    RefreshSchema,
268    SetSourceRateLimit {
269        rate_limit: i32,
270    },
271    SwapRenameSource {
272        target_source: ObjectName,
273    },
274    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
275    SetParallelism {
276        parallelism: SetVariableValue,
277        deferred: bool,
278    },
279    /// `SET CONFIG (key = value, ...)`
280    SetConfig {
281        entries: Vec<SqlOption>,
282    },
283    /// `RESET CONFIG (key, ...)`
284    ResetConfig {
285        keys: Vec<ObjectName>,
286    },
287    /// `RESET` - Reset CDC source offset to latest
288    ResetSource,
289    AlterConnectorProps {
290        alter_props: Vec<SqlOption>,
291    },
292}
293
294#[derive(Debug, Clone, PartialEq, Eq, Hash)]
295pub enum AlterFunctionOperation {
296    SetSchema { new_schema_name: ObjectName },
297    ChangeOwner { new_owner_name: Ident },
298}
299
300#[derive(Debug, Clone, PartialEq, Eq, Hash)]
301pub enum AlterConnectionOperation {
302    SetSchema { new_schema_name: ObjectName },
303    ChangeOwner { new_owner_name: Ident },
304    AlterConnectorProps { alter_props: Vec<SqlOption> },
305}
306
307#[derive(Debug, Clone, PartialEq, Eq, Hash)]
308pub enum AlterSecretOperation {
309    ChangeCredential { new_credential: Value },
310}
311
312#[derive(Debug, Clone, PartialEq, Eq, Hash)]
313pub enum AlterFragmentOperation {
314    AlterBackfillRateLimit { rate_limit: i32 },
315    SetParallelism { parallelism: SetVariableValue },
316}
317
318impl fmt::Display for AlterDatabaseOperation {
319    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
320        match self {
321            AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
322                write!(f, "OWNER TO {}", new_owner_name)
323            }
324            AlterDatabaseOperation::RenameDatabase { database_name } => {
325                write!(f, "RENAME TO {}", database_name)
326            }
327            AlterDatabaseOperation::SetParam(ConfigParam { param, value }) => {
328                write!(f, "SET {} TO {}", param, value)
329            }
330        }
331    }
332}
333
334impl fmt::Display for AlterSchemaOperation {
335    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
336        match self {
337            AlterSchemaOperation::ChangeOwner { new_owner_name } => {
338                write!(f, "OWNER TO {}", new_owner_name)
339            }
340            AlterSchemaOperation::RenameSchema { schema_name } => {
341                write!(f, "RENAME TO {}", schema_name)
342            }
343            AlterSchemaOperation::SwapRenameSchema { target_schema } => {
344                write!(f, "SWAP WITH {}", target_schema)
345            }
346        }
347    }
348}
349
350impl fmt::Display for AlterTableOperation {
351    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
352        match self {
353            AlterTableOperation::AddConstraint(c) => write!(f, "ADD {}", c),
354            AlterTableOperation::AddColumn { column_def } => {
355                write!(f, "ADD COLUMN {}", column_def)
356            }
357            AlterTableOperation::AlterColumn { column_name, op } => {
358                write!(f, "ALTER COLUMN {} {}", column_name, op)
359            }
360            AlterTableOperation::DropConstraint { name } => write!(f, "DROP CONSTRAINT {}", name),
361            AlterTableOperation::DropColumn {
362                column_name,
363                if_exists,
364                cascade,
365            } => write!(
366                f,
367                "DROP COLUMN {}{}{}",
368                if *if_exists { "IF EXISTS " } else { "" },
369                column_name,
370                if *cascade { " CASCADE" } else { "" }
371            ),
372            AlterTableOperation::RenameColumn {
373                old_column_name,
374                new_column_name,
375            } => write!(
376                f,
377                "RENAME COLUMN {} TO {}",
378                old_column_name, new_column_name
379            ),
380            AlterTableOperation::RenameTable { table_name } => {
381                write!(f, "RENAME TO {}", table_name)
382            }
383            AlterTableOperation::ChangeColumn {
384                old_name,
385                new_name,
386                data_type,
387                options,
388            } => {
389                write!(f, "CHANGE COLUMN {} {} {}", old_name, new_name, data_type)?;
390                if options.is_empty() {
391                    Ok(())
392                } else {
393                    write!(f, " {}", display_separated(options, " "))
394                }
395            }
396            AlterTableOperation::RenameConstraint { old_name, new_name } => {
397                write!(f, "RENAME CONSTRAINT {} TO {}", old_name, new_name)
398            }
399            AlterTableOperation::ChangeOwner { new_owner_name } => {
400                write!(f, "OWNER TO {}", new_owner_name)
401            }
402            AlterTableOperation::SetSchema { new_schema_name } => {
403                write!(f, "SET SCHEMA {}", new_schema_name)
404            }
405            AlterTableOperation::SetParallelism {
406                parallelism,
407                deferred,
408            } => {
409                write!(
410                    f,
411                    "SET PARALLELISM TO {}{}",
412                    parallelism,
413                    if *deferred { " DEFERRED" } else { "" }
414                )
415            }
416            AlterTableOperation::SetConfig { entries } => {
417                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
418            }
419            AlterTableOperation::ResetConfig { keys } => {
420                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
421            }
422            AlterTableOperation::RefreshSchema => {
423                write!(f, "REFRESH SCHEMA")
424            }
425            AlterTableOperation::SetSourceRateLimit { rate_limit } => {
426                write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
427            }
428            AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
429                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
430            }
431            AlterTableOperation::SetDmlRateLimit { rate_limit } => {
432                write!(f, "SET DML_RATE_LIMIT TO {}", rate_limit)
433            }
434            AlterTableOperation::SwapRenameTable { target_table } => {
435                write!(f, "SWAP WITH {}", target_table)
436            }
437            AlterTableOperation::DropConnector => {
438                write!(f, "DROP CONNECTOR")
439            }
440            AlterTableOperation::AlterConnectorProps { alter_props } => {
441                write!(
442                    f,
443                    "CONNECTOR WITH ({})",
444                    display_comma_separated(alter_props)
445                )
446            }
447        }
448    }
449}
450
451impl fmt::Display for AlterIndexOperation {
452    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
453        match self {
454            AlterIndexOperation::RenameIndex { index_name } => {
455                write!(f, "RENAME TO {index_name}")
456            }
457            AlterIndexOperation::SetParallelism {
458                parallelism,
459                deferred,
460            } => {
461                write!(
462                    f,
463                    "SET PARALLELISM TO {}{}",
464                    parallelism,
465                    if *deferred { " DEFERRED" } else { "" }
466                )
467            }
468            AlterIndexOperation::SetConfig { entries } => {
469                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
470            }
471            AlterIndexOperation::ResetConfig { keys } => {
472                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
473            }
474        }
475    }
476}
477
478impl fmt::Display for AlterViewOperation {
479    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
480        match self {
481            AlterViewOperation::RenameView { view_name } => {
482                write!(f, "RENAME TO {view_name}")
483            }
484            AlterViewOperation::ChangeOwner { new_owner_name } => {
485                write!(f, "OWNER TO {}", new_owner_name)
486            }
487            AlterViewOperation::SetSchema { new_schema_name } => {
488                write!(f, "SET SCHEMA {}", new_schema_name)
489            }
490            AlterViewOperation::SetParallelism {
491                parallelism,
492                deferred,
493            } => {
494                write!(
495                    f,
496                    "SET PARALLELISM TO {}{}",
497                    parallelism,
498                    if *deferred { " DEFERRED" } else { "" }
499                )
500            }
501            AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
502                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
503            }
504            AlterViewOperation::SwapRenameView { target_view } => {
505                write!(f, "SWAP WITH {}", target_view)
506            }
507            AlterViewOperation::SetResourceGroup {
508                resource_group,
509                deferred,
510            } => {
511                let deferred = if *deferred { " DEFERRED" } else { "" };
512
513                if let Some(resource_group) = resource_group {
514                    write!(f, "SET RESOURCE_GROUP TO {} {}", resource_group, deferred)
515                } else {
516                    write!(f, "RESET RESOURCE_GROUP {}", deferred)
517                }
518            }
519            AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
520                write!(f, "SET STREAMING_ENABLE_UNALIGNED_JOIN TO {}", enable)
521            }
522            AlterViewOperation::AsQuery { query } => {
523                write!(f, "AS {}", query)
524            }
525            AlterViewOperation::SetConfig { entries } => {
526                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
527            }
528            AlterViewOperation::ResetConfig { keys } => {
529                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
530            }
531        }
532    }
533}
534
535impl fmt::Display for AlterSinkOperation {
536    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
537        match self {
538            AlterSinkOperation::RenameSink { sink_name } => {
539                write!(f, "RENAME TO {sink_name}")
540            }
541            AlterSinkOperation::ChangeOwner { new_owner_name } => {
542                write!(f, "OWNER TO {}", new_owner_name)
543            }
544            AlterSinkOperation::SetSchema { new_schema_name } => {
545                write!(f, "SET SCHEMA {}", new_schema_name)
546            }
547            AlterSinkOperation::SetParallelism {
548                parallelism,
549                deferred,
550            } => {
551                write!(
552                    f,
553                    "SET PARALLELISM TO {}{}",
554                    parallelism,
555                    if *deferred { " DEFERRED" } else { "" }
556                )
557            }
558            AlterSinkOperation::SetConfig { entries } => {
559                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
560            }
561            AlterSinkOperation::ResetConfig { keys } => {
562                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
563            }
564            AlterSinkOperation::SwapRenameSink { target_sink } => {
565                write!(f, "SWAP WITH {}", target_sink)
566            }
567            AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
568                write!(f, "SET SINK_RATE_LIMIT TO {}", rate_limit)
569            }
570            AlterSinkOperation::SetBackfillRateLimit { rate_limit } => {
571                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
572            }
573            AlterSinkOperation::AlterConnectorProps {
574                alter_props: changed_props,
575            } => {
576                write!(
577                    f,
578                    "CONNECTOR WITH ({})",
579                    display_comma_separated(changed_props)
580                )
581            }
582            AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
583                write!(f, "SET STREAMING_ENABLE_UNALIGNED_JOIN TO {}", enable)
584            }
585        }
586    }
587}
588
589impl fmt::Display for AlterSubscriptionOperation {
590    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
591        match self {
592            AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
593                write!(f, "RENAME TO {subscription_name}")
594            }
595            AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
596                write!(f, "OWNER TO {}", new_owner_name)
597            }
598            AlterSubscriptionOperation::SetSchema { new_schema_name } => {
599                write!(f, "SET SCHEMA {}", new_schema_name)
600            }
601            AlterSubscriptionOperation::SwapRenameSubscription {
602                target_subscription,
603            } => {
604                write!(f, "SWAP WITH {}", target_subscription)
605            }
606        }
607    }
608}
609
610impl fmt::Display for AlterSourceOperation {
611    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
612        match self {
613            AlterSourceOperation::RenameSource { source_name } => {
614                write!(f, "RENAME TO {source_name}")
615            }
616            AlterSourceOperation::AddColumn { column_def } => {
617                write!(f, "ADD COLUMN {column_def}")
618            }
619            AlterSourceOperation::ChangeOwner { new_owner_name } => {
620                write!(f, "OWNER TO {}", new_owner_name)
621            }
622            AlterSourceOperation::SetSchema { new_schema_name } => {
623                write!(f, "SET SCHEMA {}", new_schema_name)
624            }
625            AlterSourceOperation::FormatEncode { format_encode } => {
626                write!(f, "{format_encode}")
627            }
628            AlterSourceOperation::RefreshSchema => {
629                write!(f, "REFRESH SCHEMA")
630            }
631            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
632                write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
633            }
634            AlterSourceOperation::SwapRenameSource { target_source } => {
635                write!(f, "SWAP WITH {}", target_source)
636            }
637            AlterSourceOperation::SetParallelism {
638                parallelism,
639                deferred,
640            } => {
641                write!(
642                    f,
643                    "SET PARALLELISM TO {}{}",
644                    parallelism,
645                    if *deferred { " DEFERRED" } else { "" }
646                )
647            }
648            AlterSourceOperation::SetConfig { entries } => {
649                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
650            }
651            AlterSourceOperation::ResetConfig { keys } => {
652                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
653            }
654            AlterSourceOperation::ResetSource => {
655                write!(f, "RESET")
656            }
657            AlterSourceOperation::AlterConnectorProps { alter_props } => {
658                write!(
659                    f,
660                    "CONNECTOR WITH ({})",
661                    display_comma_separated(alter_props)
662                )
663            }
664        }
665    }
666}
667
668impl fmt::Display for AlterFunctionOperation {
669    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
670        match self {
671            AlterFunctionOperation::SetSchema { new_schema_name } => {
672                write!(f, "SET SCHEMA {new_schema_name}")
673            }
674            AlterFunctionOperation::ChangeOwner { new_owner_name } => {
675                write!(f, "OWNER TO {new_owner_name}")
676            }
677        }
678    }
679}
680
681impl fmt::Display for AlterConnectionOperation {
682    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
683        match self {
684            AlterConnectionOperation::SetSchema { new_schema_name } => {
685                write!(f, "SET SCHEMA {new_schema_name}")
686            }
687            AlterConnectionOperation::ChangeOwner { new_owner_name } => {
688                write!(f, "OWNER TO {new_owner_name}")
689            }
690            AlterConnectionOperation::AlterConnectorProps { alter_props } => {
691                write!(
692                    f,
693                    "CONNECTOR WITH ({})",
694                    display_comma_separated(alter_props)
695                )
696            }
697        }
698    }
699}
700
701impl fmt::Display for AlterSecretOperation {
702    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
703        match self {
704            AlterSecretOperation::ChangeCredential { new_credential } => {
705                write!(f, "AS {new_credential}")
706            }
707        }
708    }
709}
710
711/// An `ALTER COLUMN` (`Statement::AlterTable`) operation
712#[derive(Debug, Clone, PartialEq, Eq, Hash)]
713pub enum AlterColumnOperation {
714    /// `SET NOT NULL`
715    SetNotNull,
716    /// `DROP NOT NULL`
717    DropNotNull,
718    /// `SET DEFAULT <expr>`
719    SetDefault { value: Expr },
720    /// `DROP DEFAULT`
721    DropDefault,
722    /// `[SET DATA] TYPE <data_type> [USING <expr>]`
723    SetDataType {
724        data_type: DataType,
725        /// PostgreSQL specific
726        using: Option<Expr>,
727    },
728}
729
730impl fmt::Display for AlterColumnOperation {
731    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
732        match self {
733            AlterColumnOperation::SetNotNull => write!(f, "SET NOT NULL",),
734            AlterColumnOperation::DropNotNull => write!(f, "DROP NOT NULL",),
735            AlterColumnOperation::SetDefault { value } => {
736                write!(f, "SET DEFAULT {}", value)
737            }
738            AlterColumnOperation::DropDefault => {
739                write!(f, "DROP DEFAULT")
740            }
741            AlterColumnOperation::SetDataType { data_type, using } => {
742                if let Some(expr) = using {
743                    write!(f, "SET DATA TYPE {} USING {}", data_type, expr)
744                } else {
745                    write!(f, "SET DATA TYPE {}", data_type)
746                }
747            }
748        }
749    }
750}
751
752impl fmt::Display for AlterFragmentOperation {
753    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
754        match self {
755            AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
756                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
757            }
758            AlterFragmentOperation::SetParallelism { parallelism } => {
759                write!(f, "SET PARALLELISM TO {}", parallelism)
760            }
761        }
762    }
763}
764
765/// The watermark on source.
766/// `WATERMARK FOR <column> AS (<expr>)`
767#[derive(Debug, Clone, PartialEq, Eq, Hash)]
768pub struct SourceWatermark {
769    pub column: Ident,
770    pub expr: Expr,
771    /// Whether `WITH TTL` is specified.
772    pub with_ttl: bool,
773}
774
775impl fmt::Display for SourceWatermark {
776    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
777        write!(f, "WATERMARK FOR {} AS {}", self.column, self.expr,)?;
778        if self.with_ttl {
779            write!(f, " WITH TTL")?;
780        }
781        Ok(())
782    }
783}
784
785/// A table-level constraint, specified in a `CREATE TABLE` or an
786/// `ALTER TABLE ADD <constraint>` statement.
787#[derive(Debug, Clone, PartialEq, Eq, Hash)]
788pub enum TableConstraint {
789    /// `[ CONSTRAINT <name> ] { PRIMARY KEY | UNIQUE } (<columns>)`
790    Unique {
791        name: Option<Ident>,
792        columns: Vec<Ident>,
793        /// Whether this is a `PRIMARY KEY` or just a `UNIQUE` constraint
794        is_primary: bool,
795    },
796    /// A referential integrity constraint (`[ CONSTRAINT <name> ] FOREIGN KEY (<columns>)
797    /// REFERENCES <foreign_table> (<referred_columns>)
798    /// { [ON DELETE <referential_action>] [ON UPDATE <referential_action>] |
799    ///   [ON UPDATE <referential_action>] [ON DELETE <referential_action>]
800    /// }`).
801    ForeignKey {
802        name: Option<Ident>,
803        columns: Vec<Ident>,
804        foreign_table: ObjectName,
805        referred_columns: Vec<Ident>,
806        on_delete: Option<ReferentialAction>,
807        on_update: Option<ReferentialAction>,
808    },
809    /// `[ CONSTRAINT <name> ] CHECK (<expr>)`
810    Check {
811        name: Option<Ident>,
812        expr: Box<Expr>,
813    },
814}
815
816impl fmt::Display for TableConstraint {
817    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
818        match self {
819            TableConstraint::Unique {
820                name,
821                columns,
822                is_primary,
823            } => write!(
824                f,
825                "{}{} ({})",
826                display_constraint_name(name),
827                if *is_primary { "PRIMARY KEY" } else { "UNIQUE" },
828                display_comma_separated(columns)
829            ),
830            TableConstraint::ForeignKey {
831                name,
832                columns,
833                foreign_table,
834                referred_columns,
835                on_delete,
836                on_update,
837            } => {
838                write!(
839                    f,
840                    "{}FOREIGN KEY ({}) REFERENCES {}({})",
841                    display_constraint_name(name),
842                    display_comma_separated(columns),
843                    foreign_table,
844                    display_comma_separated(referred_columns),
845                )?;
846                if let Some(action) = on_delete {
847                    write!(f, " ON DELETE {}", action)?;
848                }
849                if let Some(action) = on_update {
850                    write!(f, " ON UPDATE {}", action)?;
851                }
852                Ok(())
853            }
854            TableConstraint::Check { name, expr } => {
855                write!(f, "{}CHECK ({})", display_constraint_name(name), expr)
856            }
857        }
858    }
859}
860
861/// SQL column definition
862#[derive(Debug, Clone, PartialEq, Eq, Hash)]
863pub struct ColumnDef {
864    pub name: Ident,
865    pub data_type: Option<DataType>,
866    pub collation: Option<ObjectName>,
867    pub options: Vec<ColumnOptionDef>,
868}
869
870impl ColumnDef {
871    pub fn new(
872        name: Ident,
873        data_type: DataType,
874        collation: Option<ObjectName>,
875        options: Vec<ColumnOptionDef>,
876    ) -> Self {
877        ColumnDef {
878            name,
879            data_type: Some(data_type),
880            collation,
881            options,
882        }
883    }
884
885    pub fn is_generated(&self) -> bool {
886        self.options
887            .iter()
888            .any(|option| matches!(option.option, ColumnOption::GeneratedColumns(_)))
889    }
890}
891
892impl fmt::Display for ColumnDef {
893    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
894        write!(
895            f,
896            "{} {}",
897            self.name,
898            if let Some(data_type) = &self.data_type {
899                data_type.to_string()
900            } else {
901                "None".to_owned()
902            }
903        )?;
904        for option in &self.options {
905            write!(f, " {}", option)?;
906        }
907        Ok(())
908    }
909}
910
/// An optionally-named `ColumnOption`: `[ CONSTRAINT <name> ] <column-option>`.
///
/// Note that implementations are substantially more permissive than the ANSI
/// specification on what order column options can be presented in, and whether
/// they are allowed to be named. The specification distinguishes between
/// constraints (NOT NULL, UNIQUE, PRIMARY KEY, and CHECK), which can be named
/// and can appear in any order, and other options (DEFAULT, GENERATED), which
/// cannot be named and must appear in a fixed order. PostgreSQL, however,
/// allows preceding any option with `CONSTRAINT <name>`, even those that are
/// not really constraints, like NULL and DEFAULT. MSSQL is less permissive,
/// allowing DEFAULT, UNIQUE, PRIMARY KEY and CHECK to be named, but not NULL or
/// NOT NULL constraints (the last of which is in violation of the spec).
///
/// For maximum flexibility, we don't distinguish between constraint and
/// non-constraint options, lumping them all together under the umbrella of
/// "column options," and we allow any column option to be named.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnOptionDef {
    /// Optional constraint name. When present, the `Display` impl emits
    /// `CONSTRAINT <name> ` in front of the option.
    pub name: Option<Ident>,
    /// The column option being (optionally) named.
    pub option: ColumnOption,
}
932
933impl fmt::Display for ColumnOptionDef {
934    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
935        write!(f, "{}{}", display_constraint_name(&self.name), self.option)
936    }
937}
938
/// `ColumnOption`s are modifiers that follow a column definition in a `CREATE
/// TABLE` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ColumnOption {
    /// `NULL`
    Null,
    /// `NOT NULL`
    NotNull,
    /// `DEFAULT <restricted-expr>`
    DefaultValue(Expr),
    /// Default value from previous bound `DefaultColumnDesc`. Used internally
    /// for schema change and should not be specified by users.
    DefaultValueInternal {
        /// Protobuf encoded `DefaultColumnDesc`.
        persisted: Box<[u8]>,
        /// Optional AST for unparsing. If `None`, the default value will be
        /// shown as `DEFAULT INTERNAL` which is for demonstrating and should
        /// not be specified by users.
        expr: Option<Expr>,
    },
    /// `{ PRIMARY KEY | UNIQUE }`; `is_primary` selects `PRIMARY KEY`.
    Unique { is_primary: bool },
    /// A referential integrity constraint (`[FOREIGN KEY REFERENCES
    /// <foreign_table> (<referred_columns>)
    /// { [ON DELETE <referential_action>] [ON UPDATE <referential_action>] |
    ///   [ON UPDATE <referential_action>] [ON DELETE <referential_action>]
    /// }`).
    ///
    /// The `Display` impl renders it starting at `REFERENCES`, omits the
    /// column list when `referred_columns` is empty, and always writes
    /// `ON DELETE` before `ON UPDATE`.
    ForeignKey {
        foreign_table: ObjectName,
        referred_columns: Vec<Ident>,
        on_delete: Option<ReferentialAction>,
        on_update: Option<ReferentialAction>,
    },
    /// `CHECK (<expr>)`
    Check(Expr),
    /// Dialect-specific options, such as:
    /// - MySQL's `AUTO_INCREMENT` or SQLite's `AUTOINCREMENT`
    /// - ...
    DialectSpecific(Vec<Token>),
    /// `AS ( <generation_expr> )`
    ///
    /// A column carrying this option is reported as generated by
    /// `ColumnDef::is_generated`.
    GeneratedColumns(Expr),
}
981
982impl fmt::Display for ColumnOption {
983    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
984        use ColumnOption::*;
985        match self {
986            Null => write!(f, "NULL"),
987            NotNull => write!(f, "NOT NULL"),
988            DefaultValue(expr) => write!(f, "DEFAULT {}", expr),
989            DefaultValueInternal { persisted: _, expr } => {
990                if let Some(expr) = expr {
991                    write!(f, "DEFAULT {}", expr)
992                } else {
993                    write!(f, "DEFAULT INTERNAL")
994                }
995            }
996            Unique { is_primary } => {
997                write!(f, "{}", if *is_primary { "PRIMARY KEY" } else { "UNIQUE" })
998            }
999            ForeignKey {
1000                foreign_table,
1001                referred_columns,
1002                on_delete,
1003                on_update,
1004            } => {
1005                write!(f, "REFERENCES {}", foreign_table)?;
1006                if !referred_columns.is_empty() {
1007                    write!(f, " ({})", display_comma_separated(referred_columns))?;
1008                }
1009                if let Some(action) = on_delete {
1010                    write!(f, " ON DELETE {}", action)?;
1011                }
1012                if let Some(action) = on_update {
1013                    write!(f, " ON UPDATE {}", action)?;
1014                }
1015                Ok(())
1016            }
1017            Check(expr) => write!(f, "CHECK ({})", expr),
1018            DialectSpecific(val) => write!(f, "{}", display_separated(val, " ")),
1019            GeneratedColumns(expr) => write!(f, "AS {}", expr),
1020        }
1021    }
1022}
1023
1024fn display_constraint_name(name: &'_ Option<Ident>) -> impl fmt::Display + '_ {
1025    struct ConstraintName<'a>(&'a Option<Ident>);
1026    impl fmt::Display for ConstraintName<'_> {
1027        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1028            if let Some(name) = self.0 {
1029                write!(f, "CONSTRAINT {} ", name)?;
1030            }
1031            Ok(())
1032        }
1033    }
1034    ConstraintName(name)
1035}
1036
/// `<referential_action> =
/// { RESTRICT | CASCADE | SET NULL | NO ACTION | SET DEFAULT }`
///
/// Used in foreign key constraints in `ON UPDATE` and `ON DELETE` options.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ReferentialAction {
    Restrict,
    Cascade,
    SetNull,
    NoAction,
    SetDefault,
}

/// Renders the action as its SQL keyword(s).
impl fmt::Display for ReferentialAction {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keyword = match self {
            Self::Restrict => "RESTRICT",
            Self::Cascade => "CASCADE",
            Self::SetNull => "SET NULL",
            Self::NoAction => "NO ACTION",
            Self::SetDefault => "SET DEFAULT",
        };
        f.write_str(keyword)
    }
}
1061
/// Secure secret definition for a webhook source.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct WebhookSourceInfo {
    /// Optional reference to a secret (a [`SecretRefValue`]).
    pub secret_ref: Option<SecretRefValue>,
    /// Optional expression.
    /// NOTE(review): presumably evaluated to validate a request's signature —
    /// confirm against the webhook handler.
    pub signature_expr: Option<Expr>,
    /// NOTE(review): presumably whether a request waits for the data to be
    /// persisted before it is acknowledged — confirm with the consumer.
    pub wait_for_persistence: bool,
    /// NOTE(review): presumably whether a single request may carry a batch of
    /// rows — confirm with the consumer.
    pub is_batched: bool,
}