risingwave_sqlparser/ast/
ddl.rs

1// Licensed under the Apache License, Version 2.0 (the "License");
2// you may not use this file except in compliance with the License.
3// You may obtain a copy of the License at
4//
5//     http://www.apache.org/licenses/LICENSE-2.0
6//
7// Unless required by applicable law or agreed to in writing, software
8// distributed under the License is distributed on an "AS IS" BASIS,
9// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10// See the License for the specific language governing permissions and
11// limitations under the License.
12
13//! AST types specific to CREATE/ALTER variants of [`crate::ast::Statement`]
14//! (commonly referred to as Data Definition Language, or DDL)
15
16use std::fmt;
17
18use super::{ConfigParam, FormatEncodeOptions, SqlOption};
19use crate::ast::{
20    DataType, Expr, Ident, ObjectName, Query, SecretRefValue, SetVariableValue, Value,
21    display_comma_separated, display_separated,
22};
23use crate::tokenizer::Token;
24
/// An `ALTER DATABASE` operation.
///
/// The syntax shown on each variant is the text produced by this type's
/// [`fmt::Display`] implementation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterDatabaseOperation {
    /// `OWNER TO <new_owner_name>`
    ChangeOwner { new_owner_name: Ident },
    /// `RENAME TO <database_name>`
    RenameDatabase { database_name: ObjectName },
    /// `SET <param> TO <value>`
    SetParam(ConfigParam),
}
31
/// An `ALTER SCHEMA` operation.
///
/// The syntax shown on each variant is the text produced by this type's
/// [`fmt::Display`] implementation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterSchemaOperation {
    /// `OWNER TO <new_owner_name>`
    ChangeOwner { new_owner_name: Ident },
    /// `RENAME TO <schema_name>`
    RenameSchema { schema_name: ObjectName },
    /// `SWAP WITH <target_schema>`
    SwapRenameSchema { target_schema: ObjectName },
}
38
/// An `ALTER TABLE` (`Statement::AlterTable`) operation.
///
/// Each variant is documented with the SQL clause it stands for. Optional
/// keywords shown in brackets describe the accepted syntax; the
/// [`fmt::Display`] implementation always emits one canonical spelling
/// (for example `ADD COLUMN`, never bare `ADD`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterTableOperation {
    /// `ADD <table_constraint>`
    AddConstraint(TableConstraint),
    /// `ADD [ COLUMN ] <column_def>`
    AddColumn {
        column_def: ColumnDef,
    },
    /// TODO: implement `DROP CONSTRAINT <name>`
    DropConstraint {
        name: Ident,
    },
    /// `DROP [ COLUMN ] [ IF EXISTS ] <column_name> [ CASCADE ]`
    DropColumn {
        column_name: Ident,
        if_exists: bool,
        cascade: bool,
    },
    /// `RENAME [ COLUMN ] <old_column_name> TO <new_column_name>`
    RenameColumn {
        old_column_name: Ident,
        new_column_name: Ident,
    },
    /// `RENAME TO <table_name>`
    RenameTable {
        table_name: ObjectName,
    },
    /// `CHANGE [ COLUMN ] <old_name> <new_name> <data_type> [ <options> ]`
    ChangeColumn {
        old_name: Ident,
        new_name: Ident,
        data_type: DataType,
        options: Vec<ColumnOption>,
    },
    /// `RENAME CONSTRAINT <old_constraint_name> TO <new_constraint_name>`
    ///
    /// Note: this is a PostgreSQL-specific operation.
    RenameConstraint {
        old_name: Ident,
        new_name: Ident,
    },
    /// `ALTER [ COLUMN ] <column_name> <op>`
    AlterColumn {
        column_name: Ident,
        op: AlterColumnOperation,
    },
    /// `OWNER TO <owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
    /// `SET SCHEMA <schema_name>`
    SetSchema {
        new_schema_name: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET CONFIG (key = value, ...)`
    SetConfig {
        entries: Vec<SqlOption>,
    },
    /// `RESET CONFIG (key, ...)`
    ResetConfig {
        keys: Vec<ObjectName>,
    },
    /// `REFRESH SCHEMA`
    RefreshSchema,
    /// `SET SOURCE_RATE_LIMIT TO <rate_limit>`
    SetSourceRateLimit {
        rate_limit: i32,
    },
    /// `SET BACKFILL_RATE_LIMIT TO <rate_limit>`
    SetBackfillRateLimit {
        rate_limit: i32,
    },
    /// `SET DML_RATE_LIMIT TO <rate_limit>`
    SetDmlRateLimit {
        rate_limit: i32,
    },
    /// `SWAP WITH <table_name>`
    SwapRenameTable {
        target_table: ObjectName,
    },
    /// `DROP CONNECTOR`
    DropConnector,

    /// `ALTER CONNECTOR WITH (<connector_props>)`
    ///
    /// NOTE(review): the `Display` impl renders this variant as
    /// `CONNECTOR WITH (<connector_props>)`, without a leading `ALTER`;
    /// confirm which spelling the parser accepts.
    AlterConnectorProps {
        alter_props: Vec<SqlOption>,
    },
}
132
/// An `ALTER INDEX` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterIndexOperation {
    /// `RENAME TO <index_name>`
    RenameIndex {
        index_name: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET CONFIG (key = value, ...)`
    SetConfig {
        entries: Vec<SqlOption>,
    },
    /// `RESET CONFIG (key, ...)`
    ResetConfig {
        keys: Vec<ObjectName>,
    },
}
152
/// An `ALTER VIEW` operation.
///
/// NOTE(review): variants such as `SetParallelism` and `SetBackfillRateLimit`
/// suggest this is also used for materialized views — confirm against the
/// parser.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterViewOperation {
    /// `RENAME TO <view_name>`
    RenameView {
        view_name: ObjectName,
    },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
    /// `SET SCHEMA <new_schema_name>`
    SetSchema {
        new_schema_name: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET RESOURCE_GROUP TO 'RESOURCE GROUP' [ DEFERRED ]` when
    /// `resource_group` is `Some`, or `RESET RESOURCE_GROUP [ DEFERRED ]`
    /// when it is `None`.
    SetResourceGroup {
        resource_group: Option<SetVariableValue>,
        deferred: bool,
    },
    /// `SET BACKFILL_RATE_LIMIT TO <rate_limit>`
    SetBackfillRateLimit {
        rate_limit: i32,
    },
    /// `SWAP WITH <view_name>`
    SwapRenameView {
        target_view: ObjectName,
    },
    /// `SET STREAMING_ENABLE_UNALIGNED_JOIN TO <enable>`
    SetStreamingEnableUnalignedJoin {
        enable: bool,
    },
    /// `AS <query>`
    AsQuery {
        query: Box<Query>,
    },
    /// `SET CONFIG ( streaming.some_config_key = <some_config_value>, .. )`
    SetConfig {
        entries: Vec<SqlOption>,
    },
    /// `RESET CONFIG ( streaming.some_config_key, .. )`
    ResetConfig {
        keys: Vec<ObjectName>,
    },
}
199
/// An `ALTER SINK` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterSinkOperation {
    /// `RENAME TO <sink_name>`
    RenameSink {
        sink_name: ObjectName,
    },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
    /// `SET SCHEMA <new_schema_name>`
    SetSchema {
        new_schema_name: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET CONFIG (key = value, ...)`
    SetConfig {
        entries: Vec<SqlOption>,
    },
    /// `RESET CONFIG (key, ...)`
    ResetConfig {
        keys: Vec<ObjectName>,
    },
    /// `SWAP WITH <sink_name>`
    SwapRenameSink {
        target_sink: ObjectName,
    },
    /// `SET SINK_RATE_LIMIT TO <rate_limit>`
    SetSinkRateLimit {
        rate_limit: i32,
    },
    /// `SET BACKFILL_RATE_LIMIT TO <rate_limit>`
    SetBackfillRateLimit {
        rate_limit: i32,
    },
    /// `CONNECTOR WITH (<connector_props>)`
    AlterConnectorProps {
        alter_props: Vec<SqlOption>,
    },
    /// `SET STREAMING_ENABLE_UNALIGNED_JOIN TO <enable>`
    SetStreamingEnableUnalignedJoin {
        enable: bool,
    },
}
241
/// An `ALTER SUBSCRIPTION` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterSubscriptionOperation {
    /// `RENAME TO <subscription_name>`
    RenameSubscription { subscription_name: ObjectName },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner { new_owner_name: Ident },
    /// `SET SCHEMA <new_schema_name>`
    SetSchema { new_schema_name: ObjectName },
    /// `SWAP WITH <target_subscription>`
    SwapRenameSubscription { target_subscription: ObjectName },
}
249
/// An `ALTER SOURCE` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterSourceOperation {
    /// `RENAME TO <source_name>`
    RenameSource {
        source_name: ObjectName,
    },
    /// `ADD COLUMN <column_def>`
    AddColumn {
        column_def: ColumnDef,
    },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
    /// `SET SCHEMA <new_schema_name>`
    SetSchema {
        new_schema_name: ObjectName,
    },
    /// A format/encode clause, rendered by `FormatEncodeOptions`' own
    /// `Display` implementation.
    FormatEncode {
        format_encode: FormatEncodeOptions,
    },
    /// `REFRESH SCHEMA`
    RefreshSchema,
    /// `SET SOURCE_RATE_LIMIT TO <rate_limit>`
    SetSourceRateLimit {
        rate_limit: i32,
    },
    /// `SWAP WITH <target_source>`
    SwapRenameSource {
        target_source: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET CONFIG (key = value, ...)`
    SetConfig {
        entries: Vec<SqlOption>,
    },
    /// `RESET CONFIG (key, ...)`
    ResetConfig {
        keys: Vec<ObjectName>,
    },
    /// `RESET` - Reset CDC source offset to latest
    ResetSource,
    /// `CONNECTOR WITH (<connector_props>)`
    AlterConnectorProps {
        alter_props: Vec<SqlOption>,
    },
}
293
/// An `ALTER FUNCTION` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterFunctionOperation {
    /// `SET SCHEMA <new_schema_name>`
    SetSchema { new_schema_name: ObjectName },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner { new_owner_name: Ident },
}
299
/// An `ALTER CONNECTION` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterConnectionOperation {
    /// `SET SCHEMA <new_schema_name>`
    SetSchema { new_schema_name: ObjectName },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner { new_owner_name: Ident },
    /// `CONNECTOR WITH (<connector_props>)`
    AlterConnectorProps { alter_props: Vec<SqlOption> },
}
306
/// An `ALTER SECRET` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterSecretOperation {
    /// `WITH (<with_options>) AS <new_credential>`
    ChangeCredential {
        with_options: Vec<SqlOption>,
        new_credential: Value,
    },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
}
317
/// An `ALTER FRAGMENT` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterFragmentOperation {
    /// `SET BACKFILL_RATE_LIMIT TO <rate_limit>`
    AlterBackfillRateLimit { rate_limit: i32 },
    /// `SET PARALLELISM TO <parallelism>`
    SetParallelism { parallelism: SetVariableValue },
}
323
324impl fmt::Display for AlterDatabaseOperation {
325    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
326        match self {
327            AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
328                write!(f, "OWNER TO {}", new_owner_name)
329            }
330            AlterDatabaseOperation::RenameDatabase { database_name } => {
331                write!(f, "RENAME TO {}", database_name)
332            }
333            AlterDatabaseOperation::SetParam(ConfigParam { param, value }) => {
334                write!(f, "SET {} TO {}", param, value)
335            }
336        }
337    }
338}
339
340impl fmt::Display for AlterSchemaOperation {
341    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
342        match self {
343            AlterSchemaOperation::ChangeOwner { new_owner_name } => {
344                write!(f, "OWNER TO {}", new_owner_name)
345            }
346            AlterSchemaOperation::RenameSchema { schema_name } => {
347                write!(f, "RENAME TO {}", schema_name)
348            }
349            AlterSchemaOperation::SwapRenameSchema { target_schema } => {
350                write!(f, "SWAP WITH {}", target_schema)
351            }
352        }
353    }
354}
355
356impl fmt::Display for AlterTableOperation {
357    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
358        match self {
359            AlterTableOperation::AddConstraint(c) => write!(f, "ADD {}", c),
360            AlterTableOperation::AddColumn { column_def } => {
361                write!(f, "ADD COLUMN {}", column_def)
362            }
363            AlterTableOperation::AlterColumn { column_name, op } => {
364                write!(f, "ALTER COLUMN {} {}", column_name, op)
365            }
366            AlterTableOperation::DropConstraint { name } => write!(f, "DROP CONSTRAINT {}", name),
367            AlterTableOperation::DropColumn {
368                column_name,
369                if_exists,
370                cascade,
371            } => write!(
372                f,
373                "DROP COLUMN {}{}{}",
374                if *if_exists { "IF EXISTS " } else { "" },
375                column_name,
376                if *cascade { " CASCADE" } else { "" }
377            ),
378            AlterTableOperation::RenameColumn {
379                old_column_name,
380                new_column_name,
381            } => write!(
382                f,
383                "RENAME COLUMN {} TO {}",
384                old_column_name, new_column_name
385            ),
386            AlterTableOperation::RenameTable { table_name } => {
387                write!(f, "RENAME TO {}", table_name)
388            }
389            AlterTableOperation::ChangeColumn {
390                old_name,
391                new_name,
392                data_type,
393                options,
394            } => {
395                write!(f, "CHANGE COLUMN {} {} {}", old_name, new_name, data_type)?;
396                if options.is_empty() {
397                    Ok(())
398                } else {
399                    write!(f, " {}", display_separated(options, " "))
400                }
401            }
402            AlterTableOperation::RenameConstraint { old_name, new_name } => {
403                write!(f, "RENAME CONSTRAINT {} TO {}", old_name, new_name)
404            }
405            AlterTableOperation::ChangeOwner { new_owner_name } => {
406                write!(f, "OWNER TO {}", new_owner_name)
407            }
408            AlterTableOperation::SetSchema { new_schema_name } => {
409                write!(f, "SET SCHEMA {}", new_schema_name)
410            }
411            AlterTableOperation::SetParallelism {
412                parallelism,
413                deferred,
414            } => {
415                write!(
416                    f,
417                    "SET PARALLELISM TO {}{}",
418                    parallelism,
419                    if *deferred { " DEFERRED" } else { "" }
420                )
421            }
422            AlterTableOperation::SetConfig { entries } => {
423                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
424            }
425            AlterTableOperation::ResetConfig { keys } => {
426                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
427            }
428            AlterTableOperation::RefreshSchema => {
429                write!(f, "REFRESH SCHEMA")
430            }
431            AlterTableOperation::SetSourceRateLimit { rate_limit } => {
432                write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
433            }
434            AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
435                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
436            }
437            AlterTableOperation::SetDmlRateLimit { rate_limit } => {
438                write!(f, "SET DML_RATE_LIMIT TO {}", rate_limit)
439            }
440            AlterTableOperation::SwapRenameTable { target_table } => {
441                write!(f, "SWAP WITH {}", target_table)
442            }
443            AlterTableOperation::DropConnector => {
444                write!(f, "DROP CONNECTOR")
445            }
446            AlterTableOperation::AlterConnectorProps { alter_props } => {
447                write!(
448                    f,
449                    "CONNECTOR WITH ({})",
450                    display_comma_separated(alter_props)
451                )
452            }
453        }
454    }
455}
456
457impl fmt::Display for AlterIndexOperation {
458    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
459        match self {
460            AlterIndexOperation::RenameIndex { index_name } => {
461                write!(f, "RENAME TO {index_name}")
462            }
463            AlterIndexOperation::SetParallelism {
464                parallelism,
465                deferred,
466            } => {
467                write!(
468                    f,
469                    "SET PARALLELISM TO {}{}",
470                    parallelism,
471                    if *deferred { " DEFERRED" } else { "" }
472                )
473            }
474            AlterIndexOperation::SetConfig { entries } => {
475                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
476            }
477            AlterIndexOperation::ResetConfig { keys } => {
478                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
479            }
480        }
481    }
482}
483
484impl fmt::Display for AlterViewOperation {
485    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
486        match self {
487            AlterViewOperation::RenameView { view_name } => {
488                write!(f, "RENAME TO {view_name}")
489            }
490            AlterViewOperation::ChangeOwner { new_owner_name } => {
491                write!(f, "OWNER TO {}", new_owner_name)
492            }
493            AlterViewOperation::SetSchema { new_schema_name } => {
494                write!(f, "SET SCHEMA {}", new_schema_name)
495            }
496            AlterViewOperation::SetParallelism {
497                parallelism,
498                deferred,
499            } => {
500                write!(
501                    f,
502                    "SET PARALLELISM TO {}{}",
503                    parallelism,
504                    if *deferred { " DEFERRED" } else { "" }
505                )
506            }
507            AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
508                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
509            }
510            AlterViewOperation::SwapRenameView { target_view } => {
511                write!(f, "SWAP WITH {}", target_view)
512            }
513            AlterViewOperation::SetResourceGroup {
514                resource_group,
515                deferred,
516            } => {
517                let deferred = if *deferred { " DEFERRED" } else { "" };
518
519                if let Some(resource_group) = resource_group {
520                    write!(f, "SET RESOURCE_GROUP TO {} {}", resource_group, deferred)
521                } else {
522                    write!(f, "RESET RESOURCE_GROUP {}", deferred)
523                }
524            }
525            AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
526                write!(f, "SET STREAMING_ENABLE_UNALIGNED_JOIN TO {}", enable)
527            }
528            AlterViewOperation::AsQuery { query } => {
529                write!(f, "AS {}", query)
530            }
531            AlterViewOperation::SetConfig { entries } => {
532                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
533            }
534            AlterViewOperation::ResetConfig { keys } => {
535                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
536            }
537        }
538    }
539}
540
541impl fmt::Display for AlterSinkOperation {
542    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
543        match self {
544            AlterSinkOperation::RenameSink { sink_name } => {
545                write!(f, "RENAME TO {sink_name}")
546            }
547            AlterSinkOperation::ChangeOwner { new_owner_name } => {
548                write!(f, "OWNER TO {}", new_owner_name)
549            }
550            AlterSinkOperation::SetSchema { new_schema_name } => {
551                write!(f, "SET SCHEMA {}", new_schema_name)
552            }
553            AlterSinkOperation::SetParallelism {
554                parallelism,
555                deferred,
556            } => {
557                write!(
558                    f,
559                    "SET PARALLELISM TO {}{}",
560                    parallelism,
561                    if *deferred { " DEFERRED" } else { "" }
562                )
563            }
564            AlterSinkOperation::SetConfig { entries } => {
565                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
566            }
567            AlterSinkOperation::ResetConfig { keys } => {
568                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
569            }
570            AlterSinkOperation::SwapRenameSink { target_sink } => {
571                write!(f, "SWAP WITH {}", target_sink)
572            }
573            AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
574                write!(f, "SET SINK_RATE_LIMIT TO {}", rate_limit)
575            }
576            AlterSinkOperation::SetBackfillRateLimit { rate_limit } => {
577                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
578            }
579            AlterSinkOperation::AlterConnectorProps {
580                alter_props: changed_props,
581            } => {
582                write!(
583                    f,
584                    "CONNECTOR WITH ({})",
585                    display_comma_separated(changed_props)
586                )
587            }
588            AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
589                write!(f, "SET STREAMING_ENABLE_UNALIGNED_JOIN TO {}", enable)
590            }
591        }
592    }
593}
594
595impl fmt::Display for AlterSubscriptionOperation {
596    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
597        match self {
598            AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
599                write!(f, "RENAME TO {subscription_name}")
600            }
601            AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
602                write!(f, "OWNER TO {}", new_owner_name)
603            }
604            AlterSubscriptionOperation::SetSchema { new_schema_name } => {
605                write!(f, "SET SCHEMA {}", new_schema_name)
606            }
607            AlterSubscriptionOperation::SwapRenameSubscription {
608                target_subscription,
609            } => {
610                write!(f, "SWAP WITH {}", target_subscription)
611            }
612        }
613    }
614}
615
616impl fmt::Display for AlterSourceOperation {
617    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
618        match self {
619            AlterSourceOperation::RenameSource { source_name } => {
620                write!(f, "RENAME TO {source_name}")
621            }
622            AlterSourceOperation::AddColumn { column_def } => {
623                write!(f, "ADD COLUMN {column_def}")
624            }
625            AlterSourceOperation::ChangeOwner { new_owner_name } => {
626                write!(f, "OWNER TO {}", new_owner_name)
627            }
628            AlterSourceOperation::SetSchema { new_schema_name } => {
629                write!(f, "SET SCHEMA {}", new_schema_name)
630            }
631            AlterSourceOperation::FormatEncode { format_encode } => {
632                write!(f, "{format_encode}")
633            }
634            AlterSourceOperation::RefreshSchema => {
635                write!(f, "REFRESH SCHEMA")
636            }
637            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
638                write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
639            }
640            AlterSourceOperation::SwapRenameSource { target_source } => {
641                write!(f, "SWAP WITH {}", target_source)
642            }
643            AlterSourceOperation::SetParallelism {
644                parallelism,
645                deferred,
646            } => {
647                write!(
648                    f,
649                    "SET PARALLELISM TO {}{}",
650                    parallelism,
651                    if *deferred { " DEFERRED" } else { "" }
652                )
653            }
654            AlterSourceOperation::SetConfig { entries } => {
655                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
656            }
657            AlterSourceOperation::ResetConfig { keys } => {
658                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
659            }
660            AlterSourceOperation::ResetSource => {
661                write!(f, "RESET")
662            }
663            AlterSourceOperation::AlterConnectorProps { alter_props } => {
664                write!(
665                    f,
666                    "CONNECTOR WITH ({})",
667                    display_comma_separated(alter_props)
668                )
669            }
670        }
671    }
672}
673
674impl fmt::Display for AlterFunctionOperation {
675    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
676        match self {
677            AlterFunctionOperation::SetSchema { new_schema_name } => {
678                write!(f, "SET SCHEMA {new_schema_name}")
679            }
680            AlterFunctionOperation::ChangeOwner { new_owner_name } => {
681                write!(f, "OWNER TO {new_owner_name}")
682            }
683        }
684    }
685}
686
687impl fmt::Display for AlterConnectionOperation {
688    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
689        match self {
690            AlterConnectionOperation::SetSchema { new_schema_name } => {
691                write!(f, "SET SCHEMA {new_schema_name}")
692            }
693            AlterConnectionOperation::ChangeOwner { new_owner_name } => {
694                write!(f, "OWNER TO {new_owner_name}")
695            }
696            AlterConnectionOperation::AlterConnectorProps { alter_props } => {
697                write!(
698                    f,
699                    "CONNECTOR WITH ({})",
700                    display_comma_separated(alter_props)
701                )
702            }
703        }
704    }
705}
706
707impl fmt::Display for AlterSecretOperation {
708    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
709        match self {
710            AlterSecretOperation::ChangeCredential {
711                new_credential,
712                with_options,
713            } => {
714                write!(
715                    f,
716                    "WITH ({}) AS {}",
717                    display_comma_separated(with_options),
718                    new_credential
719                )
720            }
721            AlterSecretOperation::ChangeOwner { new_owner_name } => {
722                write!(f, "OWNER TO {new_owner_name}")
723            }
724        }
725    }
726}
727
/// An `ALTER COLUMN` (`Statement::AlterTable`) operation.
///
/// Carried by `AlterTableOperation::AlterColumn` together with the name of
/// the column it applies to.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterColumnOperation {
    /// `SET NOT NULL`
    SetNotNull,
    /// `DROP NOT NULL`
    DropNotNull,
    /// `SET DEFAULT <expr>`
    SetDefault { value: Expr },
    /// `DROP DEFAULT`
    DropDefault,
    /// `[SET DATA] TYPE <data_type> [USING <expr>]`
    ///
    /// The `Display` impl always emits the long form `SET DATA TYPE ...`.
    SetDataType {
        data_type: DataType,
        /// PostgreSQL specific
        using: Option<Expr>,
    },
}
746
747impl fmt::Display for AlterColumnOperation {
748    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
749        match self {
750            AlterColumnOperation::SetNotNull => write!(f, "SET NOT NULL",),
751            AlterColumnOperation::DropNotNull => write!(f, "DROP NOT NULL",),
752            AlterColumnOperation::SetDefault { value } => {
753                write!(f, "SET DEFAULT {}", value)
754            }
755            AlterColumnOperation::DropDefault => {
756                write!(f, "DROP DEFAULT")
757            }
758            AlterColumnOperation::SetDataType { data_type, using } => {
759                if let Some(expr) = using {
760                    write!(f, "SET DATA TYPE {} USING {}", data_type, expr)
761                } else {
762                    write!(f, "SET DATA TYPE {}", data_type)
763                }
764            }
765        }
766    }
767}
768
769impl fmt::Display for AlterFragmentOperation {
770    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
771        match self {
772            AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
773                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
774            }
775            AlterFragmentOperation::SetParallelism { parallelism } => {
776                write!(f, "SET PARALLELISM TO {}", parallelism)
777            }
778        }
779    }
780}
781
/// The watermark on source.
/// `WATERMARK FOR <column> AS <expr> [ WITH TTL ]`
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SourceWatermark {
    /// The column the watermark is defined for.
    pub column: Ident,
    /// The watermark expression following `AS`.
    pub expr: Expr,
    /// Whether `WITH TTL` is specified.
    pub with_ttl: bool,
}
791
792impl fmt::Display for SourceWatermark {
793    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
794        write!(f, "WATERMARK FOR {} AS {}", self.column, self.expr,)?;
795        if self.with_ttl {
796            write!(f, " WITH TTL")?;
797        }
798        Ok(())
799    }
800}
801
/// A table-level constraint, specified in a `CREATE TABLE` or an
/// `ALTER TABLE ADD <constraint>` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum TableConstraint {
    /// `[ CONSTRAINT <name> ] { PRIMARY KEY | UNIQUE } (<columns>)`
    Unique {
        /// Optional constraint name.
        name: Option<Ident>,
        /// Columns covered by the constraint.
        columns: Vec<Ident>,
        /// Whether this is a `PRIMARY KEY` or just a `UNIQUE` constraint
        is_primary: bool,
    },
    /// A referential integrity constraint (`[ CONSTRAINT <name> ] FOREIGN KEY (<columns>)
    /// REFERENCES <foreign_table> (<referred_columns>)
    /// { [ON DELETE <referential_action>] [ON UPDATE <referential_action>] |
    ///   [ON UPDATE <referential_action>] [ON DELETE <referential_action>]
    /// }`).
    ///
    /// The `Display` impl always writes `ON DELETE` before `ON UPDATE`.
    ForeignKey {
        /// Optional constraint name.
        name: Option<Ident>,
        /// Referencing columns of this table.
        columns: Vec<Ident>,
        /// The referenced table.
        foreign_table: ObjectName,
        /// Referenced columns of `foreign_table`.
        referred_columns: Vec<Ident>,
        /// Action for `ON DELETE`, if specified.
        on_delete: Option<ReferentialAction>,
        /// Action for `ON UPDATE`, if specified.
        on_update: Option<ReferentialAction>,
    },
    /// `[ CONSTRAINT <name> ] CHECK (<expr>)`
    Check {
        /// Optional constraint name.
        name: Option<Ident>,
        /// The checked expression.
        expr: Box<Expr>,
    },
}
832
833impl fmt::Display for TableConstraint {
834    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
835        match self {
836            TableConstraint::Unique {
837                name,
838                columns,
839                is_primary,
840            } => write!(
841                f,
842                "{}{} ({})",
843                display_constraint_name(name),
844                if *is_primary { "PRIMARY KEY" } else { "UNIQUE" },
845                display_comma_separated(columns)
846            ),
847            TableConstraint::ForeignKey {
848                name,
849                columns,
850                foreign_table,
851                referred_columns,
852                on_delete,
853                on_update,
854            } => {
855                write!(
856                    f,
857                    "{}FOREIGN KEY ({}) REFERENCES {}({})",
858                    display_constraint_name(name),
859                    display_comma_separated(columns),
860                    foreign_table,
861                    display_comma_separated(referred_columns),
862                )?;
863                if let Some(action) = on_delete {
864                    write!(f, " ON DELETE {}", action)?;
865                }
866                if let Some(action) = on_update {
867                    write!(f, " ON UPDATE {}", action)?;
868                }
869                Ok(())
870            }
871            TableConstraint::Check { name, expr } => {
872                write!(f, "{}CHECK ({})", display_constraint_name(name), expr)
873            }
874        }
875    }
876}
877
/// SQL column definition
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnDef {
    /// Column name.
    pub name: Ident,
    /// Data type of the column. `ColumnDef::new` always sets this to `Some`;
    /// when it is `None` the `Display` impl prints the literal text `None`.
    /// NOTE(review): presumably `None` stands for a column declared without
    /// a type — confirm against the parser.
    pub data_type: Option<DataType>,
    /// Optional collation. It is not written by the `Display` impl.
    pub collation: Option<ObjectName>,
    /// Column options, in the order they are displayed after the data type.
    pub options: Vec<ColumnOptionDef>,
}
886
887impl ColumnDef {
888    pub fn new(
889        name: Ident,
890        data_type: DataType,
891        collation: Option<ObjectName>,
892        options: Vec<ColumnOptionDef>,
893    ) -> Self {
894        ColumnDef {
895            name,
896            data_type: Some(data_type),
897            collation,
898            options,
899        }
900    }
901
902    pub fn is_generated(&self) -> bool {
903        self.options
904            .iter()
905            .any(|option| matches!(option.option, ColumnOption::GeneratedColumns(_)))
906    }
907}
908
909impl fmt::Display for ColumnDef {
910    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
911        write!(
912            f,
913            "{} {}",
914            self.name,
915            if let Some(data_type) = &self.data_type {
916                data_type.to_string()
917            } else {
918                "None".to_owned()
919            }
920        )?;
921        for option in &self.options {
922            write!(f, " {}", option)?;
923        }
924        Ok(())
925    }
926}
927
/// An optionally-named `ColumnOption`: `[ CONSTRAINT <name> ] <column-option>`.
///
/// Note that implementations are substantially more permissive than the ANSI
/// specification on what order column options can be presented in, and whether
/// they are allowed to be named. The specification distinguishes between
/// constraints (NOT NULL, UNIQUE, PRIMARY KEY, and CHECK), which can be named
/// and can appear in any order, and other options (DEFAULT, GENERATED), which
/// cannot be named and must appear in a fixed order. PostgreSQL, however,
/// allows preceding any option with `CONSTRAINT <name>`, even those that are
/// not really constraints, like NULL and DEFAULT. MSSQL is less permissive,
/// allowing DEFAULT, UNIQUE, PRIMARY KEY and CHECK to be named, but not NULL or
/// NOT NULL constraints (the last of which is in violation of the spec).
///
/// For maximum flexibility, we don't distinguish between constraint and
/// non-constraint options, lumping them all together under the umbrella of
/// "column options," and we allow any column option to be named.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnOptionDef {
    /// Optional constraint name; when present it is rendered as a leading
    /// `CONSTRAINT <name> ` in front of the option.
    pub name: Option<Ident>,
    /// The column option itself.
    pub option: ColumnOption,
}
949
950impl fmt::Display for ColumnOptionDef {
951    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
952        write!(f, "{}{}", display_constraint_name(&self.name), self.option)
953    }
954}
955
/// `ColumnOption`s are modifiers that follow a column definition in a `CREATE
/// TABLE` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ColumnOption {
    /// `NULL`
    Null,
    /// `NOT NULL`
    NotNull,
    /// `DEFAULT <restricted-expr>`
    DefaultValue(Expr),
    /// Default value from previous bound `DefaultColumnDesc`. Used internally
    /// for schema change and should not be specified by users.
    DefaultValueInternal {
        /// Protobuf encoded `DefaultColumnDesc`.
        persisted: Box<[u8]>,
        /// Optional AST for unparsing. If `None`, the default value will be
        /// shown as `DEFAULT INTERNAL`, which is for demonstration only and
        /// should not be specified by users.
        expr: Option<Expr>,
    },
    /// `{ PRIMARY KEY | UNIQUE }`
    Unique {
        /// `true` for `PRIMARY KEY`, `false` for `UNIQUE`.
        is_primary: bool,
    },
    /// A referential integrity constraint:
    /// `REFERENCES <foreign_table> [ (<referred_columns>) ]
    /// { [ON DELETE <referential_action>] [ON UPDATE <referential_action>] |
    ///   [ON UPDATE <referential_action>] [ON DELETE <referential_action>]
    /// }`.
    ///
    /// When displayed, the column list is omitted if `referred_columns` is
    /// empty, and `ON DELETE` is written before `ON UPDATE`.
    ForeignKey {
        /// Table being referenced.
        foreign_table: ObjectName,
        /// Referenced columns; may be empty.
        referred_columns: Vec<Ident>,
        /// Action for `ON DELETE`, if one was given.
        on_delete: Option<ReferentialAction>,
        /// Action for `ON UPDATE`, if one was given.
        on_update: Option<ReferentialAction>,
    },
    /// `CHECK (<expr>)`
    Check(Expr),
    /// Dialect-specific options, such as:
    /// - MySQL's `AUTO_INCREMENT` or SQLite's `AUTOINCREMENT`
    /// - ...
    DialectSpecific(Vec<Token>),
    /// `AS ( <generation_expr> )`
    ///
    /// Displayed as `AS <generation_expr>`, without adding parentheses.
    GeneratedColumns(Expr),
}
998
999impl fmt::Display for ColumnOption {
1000    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1001        use ColumnOption::*;
1002        match self {
1003            Null => write!(f, "NULL"),
1004            NotNull => write!(f, "NOT NULL"),
1005            DefaultValue(expr) => write!(f, "DEFAULT {}", expr),
1006            DefaultValueInternal { persisted: _, expr } => {
1007                if let Some(expr) = expr {
1008                    write!(f, "DEFAULT {}", expr)
1009                } else {
1010                    write!(f, "DEFAULT INTERNAL")
1011                }
1012            }
1013            Unique { is_primary } => {
1014                write!(f, "{}", if *is_primary { "PRIMARY KEY" } else { "UNIQUE" })
1015            }
1016            ForeignKey {
1017                foreign_table,
1018                referred_columns,
1019                on_delete,
1020                on_update,
1021            } => {
1022                write!(f, "REFERENCES {}", foreign_table)?;
1023                if !referred_columns.is_empty() {
1024                    write!(f, " ({})", display_comma_separated(referred_columns))?;
1025                }
1026                if let Some(action) = on_delete {
1027                    write!(f, " ON DELETE {}", action)?;
1028                }
1029                if let Some(action) = on_update {
1030                    write!(f, " ON UPDATE {}", action)?;
1031                }
1032                Ok(())
1033            }
1034            Check(expr) => write!(f, "CHECK ({})", expr),
1035            DialectSpecific(val) => write!(f, "{}", display_separated(val, " ")),
1036            GeneratedColumns(expr) => write!(f, "AS {}", expr),
1037        }
1038    }
1039}
1040
1041fn display_constraint_name(name: &'_ Option<Ident>) -> impl fmt::Display + '_ {
1042    struct ConstraintName<'a>(&'a Option<Ident>);
1043    impl fmt::Display for ConstraintName<'_> {
1044        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1045            if let Some(name) = self.0 {
1046                write!(f, "CONSTRAINT {} ", name)?;
1047            }
1048            Ok(())
1049        }
1050    }
1051    ConstraintName(name)
1052}
1053
/// `<referential_action> =
/// { RESTRICT | CASCADE | SET NULL | NO ACTION | SET DEFAULT }`
///
/// Used in foreign key constraints in `ON UPDATE` and `ON DELETE` options.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ReferentialAction {
    /// `RESTRICT`
    Restrict,
    /// `CASCADE`
    Cascade,
    /// `SET NULL`
    SetNull,
    /// `NO ACTION`
    NoAction,
    /// `SET DEFAULT`
    SetDefault,
}
1066
1067impl fmt::Display for ReferentialAction {
1068    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1069        f.write_str(match self {
1070            ReferentialAction::Restrict => "RESTRICT",
1071            ReferentialAction::Cascade => "CASCADE",
1072            ReferentialAction::SetNull => "SET NULL",
1073            ReferentialAction::NoAction => "NO ACTION",
1074            ReferentialAction::SetDefault => "SET DEFAULT",
1075        })
1076    }
1077}
1078
/// Secure secret definition for webhook source
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct WebhookSourceInfo {
    /// Optional reference to a secret.
    /// NOTE(review): presumably the secret used to validate incoming webhook
    /// requests — confirm against the consumer of this struct.
    pub secret_ref: Option<SecretRefValue>,
    /// Optional signature expression.
    /// NOTE(review): presumably evaluated to verify a request's signature —
    /// confirm.
    pub signature_expr: Option<Expr>,
    /// NOTE(review): presumably whether a request waits until its data is
    /// persisted before being acknowledged — confirm.
    pub wait_for_persistence: bool,
    /// NOTE(review): presumably whether one request may carry a batch of
    /// rows — confirm.
    pub is_batched: bool,
}