risingwave_sqlparser/ast/
ddl.rs

1// Licensed under the Apache License, Version 2.0 (the "License");
2// you may not use this file except in compliance with the License.
3// You may obtain a copy of the License at
4//
5//     http://www.apache.org/licenses/LICENSE-2.0
6//
7// Unless required by applicable law or agreed to in writing, software
8// distributed under the License is distributed on an "AS IS" BASIS,
9// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10// See the License for the specific language governing permissions and
11// limitations under the License.
12
13//! AST types specific to CREATE/ALTER variants of [`crate::ast::Statement`]
14//! (commonly referred to as Data Definition Language, or DDL)
15
16use std::fmt;
17
18use super::{ConfigParam, FormatEncodeOptions, SqlOption};
19use crate::ast::{
20    DataType, Expr, Ident, ObjectName, Query, SecretRefValue, SetVariableValue, Value,
21    display_comma_separated, display_separated,
22};
23use crate::tokenizer::Token;
24
/// An `ALTER DATABASE` operation.
///
/// The SQL shown on each variant is the text produced by this type's
/// `Display` implementation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterDatabaseOperation {
    /// `OWNER TO <new_owner_name>`
    ChangeOwner { new_owner_name: Ident },
    /// `RENAME TO <database_name>`
    RenameDatabase { database_name: ObjectName },
    /// `SET <param> TO <value>`
    SetParam(ConfigParam),
}
31
/// An `ALTER SCHEMA` operation.
///
/// The SQL shown on each variant is the text produced by this type's
/// `Display` implementation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterSchemaOperation {
    /// `OWNER TO <new_owner_name>`
    ChangeOwner { new_owner_name: Ident },
    /// `RENAME TO <schema_name>`
    RenameSchema { schema_name: ObjectName },
    /// `SWAP WITH <target_schema>`
    SwapRenameSchema { target_schema: ObjectName },
}
38
/// An `ALTER TABLE` (`Statement::AlterTable`) operation
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterTableOperation {
    /// `ADD <table_constraint>`
    AddConstraint(TableConstraint),
    /// `ADD [ COLUMN ] <column_def>`
    AddColumn {
        column_def: ColumnDef,
    },
    /// `DROP CONSTRAINT <name>`
    ///
    /// TODO: implement `DROP CONSTRAINT <name>`
    DropConstraint {
        name: Ident,
    },
    /// `DROP [ COLUMN ] [ IF EXISTS ] <column_name> [ CASCADE ]`
    DropColumn {
        column_name: Ident,
        if_exists: bool,
        cascade: bool,
    },
    /// `RENAME [ COLUMN ] <old_column_name> TO <new_column_name>`
    RenameColumn {
        old_column_name: Ident,
        new_column_name: Ident,
    },
    /// `RENAME TO <table_name>`
    RenameTable {
        table_name: ObjectName,
    },
    /// `CHANGE [ COLUMN ] <old_name> <new_name> <data_type> [ <options> ]`
    ChangeColumn {
        old_name: Ident,
        new_name: Ident,
        data_type: DataType,
        options: Vec<ColumnOption>,
    },
    /// `RENAME CONSTRAINT <old_constraint_name> TO <new_constraint_name>`
    ///
    /// Note: this is a PostgreSQL-specific operation.
    RenameConstraint {
        old_name: Ident,
        new_name: Ident,
    },
    /// `ALTER [ COLUMN ] <column_name> <op>`
    AlterColumn {
        column_name: Ident,
        op: AlterColumnOperation,
    },
    /// `OWNER TO <owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
    /// `SET SCHEMA <schema_name>`
    SetSchema {
        new_schema_name: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET CONFIG (key = value, ...)`
    SetConfig {
        entries: Vec<SqlOption>,
    },
    /// `RESET CONFIG (key, ...)`
    ResetConfig {
        keys: Vec<ObjectName>,
    },
    /// `REFRESH SCHEMA`
    RefreshSchema,
    /// `SET SOURCE_RATE_LIMIT TO <rate_limit>`
    SetSourceRateLimit {
        rate_limit: i32,
    },
    /// `SET BACKFILL_RATE_LIMIT TO <rate_limit>`
    SetBackfillRateLimit {
        rate_limit: i32,
    },
    /// `SET DML_RATE_LIMIT TO <rate_limit>`
    SetDmlRateLimit {
        rate_limit: i32,
    },
    /// `SWAP WITH <table_name>`
    SwapRenameTable {
        target_table: ObjectName,
    },
    /// `DROP CONNECTOR`
    DropConnector,

    /// `ALTER CONNECTOR WITH (<connector_props>)`
    ///
    /// Note: the `Display` implementation renders this operation as
    /// `CONNECTOR WITH (<connector_props>)`, without a leading `ALTER`.
    AlterConnectorProps {
        alter_props: Vec<SqlOption>,
    },
}
132
/// An `ALTER INDEX` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterIndexOperation {
    /// `RENAME TO <index_name>`
    RenameIndex {
        index_name: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET CONFIG (key = value, ...)`
    SetConfig {
        entries: Vec<SqlOption>,
    },
    /// `RESET CONFIG (key, ...)`
    ResetConfig {
        keys: Vec<ObjectName>,
    },
}
152
/// An `ALTER VIEW` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterViewOperation {
    /// `RENAME TO <view_name>`
    RenameView {
        view_name: ObjectName,
    },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
    /// `SET SCHEMA <new_schema_name>`
    SetSchema {
        new_schema_name: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET RESOURCE_GROUP TO 'RESOURCE GROUP' [ DEFERRED ]`
    /// `RESET RESOURCE_GROUP [ DEFERRED ]`
    ///
    /// `resource_group` is `Some` for the `SET` form and `None` for the
    /// `RESET` form.
    SetResourceGroup {
        resource_group: Option<SetVariableValue>,
        deferred: bool,
    },
    /// `SET BACKFILL_RATE_LIMIT TO <rate_limit>`
    SetBackfillRateLimit {
        rate_limit: i32,
    },
    /// `SWAP WITH <view_name>`
    SwapRenameView {
        target_view: ObjectName,
    },
    /// `SET STREAMING_ENABLE_UNALIGNED_JOIN TO <enable>`
    SetStreamingEnableUnalignedJoin {
        enable: bool,
    },
    /// `AS <query>`
    AsQuery {
        query: Box<Query>,
    },
    /// `SET CONFIG ( streaming.some_config_key = <some_config_value>, .. )`
    SetConfig {
        entries: Vec<SqlOption>,
    },
    /// `RESET CONFIG ( streaming.some_config_key, .. )`
    ResetConfig {
        keys: Vec<ObjectName>,
    },
}
199
/// An `ALTER SINK` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterSinkOperation {
    /// `RENAME TO <sink_name>`
    RenameSink {
        sink_name: ObjectName,
    },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
    /// `SET SCHEMA <new_schema_name>`
    SetSchema {
        new_schema_name: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET CONFIG (key = value, ...)`
    SetConfig {
        entries: Vec<SqlOption>,
    },
    /// `RESET CONFIG (key, ...)`
    ResetConfig {
        keys: Vec<ObjectName>,
    },
    /// `SWAP WITH <sink_name>`
    SwapRenameSink {
        target_sink: ObjectName,
    },
    /// `SET SINK_RATE_LIMIT TO <rate_limit>`
    SetSinkRateLimit {
        rate_limit: i32,
    },
    /// `CONNECTOR WITH (<connector_props>)`
    AlterConnectorProps {
        alter_props: Vec<SqlOption>,
    },
    /// `SET STREAMING_ENABLE_UNALIGNED_JOIN TO <enable>`
    SetStreamingEnableUnalignedJoin {
        enable: bool,
    },
}
238
/// An `ALTER SUBSCRIPTION` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterSubscriptionOperation {
    /// `RENAME TO <subscription_name>`
    RenameSubscription { subscription_name: ObjectName },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner { new_owner_name: Ident },
    /// `SET SCHEMA <new_schema_name>`
    SetSchema { new_schema_name: ObjectName },
    /// `SWAP WITH <target_subscription>`
    SwapRenameSubscription { target_subscription: ObjectName },
}
246
/// An `ALTER SOURCE` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterSourceOperation {
    /// `RENAME TO <source_name>`
    RenameSource {
        source_name: ObjectName,
    },
    /// `ADD COLUMN <column_def>`
    AddColumn {
        column_def: ColumnDef,
    },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
    /// `SET SCHEMA <new_schema_name>`
    SetSchema {
        new_schema_name: ObjectName,
    },
    /// Rendered as the `Display` output of the contained
    /// [`FormatEncodeOptions`], with no additional keywords.
    FormatEncode {
        format_encode: FormatEncodeOptions,
    },
    /// `REFRESH SCHEMA`
    RefreshSchema,
    /// `SET SOURCE_RATE_LIMIT TO <rate_limit>`
    SetSourceRateLimit {
        rate_limit: i32,
    },
    /// `SWAP WITH <target_source>`
    SwapRenameSource {
        target_source: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET CONFIG (key = value, ...)`
    SetConfig {
        entries: Vec<SqlOption>,
    },
    /// `RESET CONFIG (key, ...)`
    ResetConfig {
        keys: Vec<ObjectName>,
    },
    /// `CONNECTOR WITH (<connector_props>)`
    AlterConnectorProps {
        alter_props: Vec<SqlOption>,
    },
}
288
/// An `ALTER FUNCTION` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterFunctionOperation {
    /// `SET SCHEMA <new_schema_name>`
    SetSchema { new_schema_name: ObjectName },
}
293
/// An `ALTER CONNECTION` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterConnectionOperation {
    /// `SET SCHEMA <new_schema_name>`
    SetSchema { new_schema_name: ObjectName },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner { new_owner_name: Ident },
    /// `CONNECTOR WITH (<connector_props>)`
    AlterConnectorProps { alter_props: Vec<SqlOption> },
}
300
/// An `ALTER SECRET` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterSecretOperation {
    /// `AS <new_credential>`
    ChangeCredential { new_credential: Value },
}
305
/// An `ALTER FRAGMENT` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterFragmentOperation {
    /// `SET BACKFILL_RATE_LIMIT TO <rate_limit>`
    AlterBackfillRateLimit { rate_limit: i32 },
    /// `SET PARALLELISM TO <parallelism>`
    SetParallelism { parallelism: SetVariableValue },
}
311
312impl fmt::Display for AlterDatabaseOperation {
313    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
314        match self {
315            AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
316                write!(f, "OWNER TO {}", new_owner_name)
317            }
318            AlterDatabaseOperation::RenameDatabase { database_name } => {
319                write!(f, "RENAME TO {}", database_name)
320            }
321            AlterDatabaseOperation::SetParam(ConfigParam { param, value }) => {
322                write!(f, "SET {} TO {}", param, value)
323            }
324        }
325    }
326}
327
328impl fmt::Display for AlterSchemaOperation {
329    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
330        match self {
331            AlterSchemaOperation::ChangeOwner { new_owner_name } => {
332                write!(f, "OWNER TO {}", new_owner_name)
333            }
334            AlterSchemaOperation::RenameSchema { schema_name } => {
335                write!(f, "RENAME TO {}", schema_name)
336            }
337            AlterSchemaOperation::SwapRenameSchema { target_schema } => {
338                write!(f, "SWAP WITH {}", target_schema)
339            }
340        }
341    }
342}
343
344impl fmt::Display for AlterTableOperation {
345    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
346        match self {
347            AlterTableOperation::AddConstraint(c) => write!(f, "ADD {}", c),
348            AlterTableOperation::AddColumn { column_def } => {
349                write!(f, "ADD COLUMN {}", column_def)
350            }
351            AlterTableOperation::AlterColumn { column_name, op } => {
352                write!(f, "ALTER COLUMN {} {}", column_name, op)
353            }
354            AlterTableOperation::DropConstraint { name } => write!(f, "DROP CONSTRAINT {}", name),
355            AlterTableOperation::DropColumn {
356                column_name,
357                if_exists,
358                cascade,
359            } => write!(
360                f,
361                "DROP COLUMN {}{}{}",
362                if *if_exists { "IF EXISTS " } else { "" },
363                column_name,
364                if *cascade { " CASCADE" } else { "" }
365            ),
366            AlterTableOperation::RenameColumn {
367                old_column_name,
368                new_column_name,
369            } => write!(
370                f,
371                "RENAME COLUMN {} TO {}",
372                old_column_name, new_column_name
373            ),
374            AlterTableOperation::RenameTable { table_name } => {
375                write!(f, "RENAME TO {}", table_name)
376            }
377            AlterTableOperation::ChangeColumn {
378                old_name,
379                new_name,
380                data_type,
381                options,
382            } => {
383                write!(f, "CHANGE COLUMN {} {} {}", old_name, new_name, data_type)?;
384                if options.is_empty() {
385                    Ok(())
386                } else {
387                    write!(f, " {}", display_separated(options, " "))
388                }
389            }
390            AlterTableOperation::RenameConstraint { old_name, new_name } => {
391                write!(f, "RENAME CONSTRAINT {} TO {}", old_name, new_name)
392            }
393            AlterTableOperation::ChangeOwner { new_owner_name } => {
394                write!(f, "OWNER TO {}", new_owner_name)
395            }
396            AlterTableOperation::SetSchema { new_schema_name } => {
397                write!(f, "SET SCHEMA {}", new_schema_name)
398            }
399            AlterTableOperation::SetParallelism {
400                parallelism,
401                deferred,
402            } => {
403                write!(
404                    f,
405                    "SET PARALLELISM TO {}{}",
406                    parallelism,
407                    if *deferred { " DEFERRED" } else { "" }
408                )
409            }
410            AlterTableOperation::SetConfig { entries } => {
411                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
412            }
413            AlterTableOperation::ResetConfig { keys } => {
414                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
415            }
416            AlterTableOperation::RefreshSchema => {
417                write!(f, "REFRESH SCHEMA")
418            }
419            AlterTableOperation::SetSourceRateLimit { rate_limit } => {
420                write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
421            }
422            AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
423                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
424            }
425            AlterTableOperation::SetDmlRateLimit { rate_limit } => {
426                write!(f, "SET DML_RATE_LIMIT TO {}", rate_limit)
427            }
428            AlterTableOperation::SwapRenameTable { target_table } => {
429                write!(f, "SWAP WITH {}", target_table)
430            }
431            AlterTableOperation::DropConnector => {
432                write!(f, "DROP CONNECTOR")
433            }
434            AlterTableOperation::AlterConnectorProps { alter_props } => {
435                write!(
436                    f,
437                    "CONNECTOR WITH ({})",
438                    display_comma_separated(alter_props)
439                )
440            }
441        }
442    }
443}
444
445impl fmt::Display for AlterIndexOperation {
446    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
447        match self {
448            AlterIndexOperation::RenameIndex { index_name } => {
449                write!(f, "RENAME TO {index_name}")
450            }
451            AlterIndexOperation::SetParallelism {
452                parallelism,
453                deferred,
454            } => {
455                write!(
456                    f,
457                    "SET PARALLELISM TO {}{}",
458                    parallelism,
459                    if *deferred { " DEFERRED" } else { "" }
460                )
461            }
462            AlterIndexOperation::SetConfig { entries } => {
463                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
464            }
465            AlterIndexOperation::ResetConfig { keys } => {
466                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
467            }
468        }
469    }
470}
471
472impl fmt::Display for AlterViewOperation {
473    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
474        match self {
475            AlterViewOperation::RenameView { view_name } => {
476                write!(f, "RENAME TO {view_name}")
477            }
478            AlterViewOperation::ChangeOwner { new_owner_name } => {
479                write!(f, "OWNER TO {}", new_owner_name)
480            }
481            AlterViewOperation::SetSchema { new_schema_name } => {
482                write!(f, "SET SCHEMA {}", new_schema_name)
483            }
484            AlterViewOperation::SetParallelism {
485                parallelism,
486                deferred,
487            } => {
488                write!(
489                    f,
490                    "SET PARALLELISM TO {}{}",
491                    parallelism,
492                    if *deferred { " DEFERRED" } else { "" }
493                )
494            }
495            AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
496                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
497            }
498            AlterViewOperation::SwapRenameView { target_view } => {
499                write!(f, "SWAP WITH {}", target_view)
500            }
501            AlterViewOperation::SetResourceGroup {
502                resource_group,
503                deferred,
504            } => {
505                let deferred = if *deferred { " DEFERRED" } else { "" };
506
507                if let Some(resource_group) = resource_group {
508                    write!(f, "SET RESOURCE_GROUP TO {} {}", resource_group, deferred)
509                } else {
510                    write!(f, "RESET RESOURCE_GROUP {}", deferred)
511                }
512            }
513            AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
514                write!(f, "SET STREAMING_ENABLE_UNALIGNED_JOIN TO {}", enable)
515            }
516            AlterViewOperation::AsQuery { query } => {
517                write!(f, "AS {}", query)
518            }
519            AlterViewOperation::SetConfig { entries } => {
520                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
521            }
522            AlterViewOperation::ResetConfig { keys } => {
523                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
524            }
525        }
526    }
527}
528
529impl fmt::Display for AlterSinkOperation {
530    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
531        match self {
532            AlterSinkOperation::RenameSink { sink_name } => {
533                write!(f, "RENAME TO {sink_name}")
534            }
535            AlterSinkOperation::ChangeOwner { new_owner_name } => {
536                write!(f, "OWNER TO {}", new_owner_name)
537            }
538            AlterSinkOperation::SetSchema { new_schema_name } => {
539                write!(f, "SET SCHEMA {}", new_schema_name)
540            }
541            AlterSinkOperation::SetParallelism {
542                parallelism,
543                deferred,
544            } => {
545                write!(
546                    f,
547                    "SET PARALLELISM TO {}{}",
548                    parallelism,
549                    if *deferred { " DEFERRED" } else { "" }
550                )
551            }
552            AlterSinkOperation::SetConfig { entries } => {
553                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
554            }
555            AlterSinkOperation::ResetConfig { keys } => {
556                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
557            }
558            AlterSinkOperation::SwapRenameSink { target_sink } => {
559                write!(f, "SWAP WITH {}", target_sink)
560            }
561            AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
562                write!(f, "SET SINK_RATE_LIMIT TO {}", rate_limit)
563            }
564            AlterSinkOperation::AlterConnectorProps {
565                alter_props: changed_props,
566            } => {
567                write!(
568                    f,
569                    "CONNECTOR WITH ({})",
570                    display_comma_separated(changed_props)
571                )
572            }
573            AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
574                write!(f, "SET STREAMING_ENABLE_UNALIGNED_JOIN TO {}", enable)
575            }
576        }
577    }
578}
579
580impl fmt::Display for AlterSubscriptionOperation {
581    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
582        match self {
583            AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
584                write!(f, "RENAME TO {subscription_name}")
585            }
586            AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
587                write!(f, "OWNER TO {}", new_owner_name)
588            }
589            AlterSubscriptionOperation::SetSchema { new_schema_name } => {
590                write!(f, "SET SCHEMA {}", new_schema_name)
591            }
592            AlterSubscriptionOperation::SwapRenameSubscription {
593                target_subscription,
594            } => {
595                write!(f, "SWAP WITH {}", target_subscription)
596            }
597        }
598    }
599}
600
601impl fmt::Display for AlterSourceOperation {
602    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
603        match self {
604            AlterSourceOperation::RenameSource { source_name } => {
605                write!(f, "RENAME TO {source_name}")
606            }
607            AlterSourceOperation::AddColumn { column_def } => {
608                write!(f, "ADD COLUMN {column_def}")
609            }
610            AlterSourceOperation::ChangeOwner { new_owner_name } => {
611                write!(f, "OWNER TO {}", new_owner_name)
612            }
613            AlterSourceOperation::SetSchema { new_schema_name } => {
614                write!(f, "SET SCHEMA {}", new_schema_name)
615            }
616            AlterSourceOperation::FormatEncode { format_encode } => {
617                write!(f, "{format_encode}")
618            }
619            AlterSourceOperation::RefreshSchema => {
620                write!(f, "REFRESH SCHEMA")
621            }
622            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
623                write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
624            }
625            AlterSourceOperation::SwapRenameSource { target_source } => {
626                write!(f, "SWAP WITH {}", target_source)
627            }
628            AlterSourceOperation::SetParallelism {
629                parallelism,
630                deferred,
631            } => {
632                write!(
633                    f,
634                    "SET PARALLELISM TO {}{}",
635                    parallelism,
636                    if *deferred { " DEFERRED" } else { "" }
637                )
638            }
639            AlterSourceOperation::SetConfig { entries } => {
640                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
641            }
642            AlterSourceOperation::ResetConfig { keys } => {
643                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
644            }
645            AlterSourceOperation::AlterConnectorProps { alter_props } => {
646                write!(
647                    f,
648                    "CONNECTOR WITH ({})",
649                    display_comma_separated(alter_props)
650                )
651            }
652        }
653    }
654}
655
656impl fmt::Display for AlterFunctionOperation {
657    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
658        match self {
659            AlterFunctionOperation::SetSchema { new_schema_name } => {
660                write!(f, "SET SCHEMA {new_schema_name}")
661            }
662        }
663    }
664}
665
666impl fmt::Display for AlterConnectionOperation {
667    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
668        match self {
669            AlterConnectionOperation::SetSchema { new_schema_name } => {
670                write!(f, "SET SCHEMA {new_schema_name}")
671            }
672            AlterConnectionOperation::ChangeOwner { new_owner_name } => {
673                write!(f, "OWNER TO {new_owner_name}")
674            }
675            AlterConnectionOperation::AlterConnectorProps { alter_props } => {
676                write!(
677                    f,
678                    "CONNECTOR WITH ({})",
679                    display_comma_separated(alter_props)
680                )
681            }
682        }
683    }
684}
685
686impl fmt::Display for AlterSecretOperation {
687    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
688        match self {
689            AlterSecretOperation::ChangeCredential { new_credential } => {
690                write!(f, "AS {new_credential}")
691            }
692        }
693    }
694}
695
/// An `ALTER COLUMN` (`Statement::AlterTable`) operation
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterColumnOperation {
    /// `SET NOT NULL`
    SetNotNull,
    /// `DROP NOT NULL`
    DropNotNull,
    /// `SET DEFAULT <expr>`
    SetDefault { value: Expr },
    /// `DROP DEFAULT`
    DropDefault,
    /// `[SET DATA] TYPE <data_type> [USING <expr>]`
    ///
    /// `Display` always renders the long form, `SET DATA TYPE <data_type>`.
    SetDataType {
        /// The new type of the column.
        data_type: DataType,
        /// PostgreSQL specific
        using: Option<Expr>,
    },
}
714
715impl fmt::Display for AlterColumnOperation {
716    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
717        match self {
718            AlterColumnOperation::SetNotNull => write!(f, "SET NOT NULL",),
719            AlterColumnOperation::DropNotNull => write!(f, "DROP NOT NULL",),
720            AlterColumnOperation::SetDefault { value } => {
721                write!(f, "SET DEFAULT {}", value)
722            }
723            AlterColumnOperation::DropDefault => {
724                write!(f, "DROP DEFAULT")
725            }
726            AlterColumnOperation::SetDataType { data_type, using } => {
727                if let Some(expr) = using {
728                    write!(f, "SET DATA TYPE {} USING {}", data_type, expr)
729                } else {
730                    write!(f, "SET DATA TYPE {}", data_type)
731                }
732            }
733        }
734    }
735}
736
737impl fmt::Display for AlterFragmentOperation {
738    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
739        match self {
740            AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
741                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
742            }
743            AlterFragmentOperation::SetParallelism { parallelism } => {
744                write!(f, "SET PARALLELISM TO {}", parallelism)
745            }
746        }
747    }
748}
749
/// The watermark on source.
/// `WATERMARK FOR <column> AS (<expr>)`
///
/// `Display` renders this as `WATERMARK FOR <column> AS <expr>`, followed by
/// ` WITH TTL` when `with_ttl` is set; it adds no parentheses of its own
/// around the expression.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SourceWatermark {
    /// The column named after `WATERMARK FOR`.
    pub column: Ident,
    /// The expression following `AS`.
    pub expr: Expr,
    /// Whether `WITH TTL` is specified.
    pub with_ttl: bool,
}
759
760impl fmt::Display for SourceWatermark {
761    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
762        write!(f, "WATERMARK FOR {} AS {}", self.column, self.expr,)?;
763        if self.with_ttl {
764            write!(f, " WITH TTL")?;
765        }
766        Ok(())
767    }
768}
769
/// A table-level constraint, specified in a `CREATE TABLE` or an
/// `ALTER TABLE ADD <constraint>` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum TableConstraint {
    /// `[ CONSTRAINT <name> ] { PRIMARY KEY | UNIQUE } (<columns>)`
    Unique {
        /// Optional constraint name.
        name: Option<Ident>,
        /// Columns covered by the constraint.
        columns: Vec<Ident>,
        /// Whether this is a `PRIMARY KEY` or just a `UNIQUE` constraint
        is_primary: bool,
    },
    /// A referential integrity constraint (`[ CONSTRAINT <name> ] FOREIGN KEY (<columns>)
    /// REFERENCES <foreign_table> (<referred_columns>)
    /// { [ON DELETE <referential_action>] [ON UPDATE <referential_action>] |
    ///   [ON UPDATE <referential_action>] [ON DELETE <referential_action>]
    /// }`).
    ///
    /// `Display` always writes `ON DELETE` before `ON UPDATE` when both are present.
    ForeignKey {
        /// Optional constraint name.
        name: Option<Ident>,
        /// Referencing columns of this table.
        columns: Vec<Ident>,
        /// The table named after `REFERENCES`.
        foreign_table: ObjectName,
        /// Columns of `foreign_table` that are referenced.
        referred_columns: Vec<Ident>,
        /// Action for `ON DELETE`, if specified.
        on_delete: Option<ReferentialAction>,
        /// Action for `ON UPDATE`, if specified.
        on_update: Option<ReferentialAction>,
    },
    /// `[ CONSTRAINT <name> ] CHECK (<expr>)`
    Check {
        /// Optional constraint name.
        name: Option<Ident>,
        /// The checked expression.
        expr: Box<Expr>,
    },
}
800
801impl fmt::Display for TableConstraint {
802    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
803        match self {
804            TableConstraint::Unique {
805                name,
806                columns,
807                is_primary,
808            } => write!(
809                f,
810                "{}{} ({})",
811                display_constraint_name(name),
812                if *is_primary { "PRIMARY KEY" } else { "UNIQUE" },
813                display_comma_separated(columns)
814            ),
815            TableConstraint::ForeignKey {
816                name,
817                columns,
818                foreign_table,
819                referred_columns,
820                on_delete,
821                on_update,
822            } => {
823                write!(
824                    f,
825                    "{}FOREIGN KEY ({}) REFERENCES {}({})",
826                    display_constraint_name(name),
827                    display_comma_separated(columns),
828                    foreign_table,
829                    display_comma_separated(referred_columns),
830                )?;
831                if let Some(action) = on_delete {
832                    write!(f, " ON DELETE {}", action)?;
833                }
834                if let Some(action) = on_update {
835                    write!(f, " ON UPDATE {}", action)?;
836                }
837                Ok(())
838            }
839            TableConstraint::Check { name, expr } => {
840                write!(f, "{}CHECK ({})", display_constraint_name(name), expr)
841            }
842        }
843    }
844}
845
/// SQL column definition
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnDef {
    /// Column name.
    pub name: Ident,
    /// Declared type of the column. When this is `None`, `Display` writes the
    /// literal text `None` in the type position.
    pub data_type: Option<DataType>,
    /// Optional collation. Note that the `Display` implementation does not
    /// render this field.
    pub collation: Option<ObjectName>,
    /// Column options, rendered in order and separated by single spaces.
    pub options: Vec<ColumnOptionDef>,
}
854
855impl ColumnDef {
856    pub fn new(
857        name: Ident,
858        data_type: DataType,
859        collation: Option<ObjectName>,
860        options: Vec<ColumnOptionDef>,
861    ) -> Self {
862        ColumnDef {
863            name,
864            data_type: Some(data_type),
865            collation,
866            options,
867        }
868    }
869
870    pub fn is_generated(&self) -> bool {
871        self.options
872            .iter()
873            .any(|option| matches!(option.option, ColumnOption::GeneratedColumns(_)))
874    }
875}
876
877impl fmt::Display for ColumnDef {
878    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
879        write!(
880            f,
881            "{} {}",
882            self.name,
883            if let Some(data_type) = &self.data_type {
884                data_type.to_string()
885            } else {
886                "None".to_owned()
887            }
888        )?;
889        for option in &self.options {
890            write!(f, " {}", option)?;
891        }
892        Ok(())
893    }
894}
895
/// An optionally-named `ColumnOption`: `[ CONSTRAINT <name> ] <column-option>`.
///
/// Note that implementations are substantially more permissive than the ANSI
/// specification on what order column options can be presented in, and whether
/// they are allowed to be named. The specification distinguishes between
/// constraints (NOT NULL, UNIQUE, PRIMARY KEY, and CHECK), which can be named
/// and can appear in any order, and other options (DEFAULT, GENERATED), which
/// cannot be named and must appear in a fixed order. PostgreSQL, however,
/// allows preceding any option with `CONSTRAINT <name>`, even those that are
/// not really constraints, like NULL and DEFAULT. MSSQL is less permissive,
/// allowing DEFAULT, UNIQUE, PRIMARY KEY and CHECK to be named, but not NULL or
/// NOT NULL constraints (the last of which is in violation of the spec).
///
/// For maximum flexibility, we don't distinguish between constraint and
/// non-constraint options, lumping them all together under the umbrella of
/// "column options," and we allow any column option to be named.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnOptionDef {
    /// Optional name given via `CONSTRAINT <name>`.
    pub name: Option<Ident>,
    /// The column option itself.
    pub option: ColumnOption,
}
917
918impl fmt::Display for ColumnOptionDef {
919    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
920        write!(f, "{}{}", display_constraint_name(&self.name), self.option)
921    }
922}
923
/// `ColumnOption`s are modifiers that follow a column definition in a `CREATE
/// TABLE` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ColumnOption {
    /// `NULL`
    Null,
    /// `NOT NULL`
    NotNull,
    /// `DEFAULT <restricted-expr>`
    DefaultValue(Expr),
    /// Default value from previous bound `DefaultColumnDesc`. Used internally
    /// for schema change and should not be specified by users.
    DefaultValueInternal {
        /// Protobuf encoded `DefaultColumnDesc`.
        persisted: Box<[u8]>,
        /// Optional AST for unparsing. If `None`, the default value will be
        /// shown as `DEFAULT INTERNAL` which is for demonstrating and should
        /// not be specified by users.
        expr: Option<Expr>,
    },
    /// `{ PRIMARY KEY | UNIQUE }`: `is_primary` selects `PRIMARY KEY` when
    /// `true` and `UNIQUE` when `false`.
    Unique { is_primary: bool },
    /// A referential integrity constraint, displayed as
    /// `REFERENCES <foreign_table> [ (<referred_columns>) ]
    /// [ON DELETE <referential_action>] [ON UPDATE <referential_action>]`.
    ///
    /// The two `ON` actions are stored in separate fields, so `Display`
    /// always writes `ON DELETE` before `ON UPDATE`. The column list is
    /// omitted when `referred_columns` is empty.
    ForeignKey {
        foreign_table: ObjectName,
        referred_columns: Vec<Ident>,
        on_delete: Option<ReferentialAction>,
        on_update: Option<ReferentialAction>,
    },
    /// `CHECK (<expr>)`
    Check(Expr),
    /// Dialect-specific options, such as:
    /// - MySQL's `AUTO_INCREMENT` or SQLite's `AUTOINCREMENT`
    /// - ...
    ///
    /// `Display` writes the tokens separated by single spaces.
    DialectSpecific(Vec<Token>),
    /// Generated column: `AS ( <generation_expr> )`. `Display` writes `AS `
    /// followed by the expression and adds no parentheses of its own.
    GeneratedColumns(Expr),
}
966
967impl fmt::Display for ColumnOption {
968    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
969        use ColumnOption::*;
970        match self {
971            Null => write!(f, "NULL"),
972            NotNull => write!(f, "NOT NULL"),
973            DefaultValue(expr) => write!(f, "DEFAULT {}", expr),
974            DefaultValueInternal { persisted: _, expr } => {
975                if let Some(expr) = expr {
976                    write!(f, "DEFAULT {}", expr)
977                } else {
978                    write!(f, "DEFAULT INTERNAL")
979                }
980            }
981            Unique { is_primary } => {
982                write!(f, "{}", if *is_primary { "PRIMARY KEY" } else { "UNIQUE" })
983            }
984            ForeignKey {
985                foreign_table,
986                referred_columns,
987                on_delete,
988                on_update,
989            } => {
990                write!(f, "REFERENCES {}", foreign_table)?;
991                if !referred_columns.is_empty() {
992                    write!(f, " ({})", display_comma_separated(referred_columns))?;
993                }
994                if let Some(action) = on_delete {
995                    write!(f, " ON DELETE {}", action)?;
996                }
997                if let Some(action) = on_update {
998                    write!(f, " ON UPDATE {}", action)?;
999                }
1000                Ok(())
1001            }
1002            Check(expr) => write!(f, "CHECK ({})", expr),
1003            DialectSpecific(val) => write!(f, "{}", display_separated(val, " ")),
1004            GeneratedColumns(expr) => write!(f, "AS {}", expr),
1005        }
1006    }
1007}
1008
1009fn display_constraint_name(name: &'_ Option<Ident>) -> impl fmt::Display + '_ {
1010    struct ConstraintName<'a>(&'a Option<Ident>);
1011    impl fmt::Display for ConstraintName<'_> {
1012        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1013            if let Some(name) = self.0 {
1014                write!(f, "CONSTRAINT {} ", name)?;
1015            }
1016            Ok(())
1017        }
1018    }
1019    ConstraintName(name)
1020}
1021
/// `<referential_action> =
/// { RESTRICT | CASCADE | SET NULL | NO ACTION | SET DEFAULT }`
///
/// Used in foreign key constraints in `ON UPDATE` and `ON DELETE` options.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ReferentialAction {
    /// `RESTRICT`
    Restrict,
    /// `CASCADE`
    Cascade,
    /// `SET NULL`
    SetNull,
    /// `NO ACTION`
    NoAction,
    /// `SET DEFAULT`
    SetDefault,
}

impl fmt::Display for ReferentialAction {
    /// Writes the SQL keyword(s) for the action.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keyword = match self {
            Self::Restrict => "RESTRICT",
            Self::Cascade => "CASCADE",
            Self::SetNull => "SET NULL",
            Self::NoAction => "NO ACTION",
            Self::SetDefault => "SET DEFAULT",
        };
        f.write_str(keyword)
    }
}
1046
/// Secure secret definition for a webhook source.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct WebhookSourceInfo {
    /// Optional reference to a secret value.
    /// NOTE(review): presumably the secret used when checking incoming
    /// webhook requests — confirm against the consumer of this struct.
    pub secret_ref: Option<SecretRefValue>,
    /// Expression associated with the request signature.
    /// NOTE(review): how and when it is evaluated is not visible here.
    pub signature_expr: Expr,
    /// NOTE(review): the name suggests that handling waits until data is
    /// persisted — confirm the exact semantics with the webhook source code.
    pub wait_for_persistence: bool,
    /// NOTE(review): the name suggests that one request may carry a batch of
    /// records — confirm the exact semantics with the webhook source code.
    pub is_batched: bool,
}