risingwave_sqlparser/ast/
ddl.rs

1// Licensed under the Apache License, Version 2.0 (the "License");
2// you may not use this file except in compliance with the License.
3// You may obtain a copy of the License at
4//
5//     http://www.apache.org/licenses/LICENSE-2.0
6//
7// Unless required by applicable law or agreed to in writing, software
8// distributed under the License is distributed on an "AS IS" BASIS,
9// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10// See the License for the specific language governing permissions and
11// limitations under the License.
12
13//! AST types specific to CREATE/ALTER variants of [`crate::ast::Statement`]
14//! (commonly referred to as Data Definition Language, or DDL)
15
16use std::fmt;
17
18use super::{ConfigParam, FormatEncodeOptions, SqlOption};
19use crate::ast::{
20    DataType, Expr, Ident, ObjectName, Query, SecretRefValue, SetVariableValue, Value,
21    display_comma_separated, display_separated,
22};
23use crate::tokenizer::Token;
24
/// An `ALTER DATABASE` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterDatabaseOperation {
    /// `OWNER TO <new_owner_name>`
    ChangeOwner { new_owner_name: Ident },
    /// `RENAME TO <database_name>`
    RenameDatabase { database_name: ObjectName },
    /// `SET <param> TO <value>`
    SetParam(ConfigParam),
}
31
/// An `ALTER SCHEMA` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterSchemaOperation {
    /// `OWNER TO <new_owner_name>`
    ChangeOwner { new_owner_name: Ident },
    /// `RENAME TO <schema_name>`
    RenameSchema { schema_name: ObjectName },
    /// `SWAP WITH <target_schema>`
    SwapRenameSchema { target_schema: ObjectName },
}
38
/// An `ALTER TABLE` (`Statement::AlterTable`) operation
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterTableOperation {
    /// `ADD <table_constraint>`
    AddConstraint(TableConstraint),
    /// `ADD [ COLUMN ] <column_def>`
    AddColumn {
        column_def: ColumnDef,
    },
    /// TODO: implement `DROP CONSTRAINT <name>`
    DropConstraint {
        name: Ident,
    },
    /// `DROP [ COLUMN ] [ IF EXISTS ] <column_name> [ CASCADE ]`
    DropColumn {
        column_name: Ident,
        if_exists: bool,
        cascade: bool,
    },
    /// `RENAME [ COLUMN ] <old_column_name> TO <new_column_name>`
    RenameColumn {
        old_column_name: Ident,
        new_column_name: Ident,
    },
    /// `RENAME TO <table_name>`
    RenameTable {
        table_name: ObjectName,
    },
    /// `CHANGE [ COLUMN ] <old_name> <new_name> <data_type> [ <options> ]`
    ChangeColumn {
        old_name: Ident,
        new_name: Ident,
        data_type: DataType,
        options: Vec<ColumnOption>,
    },
    /// `RENAME CONSTRAINT <old_constraint_name> TO <new_constraint_name>`
    ///
    /// Note: this is a PostgreSQL-specific operation.
    RenameConstraint {
        old_name: Ident,
        new_name: Ident,
    },
    /// `ALTER [ COLUMN ] <column_name> <op>`
    AlterColumn {
        column_name: Ident,
        op: AlterColumnOperation,
    },
    /// `OWNER TO <owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
    /// `SET SCHEMA <schema_name>`
    SetSchema {
        new_schema_name: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET CONFIG (key = value, ...)`
    SetConfig {
        entries: Vec<SqlOption>,
    },
    /// `RESET CONFIG (key, ...)`
    ResetConfig {
        keys: Vec<ObjectName>,
    },
    /// `REFRESH SCHEMA`
    RefreshSchema,
    /// `SET SOURCE_RATE_LIMIT TO <rate_limit>`
    SetSourceRateLimit {
        rate_limit: i32,
    },
    /// `SET BACKFILL_RATE_LIMIT TO <rate_limit>`
    SetBackfillRateLimit {
        rate_limit: i32,
    },
    /// `SET DML_RATE_LIMIT TO <rate_limit>`
    SetDmlRateLimit {
        rate_limit: i32,
    },
    /// `SWAP WITH <table_name>`
    SwapRenameTable {
        target_table: ObjectName,
    },
    /// `DROP CONNECTOR`
    DropConnector,

    /// `CONNECTOR WITH (<connector_props>)`
    AlterConnectorProps {
        alter_props: Vec<SqlOption>,
    },
}
132
/// An `ALTER INDEX` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterIndexOperation {
    /// `RENAME TO <index_name>`
    RenameIndex {
        index_name: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET CONFIG (key = value, ...)`
    SetConfig {
        entries: Vec<SqlOption>,
    },
    /// `RESET CONFIG (key, ...)`
    ResetConfig {
        keys: Vec<ObjectName>,
    },
}
152
/// An operation applied to a view by an `ALTER ... VIEW` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterViewOperation {
    /// `RENAME TO <view_name>`
    RenameView {
        view_name: ObjectName,
    },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
    /// `SET SCHEMA <new_schema_name>`
    SetSchema {
        new_schema_name: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET RESOURCE_GROUP TO 'RESOURCE GROUP' [ DEFERRED ]`
    /// `RESET RESOURCE_GROUP [ DEFERRED ]`
    ///
    /// `resource_group` is `Some` for the `SET` form and `None` for the
    /// `RESET` form.
    SetResourceGroup {
        resource_group: Option<SetVariableValue>,
        deferred: bool,
    },
    /// `SET BACKFILL_RATE_LIMIT TO <rate_limit>`
    SetBackfillRateLimit {
        rate_limit: i32,
    },
    /// `SWAP WITH <view_name>`
    SwapRenameView {
        target_view: ObjectName,
    },
    /// `SET STREAMING_ENABLE_UNALIGNED_JOIN TO <enable>`
    SetStreamingEnableUnalignedJoin {
        enable: bool,
    },
    /// `AS <query>`
    AsQuery {
        query: Box<Query>,
    },
    /// `SET CONFIG ( streaming.some_config_key = <some_config_value>, .. )`
    SetConfig {
        entries: Vec<SqlOption>,
    },
    /// `RESET CONFIG ( streaming.some_config_key, .. )`
    ResetConfig {
        keys: Vec<ObjectName>,
    },
}
199
/// An `ALTER SINK` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterSinkOperation {
    /// `RENAME TO <sink_name>`
    RenameSink {
        sink_name: ObjectName,
    },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
    /// `SET SCHEMA <new_schema_name>`
    SetSchema {
        new_schema_name: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET CONFIG (key = value, ...)`
    SetConfig {
        entries: Vec<SqlOption>,
    },
    /// `RESET CONFIG (key, ...)`
    ResetConfig {
        keys: Vec<ObjectName>,
    },
    /// `SWAP WITH <sink_name>`
    SwapRenameSink {
        target_sink: ObjectName,
    },
    /// `SET SINK_RATE_LIMIT TO <rate_limit>`
    SetSinkRateLimit {
        rate_limit: i32,
    },
    /// `CONNECTOR WITH (<connector_props>)`
    AlterConnectorProps {
        alter_props: Vec<SqlOption>,
    },
    /// `SET STREAMING_ENABLE_UNALIGNED_JOIN TO <enable>`
    SetStreamingEnableUnalignedJoin {
        enable: bool,
    },
}
238
/// An `ALTER SUBSCRIPTION` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterSubscriptionOperation {
    /// `RENAME TO <subscription_name>`
    RenameSubscription { subscription_name: ObjectName },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner { new_owner_name: Ident },
    /// `SET SCHEMA <new_schema_name>`
    SetSchema { new_schema_name: ObjectName },
    /// `SWAP WITH <target_subscription>`
    SwapRenameSubscription { target_subscription: ObjectName },
}
246
/// An `ALTER SOURCE` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterSourceOperation {
    /// `RENAME TO <source_name>`
    RenameSource {
        source_name: ObjectName,
    },
    /// `ADD COLUMN <column_def>`
    AddColumn {
        column_def: ColumnDef,
    },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
    /// `SET SCHEMA <new_schema_name>`
    SetSchema {
        new_schema_name: ObjectName,
    },
    /// Carries a set of [`FormatEncodeOptions`]; displayed using that type's
    /// own `Display` output with no extra keywords.
    FormatEncode {
        format_encode: FormatEncodeOptions,
    },
    /// `REFRESH SCHEMA`
    RefreshSchema,
    /// `SET SOURCE_RATE_LIMIT TO <rate_limit>`
    SetSourceRateLimit {
        rate_limit: i32,
    },
    /// `SWAP WITH <target_source>`
    SwapRenameSource {
        target_source: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET CONFIG (key = value, ...)`
    SetConfig {
        entries: Vec<SqlOption>,
    },
    /// `RESET CONFIG (key, ...)`
    ResetConfig {
        keys: Vec<ObjectName>,
    },
    /// `CONNECTOR WITH (<connector_props>)`
    AlterConnectorProps {
        alter_props: Vec<SqlOption>,
    },
}
288
/// An `ALTER FUNCTION` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterFunctionOperation {
    /// `SET SCHEMA <new_schema_name>`
    SetSchema { new_schema_name: ObjectName },
}
293
/// An `ALTER CONNECTION` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterConnectionOperation {
    /// `SET SCHEMA <new_schema_name>`
    SetSchema { new_schema_name: ObjectName },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner { new_owner_name: Ident },
    /// `CONNECTOR WITH (<connector_props>)`
    AlterConnectorProps { alter_props: Vec<SqlOption> },
}
300
/// An `ALTER SECRET` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterSecretOperation {
    /// `AS <new_credential>`
    ChangeCredential { new_credential: Value },
}
305
/// An `ALTER FRAGMENT` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterFragmentOperation {
    /// `SET BACKFILL_RATE_LIMIT TO <rate_limit>`
    AlterBackfillRateLimit { rate_limit: i32 },
    /// `SET PARALLELISM TO <parallelism>`
    SetParallelism { parallelism: SetVariableValue },
}
311
312impl fmt::Display for AlterDatabaseOperation {
313    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
314        match self {
315            AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
316                write!(f, "OWNER TO {}", new_owner_name)
317            }
318            AlterDatabaseOperation::RenameDatabase { database_name } => {
319                write!(f, "RENAME TO {}", database_name)
320            }
321            AlterDatabaseOperation::SetParam(ConfigParam { param, value }) => {
322                write!(f, "SET {} TO {}", param, value)
323            }
324        }
325    }
326}
327
328impl fmt::Display for AlterSchemaOperation {
329    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
330        match self {
331            AlterSchemaOperation::ChangeOwner { new_owner_name } => {
332                write!(f, "OWNER TO {}", new_owner_name)
333            }
334            AlterSchemaOperation::RenameSchema { schema_name } => {
335                write!(f, "RENAME TO {}", schema_name)
336            }
337            AlterSchemaOperation::SwapRenameSchema { target_schema } => {
338                write!(f, "SWAP WITH {}", target_schema)
339            }
340        }
341    }
342}
343
344impl fmt::Display for AlterTableOperation {
345    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
346        match self {
347            AlterTableOperation::AddConstraint(c) => write!(f, "ADD {}", c),
348            AlterTableOperation::AddColumn { column_def } => {
349                write!(f, "ADD COLUMN {}", column_def)
350            }
351            AlterTableOperation::AlterColumn { column_name, op } => {
352                write!(f, "ALTER COLUMN {} {}", column_name, op)
353            }
354            AlterTableOperation::DropConstraint { name } => write!(f, "DROP CONSTRAINT {}", name),
355            AlterTableOperation::DropColumn {
356                column_name,
357                if_exists,
358                cascade,
359            } => write!(
360                f,
361                "DROP COLUMN {}{}{}",
362                if *if_exists { "IF EXISTS " } else { "" },
363                column_name,
364                if *cascade { " CASCADE" } else { "" }
365            ),
366            AlterTableOperation::RenameColumn {
367                old_column_name,
368                new_column_name,
369            } => write!(
370                f,
371                "RENAME COLUMN {} TO {}",
372                old_column_name, new_column_name
373            ),
374            AlterTableOperation::RenameTable { table_name } => {
375                write!(f, "RENAME TO {}", table_name)
376            }
377            AlterTableOperation::ChangeColumn {
378                old_name,
379                new_name,
380                data_type,
381                options,
382            } => {
383                write!(f, "CHANGE COLUMN {} {} {}", old_name, new_name, data_type)?;
384                if options.is_empty() {
385                    Ok(())
386                } else {
387                    write!(f, " {}", display_separated(options, " "))
388                }
389            }
390            AlterTableOperation::RenameConstraint { old_name, new_name } => {
391                write!(f, "RENAME CONSTRAINT {} TO {}", old_name, new_name)
392            }
393            AlterTableOperation::ChangeOwner { new_owner_name } => {
394                write!(f, "OWNER TO {}", new_owner_name)
395            }
396            AlterTableOperation::SetSchema { new_schema_name } => {
397                write!(f, "SET SCHEMA {}", new_schema_name)
398            }
399            AlterTableOperation::SetParallelism {
400                parallelism,
401                deferred,
402            } => {
403                write!(
404                    f,
405                    "SET PARALLELISM TO {}{}",
406                    parallelism,
407                    if *deferred { " DEFERRED" } else { "" }
408                )
409            }
410            AlterTableOperation::SetConfig { entries } => {
411                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
412            }
413            AlterTableOperation::ResetConfig { keys } => {
414                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
415            }
416            AlterTableOperation::RefreshSchema => {
417                write!(f, "REFRESH SCHEMA")
418            }
419            AlterTableOperation::SetSourceRateLimit { rate_limit } => {
420                write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
421            }
422            AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
423                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
424            }
425            AlterTableOperation::SetDmlRateLimit { rate_limit } => {
426                write!(f, "SET DML_RATE_LIMIT TO {}", rate_limit)
427            }
428            AlterTableOperation::SwapRenameTable { target_table } => {
429                write!(f, "SWAP WITH {}", target_table)
430            }
431            AlterTableOperation::DropConnector => {
432                write!(f, "DROP CONNECTOR")
433            }
434            AlterTableOperation::AlterConnectorProps { alter_props } => {
435                write!(
436                    f,
437                    "CONNECTOR WITH ({})",
438                    display_comma_separated(alter_props)
439                )
440            }
441        }
442    }
443}
444
445impl fmt::Display for AlterIndexOperation {
446    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
447        match self {
448            AlterIndexOperation::RenameIndex { index_name } => {
449                write!(f, "RENAME TO {index_name}")
450            }
451            AlterIndexOperation::SetParallelism {
452                parallelism,
453                deferred,
454            } => {
455                write!(
456                    f,
457                    "SET PARALLELISM TO {}{}",
458                    parallelism,
459                    if *deferred { " DEFERRED" } else { "" }
460                )
461            }
462            AlterIndexOperation::SetConfig { entries } => {
463                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
464            }
465            AlterIndexOperation::ResetConfig { keys } => {
466                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
467            }
468        }
469    }
470}
471
472impl fmt::Display for AlterViewOperation {
473    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
474        match self {
475            AlterViewOperation::RenameView { view_name } => {
476                write!(f, "RENAME TO {view_name}")
477            }
478            AlterViewOperation::ChangeOwner { new_owner_name } => {
479                write!(f, "OWNER TO {}", new_owner_name)
480            }
481            AlterViewOperation::SetSchema { new_schema_name } => {
482                write!(f, "SET SCHEMA {}", new_schema_name)
483            }
484            AlterViewOperation::SetParallelism {
485                parallelism,
486                deferred,
487            } => {
488                write!(
489                    f,
490                    "SET PARALLELISM TO {}{}",
491                    parallelism,
492                    if *deferred { " DEFERRED" } else { "" }
493                )
494            }
495            AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
496                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
497            }
498            AlterViewOperation::SwapRenameView { target_view } => {
499                write!(f, "SWAP WITH {}", target_view)
500            }
501            AlterViewOperation::SetResourceGroup {
502                resource_group,
503                deferred,
504            } => {
505                let deferred = if *deferred { " DEFERRED" } else { "" };
506
507                if let Some(resource_group) = resource_group {
508                    write!(f, "SET RESOURCE_GROUP TO {} {}", resource_group, deferred)
509                } else {
510                    write!(f, "RESET RESOURCE_GROUP {}", deferred)
511                }
512            }
513            AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
514                write!(f, "SET STREAMING_ENABLE_UNALIGNED_JOIN TO {}", enable)
515            }
516            AlterViewOperation::AsQuery { query } => {
517                write!(f, "AS {}", query)
518            }
519            AlterViewOperation::SetConfig { entries } => {
520                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
521            }
522            AlterViewOperation::ResetConfig { keys } => {
523                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
524            }
525        }
526    }
527}
528
529impl fmt::Display for AlterSinkOperation {
530    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
531        match self {
532            AlterSinkOperation::RenameSink { sink_name } => {
533                write!(f, "RENAME TO {sink_name}")
534            }
535            AlterSinkOperation::ChangeOwner { new_owner_name } => {
536                write!(f, "OWNER TO {}", new_owner_name)
537            }
538            AlterSinkOperation::SetSchema { new_schema_name } => {
539                write!(f, "SET SCHEMA {}", new_schema_name)
540            }
541            AlterSinkOperation::SetParallelism {
542                parallelism,
543                deferred,
544            } => {
545                write!(
546                    f,
547                    "SET PARALLELISM TO {}{}",
548                    parallelism,
549                    if *deferred { " DEFERRED" } else { "" }
550                )
551            }
552            AlterSinkOperation::SetConfig { entries } => {
553                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
554            }
555            AlterSinkOperation::ResetConfig { keys } => {
556                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
557            }
558            AlterSinkOperation::SwapRenameSink { target_sink } => {
559                write!(f, "SWAP WITH {}", target_sink)
560            }
561            AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
562                write!(f, "SET SINK_RATE_LIMIT TO {}", rate_limit)
563            }
564            AlterSinkOperation::AlterConnectorProps {
565                alter_props: changed_props,
566            } => {
567                write!(
568                    f,
569                    "CONNECTOR WITH ({})",
570                    display_comma_separated(changed_props)
571                )
572            }
573            AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
574                write!(f, "SET STREAMING_ENABLE_UNALIGNED_JOIN TO {}", enable)
575            }
576        }
577    }
578}
579
580impl fmt::Display for AlterSubscriptionOperation {
581    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
582        match self {
583            AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
584                write!(f, "RENAME TO {subscription_name}")
585            }
586            AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
587                write!(f, "OWNER TO {}", new_owner_name)
588            }
589            AlterSubscriptionOperation::SetSchema { new_schema_name } => {
590                write!(f, "SET SCHEMA {}", new_schema_name)
591            }
592            AlterSubscriptionOperation::SwapRenameSubscription {
593                target_subscription,
594            } => {
595                write!(f, "SWAP WITH {}", target_subscription)
596            }
597        }
598    }
599}
600
601impl fmt::Display for AlterSourceOperation {
602    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
603        match self {
604            AlterSourceOperation::RenameSource { source_name } => {
605                write!(f, "RENAME TO {source_name}")
606            }
607            AlterSourceOperation::AddColumn { column_def } => {
608                write!(f, "ADD COLUMN {column_def}")
609            }
610            AlterSourceOperation::ChangeOwner { new_owner_name } => {
611                write!(f, "OWNER TO {}", new_owner_name)
612            }
613            AlterSourceOperation::SetSchema { new_schema_name } => {
614                write!(f, "SET SCHEMA {}", new_schema_name)
615            }
616            AlterSourceOperation::FormatEncode { format_encode } => {
617                write!(f, "{format_encode}")
618            }
619            AlterSourceOperation::RefreshSchema => {
620                write!(f, "REFRESH SCHEMA")
621            }
622            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
623                write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
624            }
625            AlterSourceOperation::SwapRenameSource { target_source } => {
626                write!(f, "SWAP WITH {}", target_source)
627            }
628            AlterSourceOperation::SetParallelism {
629                parallelism,
630                deferred,
631            } => {
632                write!(
633                    f,
634                    "SET PARALLELISM TO {}{}",
635                    parallelism,
636                    if *deferred { " DEFERRED" } else { "" }
637                )
638            }
639            AlterSourceOperation::SetConfig { entries } => {
640                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
641            }
642            AlterSourceOperation::ResetConfig { keys } => {
643                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
644            }
645            AlterSourceOperation::AlterConnectorProps { alter_props } => {
646                write!(
647                    f,
648                    "CONNECTOR WITH ({})",
649                    display_comma_separated(alter_props)
650                )
651            }
652        }
653    }
654}
655
656impl fmt::Display for AlterFunctionOperation {
657    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
658        match self {
659            AlterFunctionOperation::SetSchema { new_schema_name } => {
660                write!(f, "SET SCHEMA {new_schema_name}")
661            }
662        }
663    }
664}
665
666impl fmt::Display for AlterConnectionOperation {
667    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
668        match self {
669            AlterConnectionOperation::SetSchema { new_schema_name } => {
670                write!(f, "SET SCHEMA {new_schema_name}")
671            }
672            AlterConnectionOperation::ChangeOwner { new_owner_name } => {
673                write!(f, "OWNER TO {new_owner_name}")
674            }
675            AlterConnectionOperation::AlterConnectorProps { alter_props } => {
676                write!(
677                    f,
678                    "CONNECTOR WITH ({})",
679                    display_comma_separated(alter_props)
680                )
681            }
682        }
683    }
684}
685
686impl fmt::Display for AlterSecretOperation {
687    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
688        match self {
689            AlterSecretOperation::ChangeCredential { new_credential } => {
690                write!(f, "AS {new_credential}")
691            }
692        }
693    }
694}
695
/// An `ALTER COLUMN` (`Statement::AlterTable`) operation
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterColumnOperation {
    /// `SET NOT NULL`
    SetNotNull,
    /// `DROP NOT NULL`
    DropNotNull,
    /// `SET DEFAULT <expr>`
    SetDefault { value: Expr },
    /// `DROP DEFAULT`
    DropDefault,
    /// `[SET DATA] TYPE <data_type> [USING <expr>]`
    ///
    /// Always displayed in the long form `SET DATA TYPE <data_type>`.
    SetDataType {
        /// The new column type.
        data_type: DataType,
        /// PostgreSQL specific. When present, displayed as ` USING <expr>`
        /// after the data type.
        using: Option<Expr>,
    },
}
714
715impl fmt::Display for AlterColumnOperation {
716    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
717        match self {
718            AlterColumnOperation::SetNotNull => write!(f, "SET NOT NULL",),
719            AlterColumnOperation::DropNotNull => write!(f, "DROP NOT NULL",),
720            AlterColumnOperation::SetDefault { value } => {
721                write!(f, "SET DEFAULT {}", value)
722            }
723            AlterColumnOperation::DropDefault => {
724                write!(f, "DROP DEFAULT")
725            }
726            AlterColumnOperation::SetDataType { data_type, using } => {
727                if let Some(expr) = using {
728                    write!(f, "SET DATA TYPE {} USING {}", data_type, expr)
729                } else {
730                    write!(f, "SET DATA TYPE {}", data_type)
731                }
732            }
733        }
734    }
735}
736
737impl fmt::Display for AlterFragmentOperation {
738    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
739        match self {
740            AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
741                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
742            }
743            AlterFragmentOperation::SetParallelism { parallelism } => {
744                write!(f, "SET PARALLELISM TO {}", parallelism)
745            }
746        }
747    }
748}
749
/// The watermark on source.
/// `WATERMARK FOR <column> AS <expr>`
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SourceWatermark {
    /// The column named after `WATERMARK FOR`.
    pub column: Ident,
    /// The expression following `AS`.
    pub expr: Expr,
}
757
758impl fmt::Display for SourceWatermark {
759    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
760        write!(f, "WATERMARK FOR {} AS {}", self.column, self.expr,)
761    }
762}
763
/// A table-level constraint, specified in a `CREATE TABLE` or an
/// `ALTER TABLE ADD <constraint>` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum TableConstraint {
    /// `[ CONSTRAINT <name> ] { PRIMARY KEY | UNIQUE } (<columns>)`
    Unique {
        /// Optional constraint name.
        name: Option<Ident>,
        /// Columns covered by the constraint.
        columns: Vec<Ident>,
        /// Whether this is a `PRIMARY KEY` or just a `UNIQUE` constraint
        is_primary: bool,
    },
    /// A referential integrity constraint (`[ CONSTRAINT <name> ] FOREIGN KEY (<columns>)
    /// REFERENCES <foreign_table> (<referred_columns>)
    /// { [ON DELETE <referential_action>] [ON UPDATE <referential_action>] |
    ///   [ON UPDATE <referential_action>] [ON DELETE <referential_action>]
    /// }`).
    ///
    /// When displayed, `ON DELETE` is always written before `ON UPDATE`.
    ForeignKey {
        /// Optional constraint name.
        name: Option<Ident>,
        /// Referencing columns of this table.
        columns: Vec<Ident>,
        /// The table named after `REFERENCES`.
        foreign_table: ObjectName,
        /// Columns of `foreign_table` being referenced.
        referred_columns: Vec<Ident>,
        /// Action of the `ON DELETE` clause, if any.
        on_delete: Option<ReferentialAction>,
        /// Action of the `ON UPDATE` clause, if any.
        on_update: Option<ReferentialAction>,
    },
    /// `[ CONSTRAINT <name> ] CHECK (<expr>)`
    Check {
        /// Optional constraint name.
        name: Option<Ident>,
        /// The checked expression.
        expr: Box<Expr>,
    },
}
794
795impl fmt::Display for TableConstraint {
796    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
797        match self {
798            TableConstraint::Unique {
799                name,
800                columns,
801                is_primary,
802            } => write!(
803                f,
804                "{}{} ({})",
805                display_constraint_name(name),
806                if *is_primary { "PRIMARY KEY" } else { "UNIQUE" },
807                display_comma_separated(columns)
808            ),
809            TableConstraint::ForeignKey {
810                name,
811                columns,
812                foreign_table,
813                referred_columns,
814                on_delete,
815                on_update,
816            } => {
817                write!(
818                    f,
819                    "{}FOREIGN KEY ({}) REFERENCES {}({})",
820                    display_constraint_name(name),
821                    display_comma_separated(columns),
822                    foreign_table,
823                    display_comma_separated(referred_columns),
824                )?;
825                if let Some(action) = on_delete {
826                    write!(f, " ON DELETE {}", action)?;
827                }
828                if let Some(action) = on_update {
829                    write!(f, " ON UPDATE {}", action)?;
830                }
831                Ok(())
832            }
833            TableConstraint::Check { name, expr } => {
834                write!(f, "{}CHECK ({})", display_constraint_name(name), expr)
835            }
836        }
837    }
838}
839
/// SQL column definition
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnDef {
    /// Column name.
    pub name: Ident,
    /// Column data type. When this is `None`, the `Display` impl writes the
    /// literal text `None` in the type position.
    pub data_type: Option<DataType>,
    /// Optional collation. Not written by the `Display` impl.
    pub collation: Option<ObjectName>,
    /// Column options, displayed space-separated after the data type.
    pub options: Vec<ColumnOptionDef>,
}
848
849impl ColumnDef {
850    pub fn new(
851        name: Ident,
852        data_type: DataType,
853        collation: Option<ObjectName>,
854        options: Vec<ColumnOptionDef>,
855    ) -> Self {
856        ColumnDef {
857            name,
858            data_type: Some(data_type),
859            collation,
860            options,
861        }
862    }
863
864    pub fn is_generated(&self) -> bool {
865        self.options
866            .iter()
867            .any(|option| matches!(option.option, ColumnOption::GeneratedColumns(_)))
868    }
869}
870
871impl fmt::Display for ColumnDef {
872    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
873        write!(
874            f,
875            "{} {}",
876            self.name,
877            if let Some(data_type) = &self.data_type {
878                data_type.to_string()
879            } else {
880                "None".to_owned()
881            }
882        )?;
883        for option in &self.options {
884            write!(f, " {}", option)?;
885        }
886        Ok(())
887    }
888}
889
/// An optionally-named `ColumnOption`: `[ CONSTRAINT <name> ] <column-option>`.
///
/// Note that implementations are substantially more permissive than the ANSI
/// specification on what order column options can be presented in, and whether
/// they are allowed to be named. The specification distinguishes between
/// constraints (NOT NULL, UNIQUE, PRIMARY KEY, and CHECK), which can be named
/// and can appear in any order, and other options (DEFAULT, GENERATED), which
/// cannot be named and must appear in a fixed order. PostgreSQL, however,
/// allows preceding any option with `CONSTRAINT <name>`, even those that are
/// not really constraints, like NULL and DEFAULT. MSSQL is less permissive,
/// allowing DEFAULT, UNIQUE, PRIMARY KEY and CHECK to be named, but not NULL or
/// NOT NULL constraints (the last of which is in violation of the spec).
///
/// For maximum flexibility, we don't distinguish between constraint and
/// non-constraint options, lumping them all together under the umbrella of
/// "column options," and we allow any column option to be named.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnOptionDef {
    /// Optional constraint name (the `<name>` in `CONSTRAINT <name>`).
    pub name: Option<Ident>,
    /// The column option itself.
    pub option: ColumnOption,
}
911
912impl fmt::Display for ColumnOptionDef {
913    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
914        write!(f, "{}{}", display_constraint_name(&self.name), self.option)
915    }
916}
917
918/// `ColumnOption`s are modifiers that follow a column definition in a `CREATE
919/// TABLE` statement.
920#[derive(Debug, Clone, PartialEq, Eq, Hash)]
921pub enum ColumnOption {
922    /// `NULL`
923    Null,
924    /// `NOT NULL`
925    NotNull,
926    /// `DEFAULT <restricted-expr>`
927    DefaultValue(Expr),
928    /// Default value from previous bound `DefaultColumnDesc`. Used internally
929    /// for schema change and should not be specified by users.
930    DefaultValueInternal {
931        /// Protobuf encoded `DefaultColumnDesc`.
932        persisted: Box<[u8]>,
933        /// Optional AST for unparsing. If `None`, the default value will be
934        /// shown as `DEFAULT INTERNAL` which is for demonstrating and should
935        /// not be specified by users.
936        expr: Option<Expr>,
937    },
938    /// `{ PRIMARY KEY | UNIQUE }`
939    Unique { is_primary: bool },
940    /// A referential integrity constraint (`[FOREIGN KEY REFERENCES
941    /// <foreign_table> (<referred_columns>)
942    /// { [ON DELETE <referential_action>] [ON UPDATE <referential_action>] |
943    ///   [ON UPDATE <referential_action>] [ON DELETE <referential_action>]
944    /// }`).
945    ForeignKey {
946        foreign_table: ObjectName,
947        referred_columns: Vec<Ident>,
948        on_delete: Option<ReferentialAction>,
949        on_update: Option<ReferentialAction>,
950    },
951    /// `CHECK (<expr>)`
952    Check(Expr),
953    /// Dialect-specific options, such as:
954    /// - MySQL's `AUTO_INCREMENT` or SQLite's `AUTOINCREMENT`
955    /// - ...
956    DialectSpecific(Vec<Token>),
957    /// AS ( <generation_expr> )`
958    GeneratedColumns(Expr),
959}
960
961impl fmt::Display for ColumnOption {
962    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
963        use ColumnOption::*;
964        match self {
965            Null => write!(f, "NULL"),
966            NotNull => write!(f, "NOT NULL"),
967            DefaultValue(expr) => write!(f, "DEFAULT {}", expr),
968            DefaultValueInternal { persisted: _, expr } => {
969                if let Some(expr) = expr {
970                    write!(f, "DEFAULT {}", expr)
971                } else {
972                    write!(f, "DEFAULT INTERNAL")
973                }
974            }
975            Unique { is_primary } => {
976                write!(f, "{}", if *is_primary { "PRIMARY KEY" } else { "UNIQUE" })
977            }
978            ForeignKey {
979                foreign_table,
980                referred_columns,
981                on_delete,
982                on_update,
983            } => {
984                write!(f, "REFERENCES {}", foreign_table)?;
985                if !referred_columns.is_empty() {
986                    write!(f, " ({})", display_comma_separated(referred_columns))?;
987                }
988                if let Some(action) = on_delete {
989                    write!(f, " ON DELETE {}", action)?;
990                }
991                if let Some(action) = on_update {
992                    write!(f, " ON UPDATE {}", action)?;
993                }
994                Ok(())
995            }
996            Check(expr) => write!(f, "CHECK ({})", expr),
997            DialectSpecific(val) => write!(f, "{}", display_separated(val, " ")),
998            GeneratedColumns(expr) => write!(f, "AS {}", expr),
999        }
1000    }
1001}
1002
1003fn display_constraint_name(name: &'_ Option<Ident>) -> impl fmt::Display + '_ {
1004    struct ConstraintName<'a>(&'a Option<Ident>);
1005    impl fmt::Display for ConstraintName<'_> {
1006        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1007            if let Some(name) = self.0 {
1008                write!(f, "CONSTRAINT {} ", name)?;
1009            }
1010            Ok(())
1011        }
1012    }
1013    ConstraintName(name)
1014}
1015
/// `<referential_action> =
/// { RESTRICT | CASCADE | SET NULL | NO ACTION | SET DEFAULT }`
///
/// The action attached to the `ON UPDATE` and `ON DELETE` options of a
/// foreign key constraint.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ReferentialAction {
    Restrict,
    Cascade,
    SetNull,
    NoAction,
    SetDefault,
}

impl fmt::Display for ReferentialAction {
    /// Writes the SQL keyword(s) for this action.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keyword = match self {
            Self::Restrict => "RESTRICT",
            Self::Cascade => "CASCADE",
            Self::SetNull => "SET NULL",
            Self::NoAction => "NO ACTION",
            Self::SetDefault => "SET DEFAULT",
        };
        f.write_str(keyword)
    }
}
1040
1041/// secure secret definition for webhook source
1042#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1043pub struct WebhookSourceInfo {
1044    pub secret_ref: Option<SecretRefValue>,
1045    pub signature_expr: Expr,
1046    pub wait_for_persistence: bool,
1047    pub is_batched: bool,
1048}