risingwave_sqlparser/ast/
ddl.rs

1// Licensed under the Apache License, Version 2.0 (the "License");
2// you may not use this file except in compliance with the License.
3// You may obtain a copy of the License at
4//
5//     http://www.apache.org/licenses/LICENSE-2.0
6//
7// Unless required by applicable law or agreed to in writing, software
8// distributed under the License is distributed on an "AS IS" BASIS,
9// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10// See the License for the specific language governing permissions and
11// limitations under the License.
12
13//! AST types specific to CREATE/ALTER variants of [`crate::ast::Statement`]
14//! (commonly referred to as Data Definition Language, or DDL)
15
16#[cfg(not(feature = "std"))]
17use alloc::{boxed::Box, string::ToString, vec::Vec};
18use core::fmt;
19
20#[cfg(feature = "serde")]
21use serde::{Deserialize, Serialize};
22
23use super::{ConfigParam, FormatEncodeOptions, SqlOption};
24use crate::ast::{
25    DataType, Expr, Ident, ObjectName, Query, SecretRefValue, SetVariableValue, Value,
26    display_comma_separated, display_separated,
27};
28use crate::tokenizer::Token;
29
/// An `ALTER DATABASE` operation.
///
/// Each variant is documented with the clause that its `Display` impl renders.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum AlterDatabaseOperation {
    /// `OWNER TO <new_owner_name>`
    ChangeOwner { new_owner_name: Ident },
    /// `RENAME TO <database_name>`
    RenameDatabase { database_name: ObjectName },
    /// `SET <param> TO <value>`
    SetParam(ConfigParam),
}
37
/// An `ALTER SCHEMA` operation.
///
/// Each variant is documented with the clause that its `Display` impl renders.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum AlterSchemaOperation {
    /// `OWNER TO <new_owner_name>`
    ChangeOwner { new_owner_name: Ident },
    /// `RENAME TO <schema_name>`
    RenameSchema { schema_name: ObjectName },
    /// `SWAP WITH <target_schema>`
    SwapRenameSchema { target_schema: ObjectName },
}
45
/// An `ALTER TABLE` (`Statement::AlterTable`) operation
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum AlterTableOperation {
    /// `ADD <table_constraint>`
    AddConstraint(TableConstraint),
    /// `ADD [ COLUMN ] <column_def>`
    AddColumn {
        column_def: ColumnDef,
    },
    /// TODO: implement `DROP CONSTRAINT <name>`
    DropConstraint {
        name: Ident,
    },
    /// `DROP [ COLUMN ] [ IF EXISTS ] <column_name> [ CASCADE ]`
    DropColumn {
        column_name: Ident,
        if_exists: bool,
        cascade: bool,
    },
    /// `RENAME [ COLUMN ] <old_column_name> TO <new_column_name>`
    RenameColumn {
        old_column_name: Ident,
        new_column_name: Ident,
    },
    /// `RENAME TO <table_name>`
    RenameTable {
        table_name: ObjectName,
    },
    /// `CHANGE [ COLUMN ] <old_name> <new_name> <data_type> [ <options> ]`
    ChangeColumn {
        old_name: Ident,
        new_name: Ident,
        data_type: DataType,
        options: Vec<ColumnOption>,
    },
    /// `RENAME CONSTRAINT <old_constraint_name> TO <new_constraint_name>`
    ///
    /// Note: this is a PostgreSQL-specific operation.
    RenameConstraint {
        old_name: Ident,
        new_name: Ident,
    },
    /// `ALTER [ COLUMN ] <column_name> <op>`
    AlterColumn {
        column_name: Ident,
        op: AlterColumnOperation,
    },
    /// `OWNER TO <owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
    /// `SET SCHEMA <schema_name>`
    SetSchema {
        new_schema_name: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `REFRESH SCHEMA`
    RefreshSchema,
    /// `SET SOURCE_RATE_LIMIT TO <rate_limit>`
    SetSourceRateLimit {
        rate_limit: i32,
    },
    /// `SET BACKFILL_RATE_LIMIT TO <rate_limit>`
    SetBackfillRateLimit {
        rate_limit: i32,
    },
    /// `SET DML_RATE_LIMIT TO <rate_limit>`
    SetDmlRateLimit {
        rate_limit: i32,
    },
    /// `SWAP WITH <table_name>`
    SwapRenameTable {
        target_table: ObjectName,
    },
    /// `DROP CONNECTOR`
    DropConnector,
    /// `CONNECTOR WITH (<connector_props>)` (the form rendered by `Display`)
    AlterConnectorProps {
        alter_props: Vec<SqlOption>,
    },
}
131
/// An `ALTER INDEX` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum AlterIndexOperation {
    /// `RENAME TO <index_name>`
    RenameIndex {
        index_name: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
}
144
/// An operation that alters a view.
///
/// Each variant is documented with the clause that its `Display` impl renders.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum AlterViewOperation {
    /// `RENAME TO <view_name>`
    RenameView {
        view_name: ObjectName,
    },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
    /// `SET SCHEMA <new_schema_name>`
    SetSchema {
        new_schema_name: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET RESOURCE_GROUP TO <resource_group> [ DEFERRED ]` when
    /// `resource_group` is `Some`, or `RESET RESOURCE_GROUP [ DEFERRED ]`
    /// when it is `None`.
    SetResourceGroup {
        resource_group: Option<SetVariableValue>,
        deferred: bool,
    },
    /// `SET BACKFILL_RATE_LIMIT TO <rate_limit>`
    SetBackfillRateLimit {
        rate_limit: i32,
    },
    /// `SWAP WITH <view_name>`
    SwapRenameView {
        target_view: ObjectName,
    },
    /// `SET STREAMING_ENABLE_UNALIGNED_JOIN TO <enable>`
    SetStreamingEnableUnalignedJoin {
        enable: bool,
    },
    /// `AS <query>`
    AsQuery {
        query: Box<Query>,
    },
}
184
/// An `ALTER SINK` operation.
///
/// Each variant is documented with the clause that its `Display` impl renders.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum AlterSinkOperation {
    /// `RENAME TO <sink_name>`
    RenameSink {
        sink_name: ObjectName,
    },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
    /// `SET SCHEMA <new_schema_name>`
    SetSchema {
        new_schema_name: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SWAP WITH <sink_name>`
    SwapRenameSink {
        target_sink: ObjectName,
    },
    /// `SET SINK_RATE_LIMIT TO <rate_limit>`
    SetSinkRateLimit {
        rate_limit: i32,
    },
    /// `CONNECTOR WITH (<connector_props>)`
    AlterConnectorProps {
        alter_props: Vec<SqlOption>,
    },
    /// `SET STREAMING_ENABLE_UNALIGNED_JOIN TO <enable>`
    SetStreamingEnableUnalignedJoin {
        enable: bool,
    },
}
216
/// An `ALTER SUBSCRIPTION` operation.
///
/// Each variant is documented with the clause that its `Display` impl renders.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum AlterSubscriptionOperation {
    /// `RENAME TO <subscription_name>`
    RenameSubscription { subscription_name: ObjectName },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner { new_owner_name: Ident },
    /// `SET SCHEMA <new_schema_name>`
    SetSchema { new_schema_name: ObjectName },
    /// `SWAP WITH <target_subscription>`
    SwapRenameSubscription { target_subscription: ObjectName },
}
225
/// An `ALTER SOURCE` operation.
///
/// Each variant is documented with the clause that its `Display` impl renders.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum AlterSourceOperation {
    /// `RENAME TO <source_name>`
    RenameSource {
        source_name: ObjectName,
    },
    /// `ADD COLUMN <column_def>`
    AddColumn {
        column_def: ColumnDef,
    },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
    /// `SET SCHEMA <new_schema_name>`
    SetSchema {
        new_schema_name: ObjectName,
    },
    /// Rendered as the `Display` output of `format_encode`, with no extra keyword.
    FormatEncode {
        format_encode: FormatEncodeOptions,
    },
    /// `REFRESH SCHEMA`
    RefreshSchema,
    /// `SET SOURCE_RATE_LIMIT TO <rate_limit>`
    SetSourceRateLimit {
        rate_limit: i32,
    },
    /// `SWAP WITH <target_source>`
    SwapRenameSource {
        target_source: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `CONNECTOR WITH (<connector_props>)`
    AlterConnectorProps {
        alter_props: Vec<SqlOption>,
    },
}
260
/// An `ALTER FUNCTION` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum AlterFunctionOperation {
    /// `SET SCHEMA <new_schema_name>`
    SetSchema { new_schema_name: ObjectName },
}
266
/// An `ALTER CONNECTION` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum AlterConnectionOperation {
    /// `SET SCHEMA <new_schema_name>`
    SetSchema { new_schema_name: ObjectName },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner { new_owner_name: Ident },
}
273
/// An `ALTER SECRET` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum AlterSecretOperation {
    /// `AS <new_credential>`
    ChangeCredential { new_credential: Value },
}
279
/// An `ALTER FRAGMENT` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum AlterFragmentOperation {
    /// `SET BACKFILL_RATE_LIMIT TO <rate_limit>`
    AlterBackfillRateLimit { rate_limit: i32 },
}
285
286impl fmt::Display for AlterDatabaseOperation {
287    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
288        match self {
289            AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
290                write!(f, "OWNER TO {}", new_owner_name)
291            }
292            AlterDatabaseOperation::RenameDatabase { database_name } => {
293                write!(f, "RENAME TO {}", database_name)
294            }
295            AlterDatabaseOperation::SetParam(ConfigParam { param, value }) => {
296                write!(f, "SET {} TO {}", param, value)
297            }
298        }
299    }
300}
301
302impl fmt::Display for AlterSchemaOperation {
303    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
304        match self {
305            AlterSchemaOperation::ChangeOwner { new_owner_name } => {
306                write!(f, "OWNER TO {}", new_owner_name)
307            }
308            AlterSchemaOperation::RenameSchema { schema_name } => {
309                write!(f, "RENAME TO {}", schema_name)
310            }
311            AlterSchemaOperation::SwapRenameSchema { target_schema } => {
312                write!(f, "SWAP WITH {}", target_schema)
313            }
314        }
315    }
316}
317
318impl fmt::Display for AlterTableOperation {
319    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
320        match self {
321            AlterTableOperation::AddConstraint(c) => write!(f, "ADD {}", c),
322            AlterTableOperation::AddColumn { column_def } => {
323                write!(f, "ADD COLUMN {}", column_def)
324            }
325            AlterTableOperation::AlterColumn { column_name, op } => {
326                write!(f, "ALTER COLUMN {} {}", column_name, op)
327            }
328            AlterTableOperation::DropConstraint { name } => write!(f, "DROP CONSTRAINT {}", name),
329            AlterTableOperation::DropColumn {
330                column_name,
331                if_exists,
332                cascade,
333            } => write!(
334                f,
335                "DROP COLUMN {}{}{}",
336                if *if_exists { "IF EXISTS " } else { "" },
337                column_name,
338                if *cascade { " CASCADE" } else { "" }
339            ),
340            AlterTableOperation::RenameColumn {
341                old_column_name,
342                new_column_name,
343            } => write!(
344                f,
345                "RENAME COLUMN {} TO {}",
346                old_column_name, new_column_name
347            ),
348            AlterTableOperation::RenameTable { table_name } => {
349                write!(f, "RENAME TO {}", table_name)
350            }
351            AlterTableOperation::ChangeColumn {
352                old_name,
353                new_name,
354                data_type,
355                options,
356            } => {
357                write!(f, "CHANGE COLUMN {} {} {}", old_name, new_name, data_type)?;
358                if options.is_empty() {
359                    Ok(())
360                } else {
361                    write!(f, " {}", display_separated(options, " "))
362                }
363            }
364            AlterTableOperation::RenameConstraint { old_name, new_name } => {
365                write!(f, "RENAME CONSTRAINT {} TO {}", old_name, new_name)
366            }
367            AlterTableOperation::ChangeOwner { new_owner_name } => {
368                write!(f, "OWNER TO {}", new_owner_name)
369            }
370            AlterTableOperation::SetSchema { new_schema_name } => {
371                write!(f, "SET SCHEMA {}", new_schema_name)
372            }
373            AlterTableOperation::SetParallelism {
374                parallelism,
375                deferred,
376            } => {
377                write!(
378                    f,
379                    "SET PARALLELISM TO {}{}",
380                    parallelism,
381                    if *deferred { " DEFERRED" } else { "" }
382                )
383            }
384            AlterTableOperation::RefreshSchema => {
385                write!(f, "REFRESH SCHEMA")
386            }
387            AlterTableOperation::SetSourceRateLimit { rate_limit } => {
388                write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
389            }
390            AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
391                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
392            }
393            AlterTableOperation::SetDmlRateLimit { rate_limit } => {
394                write!(f, "SET DML_RATE_LIMIT TO {}", rate_limit)
395            }
396            AlterTableOperation::SwapRenameTable { target_table } => {
397                write!(f, "SWAP WITH {}", target_table)
398            }
399            AlterTableOperation::DropConnector => {
400                write!(f, "DROP CONNECTOR")
401            }
402            AlterTableOperation::AlterConnectorProps { alter_props } => {
403                write!(
404                    f,
405                    "CONNECTOR WITH ({})",
406                    display_comma_separated(alter_props)
407                )
408            }
409        }
410    }
411}
412
413impl fmt::Display for AlterIndexOperation {
414    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
415        match self {
416            AlterIndexOperation::RenameIndex { index_name } => {
417                write!(f, "RENAME TO {index_name}")
418            }
419            AlterIndexOperation::SetParallelism {
420                parallelism,
421                deferred,
422            } => {
423                write!(
424                    f,
425                    "SET PARALLELISM TO {}{}",
426                    parallelism,
427                    if *deferred { " DEFERRED" } else { "" }
428                )
429            }
430        }
431    }
432}
433
434impl fmt::Display for AlterViewOperation {
435    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
436        match self {
437            AlterViewOperation::RenameView { view_name } => {
438                write!(f, "RENAME TO {view_name}")
439            }
440            AlterViewOperation::ChangeOwner { new_owner_name } => {
441                write!(f, "OWNER TO {}", new_owner_name)
442            }
443            AlterViewOperation::SetSchema { new_schema_name } => {
444                write!(f, "SET SCHEMA {}", new_schema_name)
445            }
446            AlterViewOperation::SetParallelism {
447                parallelism,
448                deferred,
449            } => {
450                write!(
451                    f,
452                    "SET PARALLELISM TO {}{}",
453                    parallelism,
454                    if *deferred { " DEFERRED" } else { "" }
455                )
456            }
457            AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
458                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
459            }
460            AlterViewOperation::SwapRenameView { target_view } => {
461                write!(f, "SWAP WITH {}", target_view)
462            }
463            AlterViewOperation::SetResourceGroup {
464                resource_group,
465                deferred,
466            } => {
467                let deferred = if *deferred { " DEFERRED" } else { "" };
468
469                if let Some(resource_group) = resource_group {
470                    write!(f, "SET RESOURCE_GROUP TO {} {}", resource_group, deferred)
471                } else {
472                    write!(f, "RESET RESOURCE_GROUP {}", deferred)
473                }
474            }
475            AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
476                write!(f, "SET STREAMING_ENABLE_UNALIGNED_JOIN TO {}", enable)
477            }
478            AlterViewOperation::AsQuery { query } => {
479                write!(f, "AS {}", query)
480            }
481        }
482    }
483}
484
485impl fmt::Display for AlterSinkOperation {
486    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
487        match self {
488            AlterSinkOperation::RenameSink { sink_name } => {
489                write!(f, "RENAME TO {sink_name}")
490            }
491            AlterSinkOperation::ChangeOwner { new_owner_name } => {
492                write!(f, "OWNER TO {}", new_owner_name)
493            }
494            AlterSinkOperation::SetSchema { new_schema_name } => {
495                write!(f, "SET SCHEMA {}", new_schema_name)
496            }
497            AlterSinkOperation::SetParallelism {
498                parallelism,
499                deferred,
500            } => {
501                write!(
502                    f,
503                    "SET PARALLELISM TO {}{}",
504                    parallelism,
505                    if *deferred { " DEFERRED" } else { "" }
506                )
507            }
508            AlterSinkOperation::SwapRenameSink { target_sink } => {
509                write!(f, "SWAP WITH {}", target_sink)
510            }
511            AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
512                write!(f, "SET SINK_RATE_LIMIT TO {}", rate_limit)
513            }
514            AlterSinkOperation::AlterConnectorProps {
515                alter_props: changed_props,
516            } => {
517                write!(
518                    f,
519                    "CONNECTOR WITH ({})",
520                    display_comma_separated(changed_props)
521                )
522            }
523            AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
524                write!(f, "SET STREAMING_ENABLE_UNALIGNED_JOIN TO {}", enable)
525            }
526        }
527    }
528}
529
530impl fmt::Display for AlterSubscriptionOperation {
531    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
532        match self {
533            AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
534                write!(f, "RENAME TO {subscription_name}")
535            }
536            AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
537                write!(f, "OWNER TO {}", new_owner_name)
538            }
539            AlterSubscriptionOperation::SetSchema { new_schema_name } => {
540                write!(f, "SET SCHEMA {}", new_schema_name)
541            }
542            AlterSubscriptionOperation::SwapRenameSubscription {
543                target_subscription,
544            } => {
545                write!(f, "SWAP WITH {}", target_subscription)
546            }
547        }
548    }
549}
550
551impl fmt::Display for AlterSourceOperation {
552    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
553        match self {
554            AlterSourceOperation::RenameSource { source_name } => {
555                write!(f, "RENAME TO {source_name}")
556            }
557            AlterSourceOperation::AddColumn { column_def } => {
558                write!(f, "ADD COLUMN {column_def}")
559            }
560            AlterSourceOperation::ChangeOwner { new_owner_name } => {
561                write!(f, "OWNER TO {}", new_owner_name)
562            }
563            AlterSourceOperation::SetSchema { new_schema_name } => {
564                write!(f, "SET SCHEMA {}", new_schema_name)
565            }
566            AlterSourceOperation::FormatEncode { format_encode } => {
567                write!(f, "{format_encode}")
568            }
569            AlterSourceOperation::RefreshSchema => {
570                write!(f, "REFRESH SCHEMA")
571            }
572            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
573                write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
574            }
575            AlterSourceOperation::SwapRenameSource { target_source } => {
576                write!(f, "SWAP WITH {}", target_source)
577            }
578            AlterSourceOperation::SetParallelism {
579                parallelism,
580                deferred,
581            } => {
582                write!(
583                    f,
584                    "SET PARALLELISM TO {}{}",
585                    parallelism,
586                    if *deferred { " DEFERRED" } else { "" }
587                )
588            }
589            AlterSourceOperation::AlterConnectorProps { alter_props } => {
590                write!(
591                    f,
592                    "CONNECTOR WITH ({})",
593                    display_comma_separated(alter_props)
594                )
595            }
596        }
597    }
598}
599
600impl fmt::Display for AlterFunctionOperation {
601    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
602        match self {
603            AlterFunctionOperation::SetSchema { new_schema_name } => {
604                write!(f, "SET SCHEMA {new_schema_name}")
605            }
606        }
607    }
608}
609
610impl fmt::Display for AlterConnectionOperation {
611    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
612        match self {
613            AlterConnectionOperation::SetSchema { new_schema_name } => {
614                write!(f, "SET SCHEMA {new_schema_name}")
615            }
616            AlterConnectionOperation::ChangeOwner { new_owner_name } => {
617                write!(f, "OWNER TO {new_owner_name}")
618            }
619        }
620    }
621}
622
623impl fmt::Display for AlterSecretOperation {
624    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
625        match self {
626            AlterSecretOperation::ChangeCredential { new_credential } => {
627                write!(f, "AS {new_credential}")
628            }
629        }
630    }
631}
632
/// An `ALTER COLUMN` (`Statement::AlterTable`) operation
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum AlterColumnOperation {
    /// `SET NOT NULL`
    SetNotNull,
    /// `DROP NOT NULL`
    DropNotNull,
    /// `SET DEFAULT <expr>`
    SetDefault { value: Expr },
    /// `DROP DEFAULT`
    DropDefault,
    /// `[SET DATA] TYPE <data_type> [USING <expr>]`
    ///
    /// `Display` always writes the long form `SET DATA TYPE`.
    SetDataType {
        data_type: DataType,
        /// PostgreSQL specific
        using: Option<Expr>,
    },
}
652
653impl fmt::Display for AlterColumnOperation {
654    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
655        match self {
656            AlterColumnOperation::SetNotNull => write!(f, "SET NOT NULL",),
657            AlterColumnOperation::DropNotNull => write!(f, "DROP NOT NULL",),
658            AlterColumnOperation::SetDefault { value } => {
659                write!(f, "SET DEFAULT {}", value)
660            }
661            AlterColumnOperation::DropDefault => {
662                write!(f, "DROP DEFAULT")
663            }
664            AlterColumnOperation::SetDataType { data_type, using } => {
665                if let Some(expr) = using {
666                    write!(f, "SET DATA TYPE {} USING {}", data_type, expr)
667                } else {
668                    write!(f, "SET DATA TYPE {}", data_type)
669                }
670            }
671        }
672    }
673}
674
675impl fmt::Display for AlterFragmentOperation {
676    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
677        match self {
678            AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
679                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
680            }
681        }
682    }
683}
684
/// The watermark on source.
///
/// Rendered by `Display` as `WATERMARK FOR <column> AS <expr>`; no
/// parentheses are added around the expression.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct SourceWatermark {
    /// Column named after `WATERMARK FOR`.
    pub column: Ident,
    /// Expression that follows `AS`.
    pub expr: Expr,
}
693
694impl fmt::Display for SourceWatermark {
695    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
696        write!(f, "WATERMARK FOR {} AS {}", self.column, self.expr,)
697    }
698}
699
/// A table-level constraint, specified in a `CREATE TABLE` or an
/// `ALTER TABLE ADD <constraint>` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum TableConstraint {
    /// `[ CONSTRAINT <name> ] { PRIMARY KEY | UNIQUE } (<columns>)`
    Unique {
        /// Optional constraint name.
        name: Option<Ident>,
        /// Columns covered by the constraint.
        columns: Vec<Ident>,
        /// Whether this is a `PRIMARY KEY` or just a `UNIQUE` constraint
        is_primary: bool,
    },
    /// A referential integrity constraint (`[ CONSTRAINT <name> ] FOREIGN KEY (<columns>)
    /// REFERENCES <foreign_table> (<referred_columns>)
    /// { [ON DELETE <referential_action>] [ON UPDATE <referential_action>] |
    ///   [ON UPDATE <referential_action>] [ON DELETE <referential_action>]
    /// }`).
    ///
    /// `Display` always writes `ON DELETE` before `ON UPDATE`.
    ForeignKey {
        /// Optional constraint name.
        name: Option<Ident>,
        /// Referencing columns of this table.
        columns: Vec<Ident>,
        /// Table being referenced.
        foreign_table: ObjectName,
        /// Referenced columns of `foreign_table`.
        referred_columns: Vec<Ident>,
        /// Action for `ON DELETE`, if specified.
        on_delete: Option<ReferentialAction>,
        /// Action for `ON UPDATE`, if specified.
        on_update: Option<ReferentialAction>,
    },
    /// `[ CONSTRAINT <name> ] CHECK (<expr>)`
    Check {
        /// Optional constraint name.
        name: Option<Ident>,
        /// Expression inside the `CHECK (...)` parentheses.
        expr: Box<Expr>,
    },
}
731
732impl fmt::Display for TableConstraint {
733    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
734        match self {
735            TableConstraint::Unique {
736                name,
737                columns,
738                is_primary,
739            } => write!(
740                f,
741                "{}{} ({})",
742                display_constraint_name(name),
743                if *is_primary { "PRIMARY KEY" } else { "UNIQUE" },
744                display_comma_separated(columns)
745            ),
746            TableConstraint::ForeignKey {
747                name,
748                columns,
749                foreign_table,
750                referred_columns,
751                on_delete,
752                on_update,
753            } => {
754                write!(
755                    f,
756                    "{}FOREIGN KEY ({}) REFERENCES {}({})",
757                    display_constraint_name(name),
758                    display_comma_separated(columns),
759                    foreign_table,
760                    display_comma_separated(referred_columns),
761                )?;
762                if let Some(action) = on_delete {
763                    write!(f, " ON DELETE {}", action)?;
764                }
765                if let Some(action) = on_update {
766                    write!(f, " ON UPDATE {}", action)?;
767                }
768                Ok(())
769            }
770            TableConstraint::Check { name, expr } => {
771                write!(f, "{}CHECK ({})", display_constraint_name(name), expr)
772            }
773        }
774    }
775}
776
/// SQL column definition
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct ColumnDef {
    /// Column name.
    pub name: Ident,
    /// Declared data type. When `None`, `Display` writes the literal text
    /// `None` in its place.
    pub data_type: Option<DataType>,
    /// Optional collation. Note that the `Display` impl does not render it.
    pub collation: Option<ObjectName>,
    /// Column options, rendered space-separated after the data type.
    pub options: Vec<ColumnOptionDef>,
}
786
787impl ColumnDef {
788    pub fn new(
789        name: Ident,
790        data_type: DataType,
791        collation: Option<ObjectName>,
792        options: Vec<ColumnOptionDef>,
793    ) -> Self {
794        ColumnDef {
795            name,
796            data_type: Some(data_type),
797            collation,
798            options,
799        }
800    }
801
802    pub fn is_generated(&self) -> bool {
803        self.options
804            .iter()
805            .any(|option| matches!(option.option, ColumnOption::GeneratedColumns(_)))
806    }
807}
808
809impl fmt::Display for ColumnDef {
810    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
811        write!(
812            f,
813            "{} {}",
814            self.name,
815            if let Some(data_type) = &self.data_type {
816                data_type.to_string()
817            } else {
818                "None".to_owned()
819            }
820        )?;
821        for option in &self.options {
822            write!(f, " {}", option)?;
823        }
824        Ok(())
825    }
826}
827
/// An optionally-named `ColumnOption`: `[ CONSTRAINT <name> ] <column-option>`.
///
/// Note that implementations are substantially more permissive than the ANSI
/// specification on what order column options can be presented in, and whether
/// they are allowed to be named. The specification distinguishes between
/// constraints (NOT NULL, UNIQUE, PRIMARY KEY, and CHECK), which can be named
/// and can appear in any order, and other options (DEFAULT, GENERATED), which
/// cannot be named and must appear in a fixed order. PostgreSQL, however,
/// allows preceding any option with `CONSTRAINT <name>`, even those that are
/// not really constraints, like NULL and DEFAULT. MSSQL is less permissive,
/// allowing DEFAULT, UNIQUE, PRIMARY KEY and CHECK to be named, but not NULL or
/// NOT NULL constraints (the last of which is in violation of the spec).
///
/// For maximum flexibility, we don't distinguish between constraint and
/// non-constraint options, lumping them all together under the umbrella of
/// "column options," and we allow any column option to be named.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct ColumnOptionDef {
    /// Optional name, rendered as a `CONSTRAINT <name> ` prefix when present.
    pub name: Option<Ident>,
    /// The option itself.
    pub option: ColumnOption,
}
850
851impl fmt::Display for ColumnOptionDef {
852    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
853        write!(f, "{}{}", display_constraint_name(&self.name), self.option)
854    }
855}
856
/// `ColumnOption`s are modifiers that follow a column definition in a `CREATE
/// TABLE` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum ColumnOption {
    /// `NULL`
    Null,
    /// `NOT NULL`
    NotNull,
    /// `DEFAULT <restricted-expr>`
    DefaultValue(Expr),
    /// Default value from previous bound `DefaultColumnDesc`. Used internally
    /// for schema change and should not be specified by users.
    DefaultValueInternal {
        /// Protobuf encoded `DefaultColumnDesc`.
        persisted: Box<[u8]>,
        /// Optional AST for unparsing. If `None`, the default value will be
        /// shown as `DEFAULT INTERNAL` which is for demonstrating and should
        /// not be specified by users.
        expr: Option<Expr>,
    },
    /// `{ PRIMARY KEY | UNIQUE }`
    Unique { is_primary: bool },
    /// A referential integrity constraint (`REFERENCES
    /// <foreign_table> [ (<referred_columns>) ]
    /// { [ON DELETE <referential_action>] [ON UPDATE <referential_action>] |
    ///   [ON UPDATE <referential_action>] [ON DELETE <referential_action>]
    /// }`).
    ///
    /// `Display` omits the column list when `referred_columns` is empty and
    /// always writes `ON DELETE` before `ON UPDATE`.
    ForeignKey {
        foreign_table: ObjectName,
        referred_columns: Vec<Ident>,
        on_delete: Option<ReferentialAction>,
        on_update: Option<ReferentialAction>,
    },
    /// `CHECK (<expr>)`
    Check(Expr),
    /// Dialect-specific options, such as:
    /// - MySQL's `AUTO_INCREMENT` or SQLite's `AUTOINCREMENT`
    /// - ...
    DialectSpecific(Vec<Token>),
    /// `AS <generation_expr>` (`Display` adds no parentheses around the
    /// expression)
    GeneratedColumns(Expr),
}
900
901impl fmt::Display for ColumnOption {
902    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
903        use ColumnOption::*;
904        match self {
905            Null => write!(f, "NULL"),
906            NotNull => write!(f, "NOT NULL"),
907            DefaultValue(expr) => write!(f, "DEFAULT {}", expr),
908            DefaultValueInternal { persisted: _, expr } => {
909                if let Some(expr) = expr {
910                    write!(f, "DEFAULT {}", expr)
911                } else {
912                    write!(f, "DEFAULT INTERNAL")
913                }
914            }
915            Unique { is_primary } => {
916                write!(f, "{}", if *is_primary { "PRIMARY KEY" } else { "UNIQUE" })
917            }
918            ForeignKey {
919                foreign_table,
920                referred_columns,
921                on_delete,
922                on_update,
923            } => {
924                write!(f, "REFERENCES {}", foreign_table)?;
925                if !referred_columns.is_empty() {
926                    write!(f, " ({})", display_comma_separated(referred_columns))?;
927                }
928                if let Some(action) = on_delete {
929                    write!(f, " ON DELETE {}", action)?;
930                }
931                if let Some(action) = on_update {
932                    write!(f, " ON UPDATE {}", action)?;
933                }
934                Ok(())
935            }
936            Check(expr) => write!(f, "CHECK ({})", expr),
937            DialectSpecific(val) => write!(f, "{}", display_separated(val, " ")),
938            GeneratedColumns(expr) => write!(f, "AS {}", expr),
939        }
940    }
941}
942
943fn display_constraint_name(name: &'_ Option<Ident>) -> impl fmt::Display + '_ {
944    struct ConstraintName<'a>(&'a Option<Ident>);
945    impl fmt::Display for ConstraintName<'_> {
946        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
947            if let Some(name) = self.0 {
948                write!(f, "CONSTRAINT {} ", name)?;
949            }
950            Ok(())
951        }
952    }
953    ConstraintName(name)
954}
955
/// `<referential_action> =
/// { RESTRICT | CASCADE | SET NULL | NO ACTION | SET DEFAULT }`
///
/// Used in foreign key constraints in `ON UPDATE` and `ON DELETE` options.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum ReferentialAction {
    Restrict,
    Cascade,
    SetNull,
    NoAction,
    SetDefault,
}

/// Renders the action as its SQL keyword(s).
impl fmt::Display for ReferentialAction {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keyword = match self {
            Self::Restrict => "RESTRICT",
            Self::Cascade => "CASCADE",
            Self::SetNull => "SET NULL",
            Self::NoAction => "NO ACTION",
            Self::SetDefault => "SET DEFAULT",
        };
        f.write_str(keyword)
    }
}
981
/// Secure secret definition for a webhook source.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct WebhookSourceInfo {
    /// Optional reference to a secret.
    // NOTE(review): presumably the secret consumed by `signature_expr` — confirm.
    pub secret_ref: Option<SecretRefValue>,
    /// Signature expression.
    // NOTE(review): presumably evaluated to validate incoming requests — confirm.
    pub signature_expr: Expr,
    // NOTE(review): looks like a flag to respond only after data is persisted — confirm.
    pub wait_for_persistence: bool,
    // NOTE(review): looks like a flag for payloads carrying multiple records — confirm.
    pub is_batched: bool,
}