risingwave_sqlparser/ast/
ddl.rs

1// Licensed under the Apache License, Version 2.0 (the "License");
2// you may not use this file except in compliance with the License.
3// You may obtain a copy of the License at
4//
5//     http://www.apache.org/licenses/LICENSE-2.0
6//
7// Unless required by applicable law or agreed to in writing, software
8// distributed under the License is distributed on an "AS IS" BASIS,
9// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10// See the License for the specific language governing permissions and
11// limitations under the License.
12
13//! AST types specific to CREATE/ALTER variants of [`crate::ast::Statement`]
14//! (commonly referred to as Data Definition Language, or DDL)
15
16use std::fmt;
17
18use super::{ConfigParam, FormatEncodeOptions, SqlOption};
19use crate::ast::{
20    DataType, Expr, Ident, ObjectName, Query, SecretRefValue, SetVariableValue, Value,
21    display_comma_separated, display_separated,
22};
23use crate::tokenizer::Token;
24
25#[derive(Debug, Clone, PartialEq, Eq, Hash)]
26pub enum AlterDatabaseOperation {
27    ChangeOwner { new_owner_name: Ident },
28    RenameDatabase { database_name: ObjectName },
29    SetParam(ConfigParam),
30}
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub enum AlterSchemaOperation {
34    ChangeOwner { new_owner_name: Ident },
35    RenameSchema { schema_name: ObjectName },
36    SwapRenameSchema { target_schema: ObjectName },
37}
38
39/// An `ALTER TABLE` (`Statement::AlterTable`) operation
40#[derive(Debug, Clone, PartialEq, Eq, Hash)]
41pub enum AlterTableOperation {
42    /// `ADD <table_constraint>`
43    AddConstraint(TableConstraint),
44    /// `ADD [ COLUMN ] <column_def>`
45    AddColumn {
46        column_def: ColumnDef,
47    },
48    /// TODO: implement `DROP CONSTRAINT <name>`
49    DropConstraint {
50        name: Ident,
51    },
52    /// `DROP [ COLUMN ] [ IF EXISTS ] <column_name> [ CASCADE ]`
53    DropColumn {
54        column_name: Ident,
55        if_exists: bool,
56        cascade: bool,
57    },
58    /// `RENAME [ COLUMN ] <old_column_name> TO <new_column_name>`
59    RenameColumn {
60        old_column_name: Ident,
61        new_column_name: Ident,
62    },
63    /// `RENAME TO <table_name>`
64    RenameTable {
65        table_name: ObjectName,
66    },
67    // CHANGE [ COLUMN ] <old_name> <new_name> <data_type> [ <options> ]
68    ChangeColumn {
69        old_name: Ident,
70        new_name: Ident,
71        data_type: DataType,
72        options: Vec<ColumnOption>,
73    },
74    /// `RENAME CONSTRAINT <old_constraint_name> TO <new_constraint_name>`
75    ///
76    /// Note: this is a PostgreSQL-specific operation.
77    RenameConstraint {
78        old_name: Ident,
79        new_name: Ident,
80    },
81    /// `ALTER [ COLUMN ]`
82    AlterColumn {
83        column_name: Ident,
84        op: AlterColumnOperation,
85    },
86    /// `OWNER TO <owner_name>`
87    ChangeOwner {
88        new_owner_name: Ident,
89    },
90    /// `SET SCHEMA <schema_name>`
91    SetSchema {
92        new_schema_name: ObjectName,
93    },
94    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
95    SetParallelism {
96        parallelism: SetVariableValue,
97        deferred: bool,
98    },
99    /// `SET BACKFILL_PARALLELISM TO <parallelism> [ DEFERRED ]`
100    SetBackfillParallelism {
101        parallelism: SetVariableValue,
102        deferred: bool,
103    },
104    /// `SET CONFIG (key = value, ...)`
105    SetConfig {
106        entries: Vec<SqlOption>,
107    },
108    /// `RESET CONFIG (key, ...)`
109    ResetConfig {
110        keys: Vec<ObjectName>,
111    },
112    RefreshSchema,
113    /// `SET SOURCE_RATE_LIMIT TO <rate_limit>`
114    SetSourceRateLimit {
115        rate_limit: i32,
116    },
117    /// SET BACKFILL_RATE_LIMIT TO <rate_limit>
118    SetBackfillRateLimit {
119        rate_limit: i32,
120    },
121    /// `SET DML_RATE_LIMIT TO <rate_limit>`
122    SetDmlRateLimit {
123        rate_limit: i32,
124    },
125    /// `SWAP WITH <table_name>`
126    SwapRenameTable {
127        target_table: ObjectName,
128    },
129    /// `DROP CONNECTOR`
130    DropConnector,
131
132    /// `ALTER CONNECTOR WITH (<connector_props>)`
133    AlterConnectorProps {
134        alter_props: Vec<SqlOption>,
135    },
136}
137
138#[derive(Debug, Clone, PartialEq, Eq, Hash)]
139pub enum AlterIndexOperation {
140    RenameIndex {
141        index_name: ObjectName,
142    },
143    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
144    SetParallelism {
145        parallelism: SetVariableValue,
146        deferred: bool,
147    },
148    /// `SET BACKFILL_PARALLELISM TO <parallelism> [ DEFERRED ]`
149    SetBackfillParallelism {
150        parallelism: SetVariableValue,
151        deferred: bool,
152    },
153    /// `SET CONFIG (key = value, ...)`
154    SetConfig {
155        entries: Vec<SqlOption>,
156    },
157    /// `RESET CONFIG (key, ...)`
158    ResetConfig {
159        keys: Vec<ObjectName>,
160    },
161}
162
163#[derive(Debug, Clone, PartialEq, Eq, Hash)]
164pub enum AlterViewOperation {
165    RenameView {
166        view_name: ObjectName,
167    },
168    ChangeOwner {
169        new_owner_name: Ident,
170    },
171    SetSchema {
172        new_schema_name: ObjectName,
173    },
174    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
175    SetParallelism {
176        parallelism: SetVariableValue,
177        deferred: bool,
178    },
179    /// `SET BACKFILL_PARALLELISM TO <parallelism> [ DEFERRED ]`
180    SetBackfillParallelism {
181        parallelism: SetVariableValue,
182        deferred: bool,
183    },
184    /// `SET RESOURCE_GROUP TO 'RESOURCE GROUP' [ DEFERRED ]`
185    /// `RESET RESOURCE_GROUP [ DEFERRED ]`
186    SetResourceGroup {
187        resource_group: Option<SetVariableValue>,
188        deferred: bool,
189    },
190    /// `SET BACKFILL_RATE_LIMIT TO <rate_limit>`
191    SetBackfillRateLimit {
192        rate_limit: i32,
193    },
194    /// `SWAP WITH <view_name>`
195    SwapRenameView {
196        target_view: ObjectName,
197    },
198    SetStreamingEnableUnalignedJoin {
199        enable: bool,
200    },
201    /// `AS <query>`
202    AsQuery {
203        query: Box<Query>,
204    },
205    /// `SET CONFIG ( streaming.some_config_key = <some_config_value>, .. )`
206    SetConfig {
207        entries: Vec<SqlOption>,
208    },
209    /// `RESET CONFIG ( streaming.some_config_key, .. )`
210    ResetConfig {
211        keys: Vec<ObjectName>,
212    },
213}
214
215#[derive(Debug, Clone, PartialEq, Eq, Hash)]
216pub enum AlterSinkOperation {
217    RenameSink {
218        sink_name: ObjectName,
219    },
220    ChangeOwner {
221        new_owner_name: Ident,
222    },
223    SetSchema {
224        new_schema_name: ObjectName,
225    },
226    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
227    SetParallelism {
228        parallelism: SetVariableValue,
229        deferred: bool,
230    },
231    /// `SET BACKFILL_PARALLELISM TO <parallelism> [ DEFERRED ]`
232    SetBackfillParallelism {
233        parallelism: SetVariableValue,
234        deferred: bool,
235    },
236    /// `SET CONFIG (key = value, ...)`
237    SetConfig {
238        entries: Vec<SqlOption>,
239    },
240    /// `RESET CONFIG (key, ...)`
241    ResetConfig {
242        keys: Vec<ObjectName>,
243    },
244    /// `SWAP WITH <sink_name>`
245    SwapRenameSink {
246        target_sink: ObjectName,
247    },
248    SetSinkRateLimit {
249        rate_limit: i32,
250    },
251    SetBackfillRateLimit {
252        rate_limit: i32,
253    },
254    AlterConnectorProps {
255        alter_props: Vec<SqlOption>,
256    },
257    SetStreamingEnableUnalignedJoin {
258        enable: bool,
259    },
260}
261
262#[derive(Debug, Clone, PartialEq, Eq, Hash)]
263pub enum AlterSubscriptionOperation {
264    RenameSubscription { subscription_name: ObjectName },
265    ChangeOwner { new_owner_name: Ident },
266    SetSchema { new_schema_name: ObjectName },
267    SetRetention { retention: Value },
268    SwapRenameSubscription { target_subscription: ObjectName },
269}
270
271#[derive(Debug, Clone, PartialEq, Eq, Hash)]
272pub enum AlterSourceOperation {
273    RenameSource {
274        source_name: ObjectName,
275    },
276    AddColumn {
277        column_def: ColumnDef,
278    },
279    ChangeOwner {
280        new_owner_name: Ident,
281    },
282    SetSchema {
283        new_schema_name: ObjectName,
284    },
285    FormatEncode {
286        format_encode: FormatEncodeOptions,
287    },
288    RefreshSchema,
289    SetSourceRateLimit {
290        rate_limit: i32,
291    },
292    SwapRenameSource {
293        target_source: ObjectName,
294    },
295    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
296    SetParallelism {
297        parallelism: SetVariableValue,
298        deferred: bool,
299    },
300    /// `SET BACKFILL_PARALLELISM TO <parallelism> [ DEFERRED ]`
301    SetBackfillParallelism {
302        parallelism: SetVariableValue,
303        deferred: bool,
304    },
305    /// `SET CONFIG (key = value, ...)`
306    SetConfig {
307        entries: Vec<SqlOption>,
308    },
309    /// `RESET CONFIG (key, ...)`
310    ResetConfig {
311        keys: Vec<ObjectName>,
312    },
313    /// `RESET` - Reset CDC source offset to latest
314    ResetSource,
315    AlterConnectorProps {
316        alter_props: Vec<SqlOption>,
317    },
318}
319
320#[derive(Debug, Clone, PartialEq, Eq, Hash)]
321pub enum AlterFunctionOperation {
322    SetSchema { new_schema_name: ObjectName },
323    ChangeOwner { new_owner_name: Ident },
324}
325
326#[derive(Debug, Clone, PartialEq, Eq, Hash)]
327pub enum AlterConnectionOperation {
328    SetSchema { new_schema_name: ObjectName },
329    ChangeOwner { new_owner_name: Ident },
330    AlterConnectorProps { alter_props: Vec<SqlOption> },
331}
332
333#[derive(Debug, Clone, PartialEq, Eq, Hash)]
334pub enum AlterSecretOperation {
335    ChangeCredential {
336        with_options: Vec<SqlOption>,
337        new_credential: Value,
338    },
339    ChangeOwner {
340        new_owner_name: Ident,
341    },
342}
343
344#[derive(Debug, Clone, PartialEq, Eq, Hash)]
345pub enum AlterFragmentOperation {
346    AlterBackfillRateLimit { rate_limit: i32 },
347    SetParallelism { parallelism: SetVariableValue },
348}
349
350impl fmt::Display for AlterDatabaseOperation {
351    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
352        match self {
353            AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
354                write!(f, "OWNER TO {}", new_owner_name)
355            }
356            AlterDatabaseOperation::RenameDatabase { database_name } => {
357                write!(f, "RENAME TO {}", database_name)
358            }
359            AlterDatabaseOperation::SetParam(ConfigParam { param, value }) => {
360                write!(f, "SET {} TO {}", param, value)
361            }
362        }
363    }
364}
365
366impl fmt::Display for AlterSchemaOperation {
367    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
368        match self {
369            AlterSchemaOperation::ChangeOwner { new_owner_name } => {
370                write!(f, "OWNER TO {}", new_owner_name)
371            }
372            AlterSchemaOperation::RenameSchema { schema_name } => {
373                write!(f, "RENAME TO {}", schema_name)
374            }
375            AlterSchemaOperation::SwapRenameSchema { target_schema } => {
376                write!(f, "SWAP WITH {}", target_schema)
377            }
378        }
379    }
380}
381
382impl fmt::Display for AlterTableOperation {
383    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
384        match self {
385            AlterTableOperation::AddConstraint(c) => write!(f, "ADD {}", c),
386            AlterTableOperation::AddColumn { column_def } => {
387                write!(f, "ADD COLUMN {}", column_def)
388            }
389            AlterTableOperation::AlterColumn { column_name, op } => {
390                write!(f, "ALTER COLUMN {} {}", column_name, op)
391            }
392            AlterTableOperation::DropConstraint { name } => write!(f, "DROP CONSTRAINT {}", name),
393            AlterTableOperation::DropColumn {
394                column_name,
395                if_exists,
396                cascade,
397            } => write!(
398                f,
399                "DROP COLUMN {}{}{}",
400                if *if_exists { "IF EXISTS " } else { "" },
401                column_name,
402                if *cascade { " CASCADE" } else { "" }
403            ),
404            AlterTableOperation::RenameColumn {
405                old_column_name,
406                new_column_name,
407            } => write!(
408                f,
409                "RENAME COLUMN {} TO {}",
410                old_column_name, new_column_name
411            ),
412            AlterTableOperation::RenameTable { table_name } => {
413                write!(f, "RENAME TO {}", table_name)
414            }
415            AlterTableOperation::ChangeColumn {
416                old_name,
417                new_name,
418                data_type,
419                options,
420            } => {
421                write!(f, "CHANGE COLUMN {} {} {}", old_name, new_name, data_type)?;
422                if options.is_empty() {
423                    Ok(())
424                } else {
425                    write!(f, " {}", display_separated(options, " "))
426                }
427            }
428            AlterTableOperation::RenameConstraint { old_name, new_name } => {
429                write!(f, "RENAME CONSTRAINT {} TO {}", old_name, new_name)
430            }
431            AlterTableOperation::ChangeOwner { new_owner_name } => {
432                write!(f, "OWNER TO {}", new_owner_name)
433            }
434            AlterTableOperation::SetSchema { new_schema_name } => {
435                write!(f, "SET SCHEMA {}", new_schema_name)
436            }
437            AlterTableOperation::SetParallelism {
438                parallelism,
439                deferred,
440            } => {
441                write!(
442                    f,
443                    "SET PARALLELISM TO {}{}",
444                    parallelism,
445                    if *deferred { " DEFERRED" } else { "" }
446                )
447            }
448            AlterTableOperation::SetBackfillParallelism {
449                parallelism,
450                deferred,
451            } => {
452                write!(
453                    f,
454                    "SET BACKFILL_PARALLELISM TO {}{}",
455                    parallelism,
456                    if *deferred { " DEFERRED" } else { "" }
457                )
458            }
459            AlterTableOperation::SetConfig { entries } => {
460                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
461            }
462            AlterTableOperation::ResetConfig { keys } => {
463                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
464            }
465            AlterTableOperation::RefreshSchema => {
466                write!(f, "REFRESH SCHEMA")
467            }
468            AlterTableOperation::SetSourceRateLimit { rate_limit } => {
469                write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
470            }
471            AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
472                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
473            }
474            AlterTableOperation::SetDmlRateLimit { rate_limit } => {
475                write!(f, "SET DML_RATE_LIMIT TO {}", rate_limit)
476            }
477            AlterTableOperation::SwapRenameTable { target_table } => {
478                write!(f, "SWAP WITH {}", target_table)
479            }
480            AlterTableOperation::DropConnector => {
481                write!(f, "DROP CONNECTOR")
482            }
483            AlterTableOperation::AlterConnectorProps { alter_props } => {
484                write!(
485                    f,
486                    "CONNECTOR WITH ({})",
487                    display_comma_separated(alter_props)
488                )
489            }
490        }
491    }
492}
493
494impl fmt::Display for AlterIndexOperation {
495    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
496        match self {
497            AlterIndexOperation::RenameIndex { index_name } => {
498                write!(f, "RENAME TO {index_name}")
499            }
500            AlterIndexOperation::SetParallelism {
501                parallelism,
502                deferred,
503            } => {
504                write!(
505                    f,
506                    "SET PARALLELISM TO {}{}",
507                    parallelism,
508                    if *deferred { " DEFERRED" } else { "" }
509                )
510            }
511            AlterIndexOperation::SetBackfillParallelism {
512                parallelism,
513                deferred,
514            } => {
515                write!(
516                    f,
517                    "SET BACKFILL_PARALLELISM TO {}{}",
518                    parallelism,
519                    if *deferred { " DEFERRED" } else { "" }
520                )
521            }
522            AlterIndexOperation::SetConfig { entries } => {
523                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
524            }
525            AlterIndexOperation::ResetConfig { keys } => {
526                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
527            }
528        }
529    }
530}
531
532impl fmt::Display for AlterViewOperation {
533    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
534        match self {
535            AlterViewOperation::RenameView { view_name } => {
536                write!(f, "RENAME TO {view_name}")
537            }
538            AlterViewOperation::ChangeOwner { new_owner_name } => {
539                write!(f, "OWNER TO {}", new_owner_name)
540            }
541            AlterViewOperation::SetSchema { new_schema_name } => {
542                write!(f, "SET SCHEMA {}", new_schema_name)
543            }
544            AlterViewOperation::SetParallelism {
545                parallelism,
546                deferred,
547            } => {
548                write!(
549                    f,
550                    "SET PARALLELISM TO {}{}",
551                    parallelism,
552                    if *deferred { " DEFERRED" } else { "" }
553                )
554            }
555            AlterViewOperation::SetBackfillParallelism {
556                parallelism,
557                deferred,
558            } => {
559                write!(
560                    f,
561                    "SET BACKFILL_PARALLELISM TO {}{}",
562                    parallelism,
563                    if *deferred { " DEFERRED" } else { "" }
564                )
565            }
566            AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
567                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
568            }
569            AlterViewOperation::SwapRenameView { target_view } => {
570                write!(f, "SWAP WITH {}", target_view)
571            }
572            AlterViewOperation::SetResourceGroup {
573                resource_group,
574                deferred,
575            } => {
576                let deferred = if *deferred { " DEFERRED" } else { "" };
577
578                if let Some(resource_group) = resource_group {
579                    write!(f, "SET RESOURCE_GROUP TO {} {}", resource_group, deferred)
580                } else {
581                    write!(f, "RESET RESOURCE_GROUP {}", deferred)
582                }
583            }
584            AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
585                write!(f, "SET STREAMING_ENABLE_UNALIGNED_JOIN TO {}", enable)
586            }
587            AlterViewOperation::AsQuery { query } => {
588                write!(f, "AS {}", query)
589            }
590            AlterViewOperation::SetConfig { entries } => {
591                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
592            }
593            AlterViewOperation::ResetConfig { keys } => {
594                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
595            }
596        }
597    }
598}
599
600impl fmt::Display for AlterSinkOperation {
601    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
602        match self {
603            AlterSinkOperation::RenameSink { sink_name } => {
604                write!(f, "RENAME TO {sink_name}")
605            }
606            AlterSinkOperation::ChangeOwner { new_owner_name } => {
607                write!(f, "OWNER TO {}", new_owner_name)
608            }
609            AlterSinkOperation::SetSchema { new_schema_name } => {
610                write!(f, "SET SCHEMA {}", new_schema_name)
611            }
612            AlterSinkOperation::SetParallelism {
613                parallelism,
614                deferred,
615            } => {
616                write!(
617                    f,
618                    "SET PARALLELISM TO {}{}",
619                    parallelism,
620                    if *deferred { " DEFERRED" } else { "" }
621                )
622            }
623            AlterSinkOperation::SetBackfillParallelism {
624                parallelism,
625                deferred,
626            } => {
627                write!(
628                    f,
629                    "SET BACKFILL_PARALLELISM TO {}{}",
630                    parallelism,
631                    if *deferred { " DEFERRED" } else { "" }
632                )
633            }
634            AlterSinkOperation::SetConfig { entries } => {
635                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
636            }
637            AlterSinkOperation::ResetConfig { keys } => {
638                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
639            }
640            AlterSinkOperation::SwapRenameSink { target_sink } => {
641                write!(f, "SWAP WITH {}", target_sink)
642            }
643            AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
644                write!(f, "SET SINK_RATE_LIMIT TO {}", rate_limit)
645            }
646            AlterSinkOperation::SetBackfillRateLimit { rate_limit } => {
647                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
648            }
649            AlterSinkOperation::AlterConnectorProps {
650                alter_props: changed_props,
651            } => {
652                write!(
653                    f,
654                    "CONNECTOR WITH ({})",
655                    display_comma_separated(changed_props)
656                )
657            }
658            AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
659                write!(f, "SET STREAMING_ENABLE_UNALIGNED_JOIN TO {}", enable)
660            }
661        }
662    }
663}
664
665impl fmt::Display for AlterSubscriptionOperation {
666    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
667        match self {
668            AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
669                write!(f, "RENAME TO {subscription_name}")
670            }
671            AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
672                write!(f, "OWNER TO {}", new_owner_name)
673            }
674            AlterSubscriptionOperation::SetSchema { new_schema_name } => {
675                write!(f, "SET SCHEMA {}", new_schema_name)
676            }
677            AlterSubscriptionOperation::SetRetention { retention } => {
678                write!(f, "SET RETENTION TO {}", retention)
679            }
680            AlterSubscriptionOperation::SwapRenameSubscription {
681                target_subscription,
682            } => {
683                write!(f, "SWAP WITH {}", target_subscription)
684            }
685        }
686    }
687}
688
689impl fmt::Display for AlterSourceOperation {
690    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
691        match self {
692            AlterSourceOperation::RenameSource { source_name } => {
693                write!(f, "RENAME TO {source_name}")
694            }
695            AlterSourceOperation::AddColumn { column_def } => {
696                write!(f, "ADD COLUMN {column_def}")
697            }
698            AlterSourceOperation::ChangeOwner { new_owner_name } => {
699                write!(f, "OWNER TO {}", new_owner_name)
700            }
701            AlterSourceOperation::SetSchema { new_schema_name } => {
702                write!(f, "SET SCHEMA {}", new_schema_name)
703            }
704            AlterSourceOperation::FormatEncode { format_encode } => {
705                write!(f, "{format_encode}")
706            }
707            AlterSourceOperation::RefreshSchema => {
708                write!(f, "REFRESH SCHEMA")
709            }
710            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
711                write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
712            }
713            AlterSourceOperation::SwapRenameSource { target_source } => {
714                write!(f, "SWAP WITH {}", target_source)
715            }
716            AlterSourceOperation::SetParallelism {
717                parallelism,
718                deferred,
719            } => {
720                write!(
721                    f,
722                    "SET PARALLELISM TO {}{}",
723                    parallelism,
724                    if *deferred { " DEFERRED" } else { "" }
725                )
726            }
727            AlterSourceOperation::SetBackfillParallelism {
728                parallelism,
729                deferred,
730            } => {
731                write!(
732                    f,
733                    "SET BACKFILL_PARALLELISM TO {}{}",
734                    parallelism,
735                    if *deferred { " DEFERRED" } else { "" }
736                )
737            }
738            AlterSourceOperation::SetConfig { entries } => {
739                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
740            }
741            AlterSourceOperation::ResetConfig { keys } => {
742                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
743            }
744            AlterSourceOperation::ResetSource => {
745                write!(f, "RESET")
746            }
747            AlterSourceOperation::AlterConnectorProps { alter_props } => {
748                write!(
749                    f,
750                    "CONNECTOR WITH ({})",
751                    display_comma_separated(alter_props)
752                )
753            }
754        }
755    }
756}
757
758impl fmt::Display for AlterFunctionOperation {
759    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
760        match self {
761            AlterFunctionOperation::SetSchema { new_schema_name } => {
762                write!(f, "SET SCHEMA {new_schema_name}")
763            }
764            AlterFunctionOperation::ChangeOwner { new_owner_name } => {
765                write!(f, "OWNER TO {new_owner_name}")
766            }
767        }
768    }
769}
770
771impl fmt::Display for AlterConnectionOperation {
772    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
773        match self {
774            AlterConnectionOperation::SetSchema { new_schema_name } => {
775                write!(f, "SET SCHEMA {new_schema_name}")
776            }
777            AlterConnectionOperation::ChangeOwner { new_owner_name } => {
778                write!(f, "OWNER TO {new_owner_name}")
779            }
780            AlterConnectionOperation::AlterConnectorProps { alter_props } => {
781                write!(
782                    f,
783                    "CONNECTOR WITH ({})",
784                    display_comma_separated(alter_props)
785                )
786            }
787        }
788    }
789}
790
791impl fmt::Display for AlterSecretOperation {
792    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
793        match self {
794            AlterSecretOperation::ChangeCredential {
795                new_credential,
796                with_options,
797            } => {
798                write!(
799                    f,
800                    "WITH ({}) AS {}",
801                    display_comma_separated(with_options),
802                    new_credential
803                )
804            }
805            AlterSecretOperation::ChangeOwner { new_owner_name } => {
806                write!(f, "OWNER TO {new_owner_name}")
807            }
808        }
809    }
810}
811
812/// An `ALTER COLUMN` (`Statement::AlterTable`) operation
813#[derive(Debug, Clone, PartialEq, Eq, Hash)]
814pub enum AlterColumnOperation {
815    /// `SET NOT NULL`
816    SetNotNull,
817    /// `DROP NOT NULL`
818    DropNotNull,
819    /// `SET DEFAULT <expr>`
820    SetDefault { value: Expr },
821    /// `DROP DEFAULT`
822    DropDefault,
823    /// `[SET DATA] TYPE <data_type> [USING <expr>]`
824    SetDataType {
825        data_type: DataType,
826        /// PostgreSQL specific
827        using: Option<Expr>,
828    },
829}
830
831impl fmt::Display for AlterColumnOperation {
832    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
833        match self {
834            AlterColumnOperation::SetNotNull => write!(f, "SET NOT NULL",),
835            AlterColumnOperation::DropNotNull => write!(f, "DROP NOT NULL",),
836            AlterColumnOperation::SetDefault { value } => {
837                write!(f, "SET DEFAULT {}", value)
838            }
839            AlterColumnOperation::DropDefault => {
840                write!(f, "DROP DEFAULT")
841            }
842            AlterColumnOperation::SetDataType { data_type, using } => {
843                if let Some(expr) = using {
844                    write!(f, "SET DATA TYPE {} USING {}", data_type, expr)
845                } else {
846                    write!(f, "SET DATA TYPE {}", data_type)
847                }
848            }
849        }
850    }
851}
852
853impl fmt::Display for AlterFragmentOperation {
854    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
855        match self {
856            AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
857                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
858            }
859            AlterFragmentOperation::SetParallelism { parallelism } => {
860                write!(f, "SET PARALLELISM TO {}", parallelism)
861            }
862        }
863    }
864}
865
866/// The watermark on source.
867/// `WATERMARK FOR <column> AS (<expr>)`
868#[derive(Debug, Clone, PartialEq, Eq, Hash)]
869pub struct SourceWatermark {
870    pub column: Ident,
871    pub expr: Expr,
872    /// Whether `WITH TTL` is specified.
873    pub with_ttl: bool,
874}
875
876impl fmt::Display for SourceWatermark {
877    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
878        write!(f, "WATERMARK FOR {} AS {}", self.column, self.expr,)?;
879        if self.with_ttl {
880            write!(f, " WITH TTL")?;
881        }
882        Ok(())
883    }
884}
885
886/// A table-level constraint, specified in a `CREATE TABLE` or an
887/// `ALTER TABLE ADD <constraint>` statement.
888#[derive(Debug, Clone, PartialEq, Eq, Hash)]
889pub enum TableConstraint {
890    /// `[ CONSTRAINT <name> ] { PRIMARY KEY | UNIQUE } (<columns>)`
891    Unique {
892        name: Option<Ident>,
893        columns: Vec<Ident>,
894        /// Whether this is a `PRIMARY KEY` or just a `UNIQUE` constraint
895        is_primary: bool,
896    },
897    /// A referential integrity constraint (`[ CONSTRAINT <name> ] FOREIGN KEY (<columns>)
898    /// REFERENCES <foreign_table> (<referred_columns>)
899    /// { [ON DELETE <referential_action>] [ON UPDATE <referential_action>] |
900    ///   [ON UPDATE <referential_action>] [ON DELETE <referential_action>]
901    /// }`).
902    ForeignKey {
903        name: Option<Ident>,
904        columns: Vec<Ident>,
905        foreign_table: ObjectName,
906        referred_columns: Vec<Ident>,
907        on_delete: Option<ReferentialAction>,
908        on_update: Option<ReferentialAction>,
909    },
910    /// `[ CONSTRAINT <name> ] CHECK (<expr>)`
911    Check {
912        name: Option<Ident>,
913        expr: Box<Expr>,
914    },
915}
916
917impl fmt::Display for TableConstraint {
918    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
919        match self {
920            TableConstraint::Unique {
921                name,
922                columns,
923                is_primary,
924            } => write!(
925                f,
926                "{}{} ({})",
927                display_constraint_name(name),
928                if *is_primary { "PRIMARY KEY" } else { "UNIQUE" },
929                display_comma_separated(columns)
930            ),
931            TableConstraint::ForeignKey {
932                name,
933                columns,
934                foreign_table,
935                referred_columns,
936                on_delete,
937                on_update,
938            } => {
939                write!(
940                    f,
941                    "{}FOREIGN KEY ({}) REFERENCES {}({})",
942                    display_constraint_name(name),
943                    display_comma_separated(columns),
944                    foreign_table,
945                    display_comma_separated(referred_columns),
946                )?;
947                if let Some(action) = on_delete {
948                    write!(f, " ON DELETE {}", action)?;
949                }
950                if let Some(action) = on_update {
951                    write!(f, " ON UPDATE {}", action)?;
952                }
953                Ok(())
954            }
955            TableConstraint::Check { name, expr } => {
956                write!(f, "{}CHECK ({})", display_constraint_name(name), expr)
957            }
958        }
959    }
960}
961
962/// SQL column definition
963#[derive(Debug, Clone, PartialEq, Eq, Hash)]
964pub struct ColumnDef {
965    pub name: Ident,
966    pub data_type: Option<DataType>,
967    pub collation: Option<ObjectName>,
968    pub options: Vec<ColumnOptionDef>,
969}
970
971impl ColumnDef {
972    pub fn new(
973        name: Ident,
974        data_type: DataType,
975        collation: Option<ObjectName>,
976        options: Vec<ColumnOptionDef>,
977    ) -> Self {
978        ColumnDef {
979            name,
980            data_type: Some(data_type),
981            collation,
982            options,
983        }
984    }
985
986    pub fn is_generated(&self) -> bool {
987        self.options
988            .iter()
989            .any(|option| matches!(option.option, ColumnOption::GeneratedColumns(_)))
990    }
991}
992
993impl fmt::Display for ColumnDef {
994    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
995        write!(
996            f,
997            "{} {}",
998            self.name,
999            if let Some(data_type) = &self.data_type {
1000                data_type.to_string()
1001            } else {
1002                "None".to_owned()
1003            }
1004        )?;
1005        for option in &self.options {
1006            write!(f, " {}", option)?;
1007        }
1008        Ok(())
1009    }
1010}
1011
1012/// An optionally-named `ColumnOption`: `[ CONSTRAINT <name> ] <column-option>`.
1013///
1014/// Note that implementations are substantially more permissive than the ANSI
1015/// specification on what order column options can be presented in, and whether
1016/// they are allowed to be named. The specification distinguishes between
1017/// constraints (NOT NULL, UNIQUE, PRIMARY KEY, and CHECK), which can be named
1018/// and can appear in any order, and other options (DEFAULT, GENERATED), which
1019/// cannot be named and must appear in a fixed order. PostgreSQL, however,
1020/// allows preceding any option with `CONSTRAINT <name>`, even those that are
1021/// not really constraints, like NULL and DEFAULT. MSSQL is less permissive,
1022/// allowing DEFAULT, UNIQUE, PRIMARY KEY and CHECK to be named, but not NULL or
1023/// NOT NULL constraints (the last of which is in violation of the spec).
1024///
1025/// For maximum flexibility, we don't distinguish between constraint and
1026/// non-constraint options, lumping them all together under the umbrella of
1027/// "column options," and we allow any column option to be named.
1028#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1029pub struct ColumnOptionDef {
1030    pub name: Option<Ident>,
1031    pub option: ColumnOption,
1032}
1033
1034impl fmt::Display for ColumnOptionDef {
1035    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1036        write!(f, "{}{}", display_constraint_name(&self.name), self.option)
1037    }
1038}
1039
1040/// `ColumnOption`s are modifiers that follow a column definition in a `CREATE
1041/// TABLE` statement.
1042#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1043pub enum ColumnOption {
1044    /// `NULL`
1045    Null,
1046    /// `NOT NULL`
1047    NotNull,
1048    /// `DEFAULT <restricted-expr>`
1049    DefaultValue(Expr),
1050    /// Default value from previous bound `DefaultColumnDesc`. Used internally
1051    /// for schema change and should not be specified by users.
1052    DefaultValueInternal {
1053        /// Protobuf encoded `DefaultColumnDesc`.
1054        persisted: Box<[u8]>,
1055        /// Optional AST for unparsing. If `None`, the default value will be
1056        /// shown as `DEFAULT INTERNAL` which is for demonstrating and should
1057        /// not be specified by users.
1058        expr: Option<Expr>,
1059    },
1060    /// `{ PRIMARY KEY | UNIQUE }`
1061    Unique { is_primary: bool },
1062    /// A referential integrity constraint (`[FOREIGN KEY REFERENCES
1063    /// <foreign_table> (<referred_columns>)
1064    /// { [ON DELETE <referential_action>] [ON UPDATE <referential_action>] |
1065    ///   [ON UPDATE <referential_action>] [ON DELETE <referential_action>]
1066    /// }`).
1067    ForeignKey {
1068        foreign_table: ObjectName,
1069        referred_columns: Vec<Ident>,
1070        on_delete: Option<ReferentialAction>,
1071        on_update: Option<ReferentialAction>,
1072    },
1073    /// `CHECK (<expr>)`
1074    Check(Expr),
1075    /// Dialect-specific options, such as:
1076    /// - MySQL's `AUTO_INCREMENT` or SQLite's `AUTOINCREMENT`
1077    /// - ...
1078    DialectSpecific(Vec<Token>),
1079    /// AS ( <generation_expr> )`
1080    GeneratedColumns(Expr),
1081}
1082
1083impl fmt::Display for ColumnOption {
1084    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1085        use ColumnOption::*;
1086        match self {
1087            Null => write!(f, "NULL"),
1088            NotNull => write!(f, "NOT NULL"),
1089            DefaultValue(expr) => write!(f, "DEFAULT {}", expr),
1090            DefaultValueInternal { persisted: _, expr } => {
1091                if let Some(expr) = expr {
1092                    write!(f, "DEFAULT {}", expr)
1093                } else {
1094                    write!(f, "DEFAULT INTERNAL")
1095                }
1096            }
1097            Unique { is_primary } => {
1098                write!(f, "{}", if *is_primary { "PRIMARY KEY" } else { "UNIQUE" })
1099            }
1100            ForeignKey {
1101                foreign_table,
1102                referred_columns,
1103                on_delete,
1104                on_update,
1105            } => {
1106                write!(f, "REFERENCES {}", foreign_table)?;
1107                if !referred_columns.is_empty() {
1108                    write!(f, " ({})", display_comma_separated(referred_columns))?;
1109                }
1110                if let Some(action) = on_delete {
1111                    write!(f, " ON DELETE {}", action)?;
1112                }
1113                if let Some(action) = on_update {
1114                    write!(f, " ON UPDATE {}", action)?;
1115                }
1116                Ok(())
1117            }
1118            Check(expr) => write!(f, "CHECK ({})", expr),
1119            DialectSpecific(val) => write!(f, "{}", display_separated(val, " ")),
1120            GeneratedColumns(expr) => write!(f, "AS {}", expr),
1121        }
1122    }
1123}
1124
1125fn display_constraint_name(name: &'_ Option<Ident>) -> impl fmt::Display + '_ {
1126    struct ConstraintName<'a>(&'a Option<Ident>);
1127    impl fmt::Display for ConstraintName<'_> {
1128        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1129            if let Some(name) = self.0 {
1130                write!(f, "CONSTRAINT {} ", name)?;
1131            }
1132            Ok(())
1133        }
1134    }
1135    ConstraintName(name)
1136}
1137
1138/// `<referential_action> =
1139/// { RESTRICT | CASCADE | SET NULL | NO ACTION | SET DEFAULT }`
1140///
1141/// Used in foreign key constraints in `ON UPDATE` and `ON DELETE` options.
1142#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1143pub enum ReferentialAction {
1144    Restrict,
1145    Cascade,
1146    SetNull,
1147    NoAction,
1148    SetDefault,
1149}
1150
1151impl fmt::Display for ReferentialAction {
1152    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1153        f.write_str(match self {
1154            ReferentialAction::Restrict => "RESTRICT",
1155            ReferentialAction::Cascade => "CASCADE",
1156            ReferentialAction::SetNull => "SET NULL",
1157            ReferentialAction::NoAction => "NO ACTION",
1158            ReferentialAction::SetDefault => "SET DEFAULT",
1159        })
1160    }
1161}
1162
1163/// secure secret definition for webhook source
1164#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1165pub struct WebhookSourceInfo {
1166    pub secret_ref: Option<SecretRefValue>,
1167    pub signature_expr: Option<Expr>,
1168    pub wait_for_persistence: bool,
1169    pub is_batched: bool,
1170}