risingwave_sqlparser/ast/
ddl.rs

1// Licensed under the Apache License, Version 2.0 (the "License");
2// you may not use this file except in compliance with the License.
3// You may obtain a copy of the License at
4//
5//     http://www.apache.org/licenses/LICENSE-2.0
6//
7// Unless required by applicable law or agreed to in writing, software
8// distributed under the License is distributed on an "AS IS" BASIS,
9// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10// See the License for the specific language governing permissions and
11// limitations under the License.
12
13//! AST types specific to CREATE/ALTER variants of [`crate::ast::Statement`]
14//! (commonly referred to as Data Definition Language, or DDL)
15
16#[cfg(not(feature = "std"))]
17use alloc::{boxed::Box, string::ToString, vec::Vec};
18use core::fmt;
19
20#[cfg(feature = "serde")]
21use serde::{Deserialize, Serialize};
22
23use super::{ConfigParam, FormatEncodeOptions, SqlOption};
24use crate::ast::{
25    DataType, Expr, Ident, ObjectName, Query, SecretRefValue, SetVariableValue, Value,
26    display_comma_separated, display_separated,
27};
28use crate::tokenizer::Token;
29
/// An operation of an `ALTER DATABASE` statement.
///
/// The SQL shown on each variant is the text produced by this type's
/// `Display` implementation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum AlterDatabaseOperation {
    /// `OWNER TO <new_owner_name>`
    ChangeOwner { new_owner_name: Ident },
    /// `RENAME TO <database_name>`
    RenameDatabase { database_name: ObjectName },
    /// `SET <param> TO <value>`
    SetParam(ConfigParam),
}
37
/// An operation of an `ALTER SCHEMA` statement.
///
/// The SQL shown on each variant is the text produced by this type's
/// `Display` implementation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum AlterSchemaOperation {
    /// `OWNER TO <new_owner_name>`
    ChangeOwner { new_owner_name: Ident },
    /// `RENAME TO <schema_name>`
    RenameSchema { schema_name: ObjectName },
    /// `SWAP WITH <target_schema>`
    SwapRenameSchema { target_schema: ObjectName },
}
45
/// An `ALTER TABLE` (`Statement::AlterTable`) operation
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum AlterTableOperation {
    /// `ADD <table_constraint>`
    AddConstraint(TableConstraint),
    /// `ADD [ COLUMN ] <column_def>`
    AddColumn {
        column_def: ColumnDef,
    },
    /// `DROP CONSTRAINT <name>`
    ///
    /// TODO: implement `DROP CONSTRAINT <name>`
    DropConstraint {
        name: Ident,
    },
    /// `DROP [ COLUMN ] [ IF EXISTS ] <column_name> [ CASCADE ]`
    DropColumn {
        column_name: Ident,
        if_exists: bool,
        cascade: bool,
    },
    /// `RENAME [ COLUMN ] <old_column_name> TO <new_column_name>`
    RenameColumn {
        old_column_name: Ident,
        new_column_name: Ident,
    },
    /// `RENAME TO <table_name>`
    RenameTable {
        table_name: ObjectName,
    },
    /// `CHANGE [ COLUMN ] <old_name> <new_name> <data_type> [ <options> ]`
    ChangeColumn {
        old_name: Ident,
        new_name: Ident,
        data_type: DataType,
        options: Vec<ColumnOption>,
    },
    /// `RENAME CONSTRAINT <old_constraint_name> TO <new_constraint_name>`
    ///
    /// Note: this is a PostgreSQL-specific operation.
    RenameConstraint {
        old_name: Ident,
        new_name: Ident,
    },
    /// `ALTER [ COLUMN ] <column_name> <op>`
    AlterColumn {
        column_name: Ident,
        op: AlterColumnOperation,
    },
    /// `OWNER TO <owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
    /// `SET SCHEMA <schema_name>`
    SetSchema {
        new_schema_name: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `REFRESH SCHEMA`
    RefreshSchema,
    /// `SET SOURCE_RATE_LIMIT TO <rate_limit>`
    SetSourceRateLimit {
        rate_limit: i32,
    },
    /// `SET BACKFILL_RATE_LIMIT TO <rate_limit>`
    SetBackfillRateLimit {
        rate_limit: i32,
    },
    /// `SET DML_RATE_LIMIT TO <rate_limit>`
    SetDmlRateLimit {
        rate_limit: i32,
    },
    /// `SWAP WITH <table_name>`
    SwapRenameTable {
        target_table: ObjectName,
    },
    /// `DROP CONNECTOR`
    DropConnector,

    /// `CONNECTOR WITH (<connector_props>)`
    ///
    /// NOTE(review): this was documented as `ALTER CONNECTOR WITH (...)`, but
    /// `Display` emits no `ALTER` keyword for this operation — confirm the
    /// accepted syntax against the parser.
    AlterConnectorProps {
        alter_props: Vec<SqlOption>,
    },
}
132
/// An operation of an `ALTER INDEX` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum AlterIndexOperation {
    /// `RENAME TO <index_name>`
    RenameIndex {
        index_name: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
}
145
/// An operation of an `ALTER VIEW` statement.
///
/// NOTE(review): presumably shared with `ALTER MATERIALIZED VIEW` — confirm
/// against the `Statement` definition.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum AlterViewOperation {
    /// `RENAME TO <view_name>`
    RenameView {
        view_name: ObjectName,
    },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
    /// `SET SCHEMA <new_schema_name>`
    SetSchema {
        new_schema_name: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET RESOURCE_GROUP TO <resource_group> [ DEFERRED ]` when
    /// `resource_group` is `Some`;
    /// `RESET RESOURCE_GROUP [ DEFERRED ]` when it is `None`.
    SetResourceGroup {
        resource_group: Option<SetVariableValue>,
        deferred: bool,
    },
    /// `SET BACKFILL_RATE_LIMIT TO <rate_limit>`
    SetBackfillRateLimit {
        rate_limit: i32,
    },
    /// `SWAP WITH <view_name>`
    SwapRenameView {
        target_view: ObjectName,
    },
    /// `SET STREAMING_ENABLE_UNALIGNED_JOIN TO <enable>`
    SetStreamingEnableUnalignedJoin {
        enable: bool,
    },
    /// `AS <query>`
    AsQuery {
        query: Box<Query>,
    },
}
185
/// An operation of an `ALTER SINK` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum AlterSinkOperation {
    /// `RENAME TO <sink_name>`
    RenameSink {
        sink_name: ObjectName,
    },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
    /// `SET SCHEMA <new_schema_name>`
    SetSchema {
        new_schema_name: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SWAP WITH <sink_name>`
    SwapRenameSink {
        target_sink: ObjectName,
    },
    /// `SET SINK_RATE_LIMIT TO <rate_limit>`
    SetSinkRateLimit {
        rate_limit: i32,
    },
    /// `CONNECTOR WITH (<alter_props>)`
    AlterConnectorProps {
        alter_props: Vec<SqlOption>,
    },
    /// `SET STREAMING_ENABLE_UNALIGNED_JOIN TO <enable>`
    SetStreamingEnableUnalignedJoin {
        enable: bool,
    },
}
217
/// An operation of an `ALTER SUBSCRIPTION` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum AlterSubscriptionOperation {
    /// `RENAME TO <subscription_name>`
    RenameSubscription { subscription_name: ObjectName },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner { new_owner_name: Ident },
    /// `SET SCHEMA <new_schema_name>`
    SetSchema { new_schema_name: ObjectName },
    /// `SWAP WITH <target_subscription>`
    SwapRenameSubscription { target_subscription: ObjectName },
}
226
/// An operation of an `ALTER SOURCE` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum AlterSourceOperation {
    /// `RENAME TO <source_name>`
    RenameSource {
        source_name: ObjectName,
    },
    /// `ADD COLUMN <column_def>`
    AddColumn {
        column_def: ColumnDef,
    },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
    /// `SET SCHEMA <new_schema_name>`
    SetSchema {
        new_schema_name: ObjectName,
    },
    /// Rendered as the `Display` output of `format_encode`, with no extra
    /// keyword in front.
    FormatEncode {
        format_encode: FormatEncodeOptions,
    },
    /// `REFRESH SCHEMA`
    RefreshSchema,
    /// `SET SOURCE_RATE_LIMIT TO <rate_limit>`
    SetSourceRateLimit {
        rate_limit: i32,
    },
    /// `SWAP WITH <target_source>`
    SwapRenameSource {
        target_source: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `CONNECTOR WITH (<alter_props>)`
    AlterConnectorProps {
        alter_props: Vec<SqlOption>,
    },
}
261
/// An operation of an `ALTER FUNCTION` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum AlterFunctionOperation {
    /// `SET SCHEMA <new_schema_name>`
    SetSchema { new_schema_name: ObjectName },
}
267
/// An operation of an `ALTER CONNECTION` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum AlterConnectionOperation {
    /// `SET SCHEMA <new_schema_name>`
    SetSchema { new_schema_name: ObjectName },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner { new_owner_name: Ident },
}
274
/// An operation of an `ALTER SECRET` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum AlterSecretOperation {
    /// `AS <new_credential>`
    ChangeCredential { new_credential: Value },
}
280
/// An operation of an `ALTER FRAGMENT` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum AlterFragmentOperation {
    /// `SET BACKFILL_RATE_LIMIT TO <rate_limit>`
    AlterBackfillRateLimit { rate_limit: i32 },
}
286
287impl fmt::Display for AlterDatabaseOperation {
288    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
289        match self {
290            AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
291                write!(f, "OWNER TO {}", new_owner_name)
292            }
293            AlterDatabaseOperation::RenameDatabase { database_name } => {
294                write!(f, "RENAME TO {}", database_name)
295            }
296            AlterDatabaseOperation::SetParam(ConfigParam { param, value }) => {
297                write!(f, "SET {} TO {}", param, value)
298            }
299        }
300    }
301}
302
303impl fmt::Display for AlterSchemaOperation {
304    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
305        match self {
306            AlterSchemaOperation::ChangeOwner { new_owner_name } => {
307                write!(f, "OWNER TO {}", new_owner_name)
308            }
309            AlterSchemaOperation::RenameSchema { schema_name } => {
310                write!(f, "RENAME TO {}", schema_name)
311            }
312            AlterSchemaOperation::SwapRenameSchema { target_schema } => {
313                write!(f, "SWAP WITH {}", target_schema)
314            }
315        }
316    }
317}
318
319impl fmt::Display for AlterTableOperation {
320    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
321        match self {
322            AlterTableOperation::AddConstraint(c) => write!(f, "ADD {}", c),
323            AlterTableOperation::AddColumn { column_def } => {
324                write!(f, "ADD COLUMN {}", column_def)
325            }
326            AlterTableOperation::AlterColumn { column_name, op } => {
327                write!(f, "ALTER COLUMN {} {}", column_name, op)
328            }
329            AlterTableOperation::DropConstraint { name } => write!(f, "DROP CONSTRAINT {}", name),
330            AlterTableOperation::DropColumn {
331                column_name,
332                if_exists,
333                cascade,
334            } => write!(
335                f,
336                "DROP COLUMN {}{}{}",
337                if *if_exists { "IF EXISTS " } else { "" },
338                column_name,
339                if *cascade { " CASCADE" } else { "" }
340            ),
341            AlterTableOperation::RenameColumn {
342                old_column_name,
343                new_column_name,
344            } => write!(
345                f,
346                "RENAME COLUMN {} TO {}",
347                old_column_name, new_column_name
348            ),
349            AlterTableOperation::RenameTable { table_name } => {
350                write!(f, "RENAME TO {}", table_name)
351            }
352            AlterTableOperation::ChangeColumn {
353                old_name,
354                new_name,
355                data_type,
356                options,
357            } => {
358                write!(f, "CHANGE COLUMN {} {} {}", old_name, new_name, data_type)?;
359                if options.is_empty() {
360                    Ok(())
361                } else {
362                    write!(f, " {}", display_separated(options, " "))
363                }
364            }
365            AlterTableOperation::RenameConstraint { old_name, new_name } => {
366                write!(f, "RENAME CONSTRAINT {} TO {}", old_name, new_name)
367            }
368            AlterTableOperation::ChangeOwner { new_owner_name } => {
369                write!(f, "OWNER TO {}", new_owner_name)
370            }
371            AlterTableOperation::SetSchema { new_schema_name } => {
372                write!(f, "SET SCHEMA {}", new_schema_name)
373            }
374            AlterTableOperation::SetParallelism {
375                parallelism,
376                deferred,
377            } => {
378                write!(
379                    f,
380                    "SET PARALLELISM TO {}{}",
381                    parallelism,
382                    if *deferred { " DEFERRED" } else { "" }
383                )
384            }
385            AlterTableOperation::RefreshSchema => {
386                write!(f, "REFRESH SCHEMA")
387            }
388            AlterTableOperation::SetSourceRateLimit { rate_limit } => {
389                write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
390            }
391            AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
392                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
393            }
394            AlterTableOperation::SetDmlRateLimit { rate_limit } => {
395                write!(f, "SET DML_RATE_LIMIT TO {}", rate_limit)
396            }
397            AlterTableOperation::SwapRenameTable { target_table } => {
398                write!(f, "SWAP WITH {}", target_table)
399            }
400            AlterTableOperation::DropConnector => {
401                write!(f, "DROP CONNECTOR")
402            }
403            AlterTableOperation::AlterConnectorProps { alter_props } => {
404                write!(
405                    f,
406                    "CONNECTOR WITH ({})",
407                    display_comma_separated(alter_props)
408                )
409            }
410        }
411    }
412}
413
414impl fmt::Display for AlterIndexOperation {
415    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
416        match self {
417            AlterIndexOperation::RenameIndex { index_name } => {
418                write!(f, "RENAME TO {index_name}")
419            }
420            AlterIndexOperation::SetParallelism {
421                parallelism,
422                deferred,
423            } => {
424                write!(
425                    f,
426                    "SET PARALLELISM TO {}{}",
427                    parallelism,
428                    if *deferred { " DEFERRED" } else { "" }
429                )
430            }
431        }
432    }
433}
434
435impl fmt::Display for AlterViewOperation {
436    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
437        match self {
438            AlterViewOperation::RenameView { view_name } => {
439                write!(f, "RENAME TO {view_name}")
440            }
441            AlterViewOperation::ChangeOwner { new_owner_name } => {
442                write!(f, "OWNER TO {}", new_owner_name)
443            }
444            AlterViewOperation::SetSchema { new_schema_name } => {
445                write!(f, "SET SCHEMA {}", new_schema_name)
446            }
447            AlterViewOperation::SetParallelism {
448                parallelism,
449                deferred,
450            } => {
451                write!(
452                    f,
453                    "SET PARALLELISM TO {}{}",
454                    parallelism,
455                    if *deferred { " DEFERRED" } else { "" }
456                )
457            }
458            AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
459                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
460            }
461            AlterViewOperation::SwapRenameView { target_view } => {
462                write!(f, "SWAP WITH {}", target_view)
463            }
464            AlterViewOperation::SetResourceGroup {
465                resource_group,
466                deferred,
467            } => {
468                let deferred = if *deferred { " DEFERRED" } else { "" };
469
470                if let Some(resource_group) = resource_group {
471                    write!(f, "SET RESOURCE_GROUP TO {} {}", resource_group, deferred)
472                } else {
473                    write!(f, "RESET RESOURCE_GROUP {}", deferred)
474                }
475            }
476            AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
477                write!(f, "SET STREAMING_ENABLE_UNALIGNED_JOIN TO {}", enable)
478            }
479            AlterViewOperation::AsQuery { query } => {
480                write!(f, "AS {}", query)
481            }
482        }
483    }
484}
485
486impl fmt::Display for AlterSinkOperation {
487    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
488        match self {
489            AlterSinkOperation::RenameSink { sink_name } => {
490                write!(f, "RENAME TO {sink_name}")
491            }
492            AlterSinkOperation::ChangeOwner { new_owner_name } => {
493                write!(f, "OWNER TO {}", new_owner_name)
494            }
495            AlterSinkOperation::SetSchema { new_schema_name } => {
496                write!(f, "SET SCHEMA {}", new_schema_name)
497            }
498            AlterSinkOperation::SetParallelism {
499                parallelism,
500                deferred,
501            } => {
502                write!(
503                    f,
504                    "SET PARALLELISM TO {}{}",
505                    parallelism,
506                    if *deferred { " DEFERRED" } else { "" }
507                )
508            }
509            AlterSinkOperation::SwapRenameSink { target_sink } => {
510                write!(f, "SWAP WITH {}", target_sink)
511            }
512            AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
513                write!(f, "SET SINK_RATE_LIMIT TO {}", rate_limit)
514            }
515            AlterSinkOperation::AlterConnectorProps {
516                alter_props: changed_props,
517            } => {
518                write!(
519                    f,
520                    "CONNECTOR WITH ({})",
521                    display_comma_separated(changed_props)
522                )
523            }
524            AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
525                write!(f, "SET STREAMING_ENABLE_UNALIGNED_JOIN TO {}", enable)
526            }
527        }
528    }
529}
530
531impl fmt::Display for AlterSubscriptionOperation {
532    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
533        match self {
534            AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
535                write!(f, "RENAME TO {subscription_name}")
536            }
537            AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
538                write!(f, "OWNER TO {}", new_owner_name)
539            }
540            AlterSubscriptionOperation::SetSchema { new_schema_name } => {
541                write!(f, "SET SCHEMA {}", new_schema_name)
542            }
543            AlterSubscriptionOperation::SwapRenameSubscription {
544                target_subscription,
545            } => {
546                write!(f, "SWAP WITH {}", target_subscription)
547            }
548        }
549    }
550}
551
552impl fmt::Display for AlterSourceOperation {
553    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
554        match self {
555            AlterSourceOperation::RenameSource { source_name } => {
556                write!(f, "RENAME TO {source_name}")
557            }
558            AlterSourceOperation::AddColumn { column_def } => {
559                write!(f, "ADD COLUMN {column_def}")
560            }
561            AlterSourceOperation::ChangeOwner { new_owner_name } => {
562                write!(f, "OWNER TO {}", new_owner_name)
563            }
564            AlterSourceOperation::SetSchema { new_schema_name } => {
565                write!(f, "SET SCHEMA {}", new_schema_name)
566            }
567            AlterSourceOperation::FormatEncode { format_encode } => {
568                write!(f, "{format_encode}")
569            }
570            AlterSourceOperation::RefreshSchema => {
571                write!(f, "REFRESH SCHEMA")
572            }
573            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
574                write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
575            }
576            AlterSourceOperation::SwapRenameSource { target_source } => {
577                write!(f, "SWAP WITH {}", target_source)
578            }
579            AlterSourceOperation::SetParallelism {
580                parallelism,
581                deferred,
582            } => {
583                write!(
584                    f,
585                    "SET PARALLELISM TO {}{}",
586                    parallelism,
587                    if *deferred { " DEFERRED" } else { "" }
588                )
589            }
590            AlterSourceOperation::AlterConnectorProps { alter_props } => {
591                write!(
592                    f,
593                    "CONNECTOR WITH ({})",
594                    display_comma_separated(alter_props)
595                )
596            }
597        }
598    }
599}
600
601impl fmt::Display for AlterFunctionOperation {
602    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
603        match self {
604            AlterFunctionOperation::SetSchema { new_schema_name } => {
605                write!(f, "SET SCHEMA {new_schema_name}")
606            }
607        }
608    }
609}
610
611impl fmt::Display for AlterConnectionOperation {
612    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
613        match self {
614            AlterConnectionOperation::SetSchema { new_schema_name } => {
615                write!(f, "SET SCHEMA {new_schema_name}")
616            }
617            AlterConnectionOperation::ChangeOwner { new_owner_name } => {
618                write!(f, "OWNER TO {new_owner_name}")
619            }
620        }
621    }
622}
623
624impl fmt::Display for AlterSecretOperation {
625    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
626        match self {
627            AlterSecretOperation::ChangeCredential { new_credential } => {
628                write!(f, "AS {new_credential}")
629            }
630        }
631    }
632}
633
/// An `ALTER COLUMN` (`Statement::AlterTable`) operation
///
/// Carried by `AlterTableOperation::AlterColumn` as its `op` field.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum AlterColumnOperation {
    /// `SET NOT NULL`
    SetNotNull,
    /// `DROP NOT NULL`
    DropNotNull,
    /// `SET DEFAULT <expr>`
    SetDefault { value: Expr },
    /// `DROP DEFAULT`
    DropDefault,
    /// `[SET DATA] TYPE <data_type> [USING <expr>]`
    ///
    /// `Display` always renders the long form `SET DATA TYPE ...`.
    SetDataType {
        data_type: DataType,
        /// PostgreSQL specific
        using: Option<Expr>,
    },
}
653
654impl fmt::Display for AlterColumnOperation {
655    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
656        match self {
657            AlterColumnOperation::SetNotNull => write!(f, "SET NOT NULL",),
658            AlterColumnOperation::DropNotNull => write!(f, "DROP NOT NULL",),
659            AlterColumnOperation::SetDefault { value } => {
660                write!(f, "SET DEFAULT {}", value)
661            }
662            AlterColumnOperation::DropDefault => {
663                write!(f, "DROP DEFAULT")
664            }
665            AlterColumnOperation::SetDataType { data_type, using } => {
666                if let Some(expr) = using {
667                    write!(f, "SET DATA TYPE {} USING {}", data_type, expr)
668                } else {
669                    write!(f, "SET DATA TYPE {}", data_type)
670                }
671            }
672        }
673    }
674}
675
676impl fmt::Display for AlterFragmentOperation {
677    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
678        match self {
679            AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
680                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
681            }
682        }
683    }
684}
685
/// The watermark on source.
///
/// `WATERMARK FOR <column> AS <expr>`
///
/// `Display` writes the expression as-is, without adding parentheses.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct SourceWatermark {
    /// The column the watermark is defined for.
    pub column: Ident,
    /// The watermark expression following `AS`.
    pub expr: Expr,
}
694
695impl fmt::Display for SourceWatermark {
696    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
697        write!(f, "WATERMARK FOR {} AS {}", self.column, self.expr,)
698    }
699}
700
/// A table-level constraint, specified in a `CREATE TABLE` or an
/// `ALTER TABLE ADD <constraint>` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum TableConstraint {
    /// `[ CONSTRAINT <name> ] { PRIMARY KEY | UNIQUE } (<columns>)`
    Unique {
        /// Optional constraint name.
        name: Option<Ident>,
        /// Columns covered by the constraint.
        columns: Vec<Ident>,
        /// Whether this is a `PRIMARY KEY` or just a `UNIQUE` constraint
        is_primary: bool,
    },
    /// A referential integrity constraint (`[ CONSTRAINT <name> ] FOREIGN KEY (<columns>)
    /// REFERENCES <foreign_table> (<referred_columns>)
    /// { [ON DELETE <referential_action>] [ON UPDATE <referential_action>] |
    ///   [ON UPDATE <referential_action>] [ON DELETE <referential_action>]
    /// }`).
    ///
    /// `Display` always writes `ON DELETE` before `ON UPDATE` when both are
    /// present.
    ForeignKey {
        /// Optional constraint name.
        name: Option<Ident>,
        /// Referencing columns of this table.
        columns: Vec<Ident>,
        /// The referenced table.
        foreign_table: ObjectName,
        /// Referenced columns of `foreign_table`.
        referred_columns: Vec<Ident>,
        /// Action for `ON DELETE`, if specified.
        on_delete: Option<ReferentialAction>,
        /// Action for `ON UPDATE`, if specified.
        on_update: Option<ReferentialAction>,
    },
    /// `[ CONSTRAINT <name> ] CHECK (<expr>)`
    Check {
        /// Optional constraint name.
        name: Option<Ident>,
        /// The checked expression.
        expr: Box<Expr>,
    },
}
732
733impl fmt::Display for TableConstraint {
734    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
735        match self {
736            TableConstraint::Unique {
737                name,
738                columns,
739                is_primary,
740            } => write!(
741                f,
742                "{}{} ({})",
743                display_constraint_name(name),
744                if *is_primary { "PRIMARY KEY" } else { "UNIQUE" },
745                display_comma_separated(columns)
746            ),
747            TableConstraint::ForeignKey {
748                name,
749                columns,
750                foreign_table,
751                referred_columns,
752                on_delete,
753                on_update,
754            } => {
755                write!(
756                    f,
757                    "{}FOREIGN KEY ({}) REFERENCES {}({})",
758                    display_constraint_name(name),
759                    display_comma_separated(columns),
760                    foreign_table,
761                    display_comma_separated(referred_columns),
762                )?;
763                if let Some(action) = on_delete {
764                    write!(f, " ON DELETE {}", action)?;
765                }
766                if let Some(action) = on_update {
767                    write!(f, " ON UPDATE {}", action)?;
768                }
769                Ok(())
770            }
771            TableConstraint::Check { name, expr } => {
772                write!(f, "{}CHECK ({})", display_constraint_name(name), expr)
773            }
774        }
775    }
776}
777
/// SQL column definition
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct ColumnDef {
    /// The column name.
    pub name: Ident,
    /// The declared data type. `ColumnDef::new` always stores `Some`; when the
    /// field is `None`, `Display` writes the literal text `None` in its place.
    pub data_type: Option<DataType>,
    /// Optional collation attached to the column.
    pub collation: Option<ObjectName>,
    /// Column options, in the order `Display` writes them.
    pub options: Vec<ColumnOptionDef>,
}
787
788impl ColumnDef {
789    pub fn new(
790        name: Ident,
791        data_type: DataType,
792        collation: Option<ObjectName>,
793        options: Vec<ColumnOptionDef>,
794    ) -> Self {
795        ColumnDef {
796            name,
797            data_type: Some(data_type),
798            collation,
799            options,
800        }
801    }
802
803    pub fn is_generated(&self) -> bool {
804        self.options
805            .iter()
806            .any(|option| matches!(option.option, ColumnOption::GeneratedColumns(_)))
807    }
808}
809
810impl fmt::Display for ColumnDef {
811    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
812        write!(
813            f,
814            "{} {}",
815            self.name,
816            if let Some(data_type) = &self.data_type {
817                data_type.to_string()
818            } else {
819                "None".to_owned()
820            }
821        )?;
822        for option in &self.options {
823            write!(f, " {}", option)?;
824        }
825        Ok(())
826    }
827}
828
/// An optionally-named `ColumnOption`: `[ CONSTRAINT <name> ] <column-option>`.
///
/// Note that implementations are substantially more permissive than the ANSI
/// specification on what order column options can be presented in, and whether
/// they are allowed to be named. The specification distinguishes between
/// constraints (NOT NULL, UNIQUE, PRIMARY KEY, and CHECK), which can be named
/// and can appear in any order, and other options (DEFAULT, GENERATED), which
/// cannot be named and must appear in a fixed order. PostgreSQL, however,
/// allows preceding any option with `CONSTRAINT <name>`, even those that are
/// not really constraints, like NULL and DEFAULT. MSSQL is less permissive,
/// allowing DEFAULT, UNIQUE, PRIMARY KEY and CHECK to be named, but not NULL or
/// NOT NULL constraints (the last of which is in violation of the spec).
///
/// For maximum flexibility, we don't distinguish between constraint and
/// non-constraint options, lumping them all together under the umbrella of
/// "column options," and we allow any column option to be named.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct ColumnOptionDef {
    /// Optional name of the option; `Display` renders it through
    /// `display_constraint_name`, immediately before the option.
    pub name: Option<Ident>,
    /// The column option itself.
    pub option: ColumnOption,
}
851
852impl fmt::Display for ColumnOptionDef {
853    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
854        write!(f, "{}{}", display_constraint_name(&self.name), self.option)
855    }
856}
857
/// `ColumnOption`s are modifiers that follow a column definition in a `CREATE
/// TABLE` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum ColumnOption {
    /// `NULL`
    Null,
    /// `NOT NULL`
    NotNull,
    /// `DEFAULT <restricted-expr>`
    DefaultValue(Expr),
    /// Default value from previous bound `DefaultColumnDesc`. Used internally
    /// for schema change and should not be specified by users.
    DefaultValueInternal {
        /// Protobuf encoded `DefaultColumnDesc`.
        persisted: Box<[u8]>,
        /// Optional AST for unparsing. If `None`, the default value will be
        /// shown as `DEFAULT INTERNAL`, which is for demonstration only and
        /// should not be specified by users.
        expr: Option<Expr>,
    },
    /// `{ PRIMARY KEY | UNIQUE }`
    Unique { is_primary: bool },
    /// A referential integrity constraint (`REFERENCES
    /// <foreign_table> [ (<referred_columns>) ]
    /// { [ON DELETE <referential_action>] [ON UPDATE <referential_action>] |
    ///   [ON UPDATE <referential_action>] [ON DELETE <referential_action>]
    /// }`).
    ForeignKey {
        foreign_table: ObjectName,
        referred_columns: Vec<Ident>,
        on_delete: Option<ReferentialAction>,
        on_update: Option<ReferentialAction>,
    },
    /// `CHECK (<expr>)`
    Check(Expr),
    /// Dialect-specific options, such as:
    /// - MySQL's `AUTO_INCREMENT` or SQLite's `AUTOINCREMENT`
    /// - ...
    DialectSpecific(Vec<Token>),
    /// `AS ( <generation_expr> )`
    GeneratedColumns(Expr),
}
901
902impl fmt::Display for ColumnOption {
903    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
904        use ColumnOption::*;
905        match self {
906            Null => write!(f, "NULL"),
907            NotNull => write!(f, "NOT NULL"),
908            DefaultValue(expr) => write!(f, "DEFAULT {}", expr),
909            DefaultValueInternal { persisted: _, expr } => {
910                if let Some(expr) = expr {
911                    write!(f, "DEFAULT {}", expr)
912                } else {
913                    write!(f, "DEFAULT INTERNAL")
914                }
915            }
916            Unique { is_primary } => {
917                write!(f, "{}", if *is_primary { "PRIMARY KEY" } else { "UNIQUE" })
918            }
919            ForeignKey {
920                foreign_table,
921                referred_columns,
922                on_delete,
923                on_update,
924            } => {
925                write!(f, "REFERENCES {}", foreign_table)?;
926                if !referred_columns.is_empty() {
927                    write!(f, " ({})", display_comma_separated(referred_columns))?;
928                }
929                if let Some(action) = on_delete {
930                    write!(f, " ON DELETE {}", action)?;
931                }
932                if let Some(action) = on_update {
933                    write!(f, " ON UPDATE {}", action)?;
934                }
935                Ok(())
936            }
937            Check(expr) => write!(f, "CHECK ({})", expr),
938            DialectSpecific(val) => write!(f, "{}", display_separated(val, " ")),
939            GeneratedColumns(expr) => write!(f, "AS {}", expr),
940        }
941    }
942}
943
944fn display_constraint_name(name: &'_ Option<Ident>) -> impl fmt::Display + '_ {
945    struct ConstraintName<'a>(&'a Option<Ident>);
946    impl fmt::Display for ConstraintName<'_> {
947        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
948            if let Some(name) = self.0 {
949                write!(f, "CONSTRAINT {} ", name)?;
950            }
951            Ok(())
952        }
953    }
954    ConstraintName(name)
955}
956
/// `<referential_action> =
/// { RESTRICT | CASCADE | SET NULL | NO ACTION | SET DEFAULT }`
///
/// Used in foreign key constraints in `ON UPDATE` and `ON DELETE` options.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum ReferentialAction {
    /// `RESTRICT`
    Restrict,
    /// `CASCADE`
    Cascade,
    /// `SET NULL`
    SetNull,
    /// `NO ACTION`
    NoAction,
    /// `SET DEFAULT`
    SetDefault,
}

impl fmt::Display for ReferentialAction {
    /// Emits the SQL keyword(s) for this action, verbatim and without any
    /// padding or alignment applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keyword = match self {
            Self::Restrict => "RESTRICT",
            Self::Cascade => "CASCADE",
            Self::SetNull => "SET NULL",
            Self::NoAction => "NO ACTION",
            Self::SetDefault => "SET DEFAULT",
        };
        f.write_str(keyword)
    }
}
982
/// Secure secret definition for a webhook source.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct WebhookSourceInfo {
    /// Optional reference to a secret.
    /// NOTE(review): presumably the secret made available to `signature_expr`;
    /// confirm against the code that consumes this struct.
    pub secret_ref: Option<SecretRefValue>,
    /// Signature expression.
    /// NOTE(review): presumably evaluated to validate incoming webhook
    /// requests; confirm.
    pub signature_expr: Expr,
    /// NOTE(review): presumably controls whether a request waits for its data
    /// to be persisted before it is acknowledged; confirm.
    pub wait_for_persistence: bool,
    /// NOTE(review): presumably indicates that a single request may carry a
    /// batch of rows; confirm.
    pub is_batched: bool,
}