risingwave_sqlparser/ast/
ddl.rs

1// Licensed under the Apache License, Version 2.0 (the "License");
2// you may not use this file except in compliance with the License.
3// You may obtain a copy of the License at
4//
5//     http://www.apache.org/licenses/LICENSE-2.0
6//
7// Unless required by applicable law or agreed to in writing, software
8// distributed under the License is distributed on an "AS IS" BASIS,
9// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10// See the License for the specific language governing permissions and
11// limitations under the License.
12
13//! AST types specific to CREATE/ALTER variants of [`crate::ast::Statement`]
14//! (commonly referred to as Data Definition Language, or DDL)
15
16#[cfg(not(feature = "std"))]
17use alloc::{boxed::Box, string::ToString, vec::Vec};
18use core::fmt;
19
20#[cfg(feature = "serde")]
21use serde::{Deserialize, Serialize};
22
23use super::{ConfigParam, FormatEncodeOptions, SqlOption};
24use crate::ast::{
25    DataType, Expr, Ident, ObjectName, Query, SecretRefValue, SetVariableValue, Value,
26    display_comma_separated, display_separated,
27};
28use crate::tokenizer::Token;
29
30#[derive(Debug, Clone, PartialEq, Eq, Hash)]
31#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
32pub enum AlterDatabaseOperation {
33    ChangeOwner { new_owner_name: Ident },
34    RenameDatabase { database_name: ObjectName },
35    SetParam(ConfigParam),
36}
37
38#[derive(Debug, Clone, PartialEq, Eq, Hash)]
39#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
40pub enum AlterSchemaOperation {
41    ChangeOwner { new_owner_name: Ident },
42    RenameSchema { schema_name: ObjectName },
43    SwapRenameSchema { target_schema: ObjectName },
44}
45
46/// An `ALTER TABLE` (`Statement::AlterTable`) operation
47#[derive(Debug, Clone, PartialEq, Eq, Hash)]
48#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
49pub enum AlterTableOperation {
50    /// `ADD <table_constraint>`
51    AddConstraint(TableConstraint),
52    /// `ADD [ COLUMN ] <column_def>`
53    AddColumn {
54        column_def: ColumnDef,
55    },
56    /// TODO: implement `DROP CONSTRAINT <name>`
57    DropConstraint {
58        name: Ident,
59    },
60    /// `DROP [ COLUMN ] [ IF EXISTS ] <column_name> [ CASCADE ]`
61    DropColumn {
62        column_name: Ident,
63        if_exists: bool,
64        cascade: bool,
65    },
66    /// `RENAME [ COLUMN ] <old_column_name> TO <new_column_name>`
67    RenameColumn {
68        old_column_name: Ident,
69        new_column_name: Ident,
70    },
71    /// `RENAME TO <table_name>`
72    RenameTable {
73        table_name: ObjectName,
74    },
75    // CHANGE [ COLUMN ] <old_name> <new_name> <data_type> [ <options> ]
76    ChangeColumn {
77        old_name: Ident,
78        new_name: Ident,
79        data_type: DataType,
80        options: Vec<ColumnOption>,
81    },
82    /// `RENAME CONSTRAINT <old_constraint_name> TO <new_constraint_name>`
83    ///
84    /// Note: this is a PostgreSQL-specific operation.
85    RenameConstraint {
86        old_name: Ident,
87        new_name: Ident,
88    },
89    /// `ALTER [ COLUMN ]`
90    AlterColumn {
91        column_name: Ident,
92        op: AlterColumnOperation,
93    },
94    /// `OWNER TO <owner_name>`
95    ChangeOwner {
96        new_owner_name: Ident,
97    },
98    /// `SET SCHEMA <schema_name>`
99    SetSchema {
100        new_schema_name: ObjectName,
101    },
102    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
103    SetParallelism {
104        parallelism: SetVariableValue,
105        deferred: bool,
106    },
107    RefreshSchema,
108    /// `SET SOURCE_RATE_LIMIT TO <rate_limit>`
109    SetSourceRateLimit {
110        rate_limit: i32,
111    },
112    /// SET BACKFILL_RATE_LIMIT TO <rate_limit>
113    SetBackfillRateLimit {
114        rate_limit: i32,
115    },
116    /// `SET DML_RATE_LIMIT TO <rate_limit>`
117    SetDmlRateLimit {
118        rate_limit: i32,
119    },
120    /// `SWAP WITH <table_name>`
121    SwapRenameTable {
122        target_table: ObjectName,
123    },
124    /// `DROP CONNECTOR`
125    DropConnector,
126
127    /// `ALTER CONNECTOR WITH (<connector_props>)`
128    AlterConnectorProps {
129        alter_props: Vec<SqlOption>,
130    },
131}
132
133#[derive(Debug, Clone, PartialEq, Eq, Hash)]
134#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
135pub enum AlterIndexOperation {
136    RenameIndex {
137        index_name: ObjectName,
138    },
139    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
140    SetParallelism {
141        parallelism: SetVariableValue,
142        deferred: bool,
143    },
144}
145
146#[derive(Debug, Clone, PartialEq, Eq, Hash)]
147#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
148pub enum AlterViewOperation {
149    RenameView {
150        view_name: ObjectName,
151    },
152    ChangeOwner {
153        new_owner_name: Ident,
154    },
155    SetSchema {
156        new_schema_name: ObjectName,
157    },
158    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
159    SetParallelism {
160        parallelism: SetVariableValue,
161        deferred: bool,
162    },
163    /// `SET RESOURCE_GROUP TO 'RESOURCE GROUP' [ DEFERRED ]`
164    /// `RESET RESOURCE_GROUP [ DEFERRED ]`
165    SetResourceGroup {
166        resource_group: Option<SetVariableValue>,
167        deferred: bool,
168    },
169    /// `SET BACKFILL_RATE_LIMIT TO <rate_limit>`
170    SetBackfillRateLimit {
171        rate_limit: i32,
172    },
173    /// `SWAP WITH <view_name>`
174    SwapRenameView {
175        target_view: ObjectName,
176    },
177    SetStreamingEnableUnalignedJoin {
178        enable: bool,
179    },
180    /// `AS <query>`
181    AsQuery {
182        query: Box<Query>,
183    },
184}
185
186#[derive(Debug, Clone, PartialEq, Eq, Hash)]
187#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
188pub enum AlterSinkOperation {
189    RenameSink {
190        sink_name: ObjectName,
191    },
192    ChangeOwner {
193        new_owner_name: Ident,
194    },
195    SetSchema {
196        new_schema_name: ObjectName,
197    },
198    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
199    SetParallelism {
200        parallelism: SetVariableValue,
201        deferred: bool,
202    },
203    /// `SWAP WITH <sink_name>`
204    SwapRenameSink {
205        target_sink: ObjectName,
206    },
207    SetSinkRateLimit {
208        rate_limit: i32,
209    },
210    AlterConnectorProps {
211        alter_props: Vec<SqlOption>,
212    },
213    SetStreamingEnableUnalignedJoin {
214        enable: bool,
215    },
216}
217
218#[derive(Debug, Clone, PartialEq, Eq, Hash)]
219#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
220pub enum AlterSubscriptionOperation {
221    RenameSubscription { subscription_name: ObjectName },
222    ChangeOwner { new_owner_name: Ident },
223    SetSchema { new_schema_name: ObjectName },
224    SwapRenameSubscription { target_subscription: ObjectName },
225}
226
227#[derive(Debug, Clone, PartialEq, Eq, Hash)]
228#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
229pub enum AlterSourceOperation {
230    RenameSource {
231        source_name: ObjectName,
232    },
233    AddColumn {
234        column_def: ColumnDef,
235    },
236    ChangeOwner {
237        new_owner_name: Ident,
238    },
239    SetSchema {
240        new_schema_name: ObjectName,
241    },
242    FormatEncode {
243        format_encode: FormatEncodeOptions,
244    },
245    RefreshSchema,
246    SetSourceRateLimit {
247        rate_limit: i32,
248    },
249    SwapRenameSource {
250        target_source: ObjectName,
251    },
252    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
253    SetParallelism {
254        parallelism: SetVariableValue,
255        deferred: bool,
256    },
257    AlterConnectorProps {
258        alter_props: Vec<SqlOption>,
259    },
260}
261
262#[derive(Debug, Clone, PartialEq, Eq, Hash)]
263#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
264pub enum AlterFunctionOperation {
265    SetSchema { new_schema_name: ObjectName },
266}
267
268#[derive(Debug, Clone, PartialEq, Eq, Hash)]
269#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
270pub enum AlterConnectionOperation {
271    SetSchema { new_schema_name: ObjectName },
272    ChangeOwner { new_owner_name: Ident },
273}
274
275#[derive(Debug, Clone, PartialEq, Eq, Hash)]
276#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
277pub enum AlterSecretOperation {
278    ChangeCredential { new_credential: Value },
279}
280
281#[derive(Debug, Clone, PartialEq, Eq, Hash)]
282#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
283pub enum AlterFragmentOperation {
284    AlterBackfillRateLimit { rate_limit: i32 },
285    SetParallelism { parallelism: SetVariableValue },
286}
287
288impl fmt::Display for AlterDatabaseOperation {
289    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
290        match self {
291            AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
292                write!(f, "OWNER TO {}", new_owner_name)
293            }
294            AlterDatabaseOperation::RenameDatabase { database_name } => {
295                write!(f, "RENAME TO {}", database_name)
296            }
297            AlterDatabaseOperation::SetParam(ConfigParam { param, value }) => {
298                write!(f, "SET {} TO {}", param, value)
299            }
300        }
301    }
302}
303
304impl fmt::Display for AlterSchemaOperation {
305    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
306        match self {
307            AlterSchemaOperation::ChangeOwner { new_owner_name } => {
308                write!(f, "OWNER TO {}", new_owner_name)
309            }
310            AlterSchemaOperation::RenameSchema { schema_name } => {
311                write!(f, "RENAME TO {}", schema_name)
312            }
313            AlterSchemaOperation::SwapRenameSchema { target_schema } => {
314                write!(f, "SWAP WITH {}", target_schema)
315            }
316        }
317    }
318}
319
320impl fmt::Display for AlterTableOperation {
321    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
322        match self {
323            AlterTableOperation::AddConstraint(c) => write!(f, "ADD {}", c),
324            AlterTableOperation::AddColumn { column_def } => {
325                write!(f, "ADD COLUMN {}", column_def)
326            }
327            AlterTableOperation::AlterColumn { column_name, op } => {
328                write!(f, "ALTER COLUMN {} {}", column_name, op)
329            }
330            AlterTableOperation::DropConstraint { name } => write!(f, "DROP CONSTRAINT {}", name),
331            AlterTableOperation::DropColumn {
332                column_name,
333                if_exists,
334                cascade,
335            } => write!(
336                f,
337                "DROP COLUMN {}{}{}",
338                if *if_exists { "IF EXISTS " } else { "" },
339                column_name,
340                if *cascade { " CASCADE" } else { "" }
341            ),
342            AlterTableOperation::RenameColumn {
343                old_column_name,
344                new_column_name,
345            } => write!(
346                f,
347                "RENAME COLUMN {} TO {}",
348                old_column_name, new_column_name
349            ),
350            AlterTableOperation::RenameTable { table_name } => {
351                write!(f, "RENAME TO {}", table_name)
352            }
353            AlterTableOperation::ChangeColumn {
354                old_name,
355                new_name,
356                data_type,
357                options,
358            } => {
359                write!(f, "CHANGE COLUMN {} {} {}", old_name, new_name, data_type)?;
360                if options.is_empty() {
361                    Ok(())
362                } else {
363                    write!(f, " {}", display_separated(options, " "))
364                }
365            }
366            AlterTableOperation::RenameConstraint { old_name, new_name } => {
367                write!(f, "RENAME CONSTRAINT {} TO {}", old_name, new_name)
368            }
369            AlterTableOperation::ChangeOwner { new_owner_name } => {
370                write!(f, "OWNER TO {}", new_owner_name)
371            }
372            AlterTableOperation::SetSchema { new_schema_name } => {
373                write!(f, "SET SCHEMA {}", new_schema_name)
374            }
375            AlterTableOperation::SetParallelism {
376                parallelism,
377                deferred,
378            } => {
379                write!(
380                    f,
381                    "SET PARALLELISM TO {}{}",
382                    parallelism,
383                    if *deferred { " DEFERRED" } else { "" }
384                )
385            }
386            AlterTableOperation::RefreshSchema => {
387                write!(f, "REFRESH SCHEMA")
388            }
389            AlterTableOperation::SetSourceRateLimit { rate_limit } => {
390                write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
391            }
392            AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
393                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
394            }
395            AlterTableOperation::SetDmlRateLimit { rate_limit } => {
396                write!(f, "SET DML_RATE_LIMIT TO {}", rate_limit)
397            }
398            AlterTableOperation::SwapRenameTable { target_table } => {
399                write!(f, "SWAP WITH {}", target_table)
400            }
401            AlterTableOperation::DropConnector => {
402                write!(f, "DROP CONNECTOR")
403            }
404            AlterTableOperation::AlterConnectorProps { alter_props } => {
405                write!(
406                    f,
407                    "CONNECTOR WITH ({})",
408                    display_comma_separated(alter_props)
409                )
410            }
411        }
412    }
413}
414
415impl fmt::Display for AlterIndexOperation {
416    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
417        match self {
418            AlterIndexOperation::RenameIndex { index_name } => {
419                write!(f, "RENAME TO {index_name}")
420            }
421            AlterIndexOperation::SetParallelism {
422                parallelism,
423                deferred,
424            } => {
425                write!(
426                    f,
427                    "SET PARALLELISM TO {}{}",
428                    parallelism,
429                    if *deferred { " DEFERRED" } else { "" }
430                )
431            }
432        }
433    }
434}
435
436impl fmt::Display for AlterViewOperation {
437    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
438        match self {
439            AlterViewOperation::RenameView { view_name } => {
440                write!(f, "RENAME TO {view_name}")
441            }
442            AlterViewOperation::ChangeOwner { new_owner_name } => {
443                write!(f, "OWNER TO {}", new_owner_name)
444            }
445            AlterViewOperation::SetSchema { new_schema_name } => {
446                write!(f, "SET SCHEMA {}", new_schema_name)
447            }
448            AlterViewOperation::SetParallelism {
449                parallelism,
450                deferred,
451            } => {
452                write!(
453                    f,
454                    "SET PARALLELISM TO {}{}",
455                    parallelism,
456                    if *deferred { " DEFERRED" } else { "" }
457                )
458            }
459            AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
460                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
461            }
462            AlterViewOperation::SwapRenameView { target_view } => {
463                write!(f, "SWAP WITH {}", target_view)
464            }
465            AlterViewOperation::SetResourceGroup {
466                resource_group,
467                deferred,
468            } => {
469                let deferred = if *deferred { " DEFERRED" } else { "" };
470
471                if let Some(resource_group) = resource_group {
472                    write!(f, "SET RESOURCE_GROUP TO {} {}", resource_group, deferred)
473                } else {
474                    write!(f, "RESET RESOURCE_GROUP {}", deferred)
475                }
476            }
477            AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
478                write!(f, "SET STREAMING_ENABLE_UNALIGNED_JOIN TO {}", enable)
479            }
480            AlterViewOperation::AsQuery { query } => {
481                write!(f, "AS {}", query)
482            }
483        }
484    }
485}
486
487impl fmt::Display for AlterSinkOperation {
488    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
489        match self {
490            AlterSinkOperation::RenameSink { sink_name } => {
491                write!(f, "RENAME TO {sink_name}")
492            }
493            AlterSinkOperation::ChangeOwner { new_owner_name } => {
494                write!(f, "OWNER TO {}", new_owner_name)
495            }
496            AlterSinkOperation::SetSchema { new_schema_name } => {
497                write!(f, "SET SCHEMA {}", new_schema_name)
498            }
499            AlterSinkOperation::SetParallelism {
500                parallelism,
501                deferred,
502            } => {
503                write!(
504                    f,
505                    "SET PARALLELISM TO {}{}",
506                    parallelism,
507                    if *deferred { " DEFERRED" } else { "" }
508                )
509            }
510            AlterSinkOperation::SwapRenameSink { target_sink } => {
511                write!(f, "SWAP WITH {}", target_sink)
512            }
513            AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
514                write!(f, "SET SINK_RATE_LIMIT TO {}", rate_limit)
515            }
516            AlterSinkOperation::AlterConnectorProps {
517                alter_props: changed_props,
518            } => {
519                write!(
520                    f,
521                    "CONNECTOR WITH ({})",
522                    display_comma_separated(changed_props)
523                )
524            }
525            AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
526                write!(f, "SET STREAMING_ENABLE_UNALIGNED_JOIN TO {}", enable)
527            }
528        }
529    }
530}
531
532impl fmt::Display for AlterSubscriptionOperation {
533    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
534        match self {
535            AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
536                write!(f, "RENAME TO {subscription_name}")
537            }
538            AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
539                write!(f, "OWNER TO {}", new_owner_name)
540            }
541            AlterSubscriptionOperation::SetSchema { new_schema_name } => {
542                write!(f, "SET SCHEMA {}", new_schema_name)
543            }
544            AlterSubscriptionOperation::SwapRenameSubscription {
545                target_subscription,
546            } => {
547                write!(f, "SWAP WITH {}", target_subscription)
548            }
549        }
550    }
551}
552
553impl fmt::Display for AlterSourceOperation {
554    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
555        match self {
556            AlterSourceOperation::RenameSource { source_name } => {
557                write!(f, "RENAME TO {source_name}")
558            }
559            AlterSourceOperation::AddColumn { column_def } => {
560                write!(f, "ADD COLUMN {column_def}")
561            }
562            AlterSourceOperation::ChangeOwner { new_owner_name } => {
563                write!(f, "OWNER TO {}", new_owner_name)
564            }
565            AlterSourceOperation::SetSchema { new_schema_name } => {
566                write!(f, "SET SCHEMA {}", new_schema_name)
567            }
568            AlterSourceOperation::FormatEncode { format_encode } => {
569                write!(f, "{format_encode}")
570            }
571            AlterSourceOperation::RefreshSchema => {
572                write!(f, "REFRESH SCHEMA")
573            }
574            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
575                write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
576            }
577            AlterSourceOperation::SwapRenameSource { target_source } => {
578                write!(f, "SWAP WITH {}", target_source)
579            }
580            AlterSourceOperation::SetParallelism {
581                parallelism,
582                deferred,
583            } => {
584                write!(
585                    f,
586                    "SET PARALLELISM TO {}{}",
587                    parallelism,
588                    if *deferred { " DEFERRED" } else { "" }
589                )
590            }
591            AlterSourceOperation::AlterConnectorProps { alter_props } => {
592                write!(
593                    f,
594                    "CONNECTOR WITH ({})",
595                    display_comma_separated(alter_props)
596                )
597            }
598        }
599    }
600}
601
602impl fmt::Display for AlterFunctionOperation {
603    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
604        match self {
605            AlterFunctionOperation::SetSchema { new_schema_name } => {
606                write!(f, "SET SCHEMA {new_schema_name}")
607            }
608        }
609    }
610}
611
612impl fmt::Display for AlterConnectionOperation {
613    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
614        match self {
615            AlterConnectionOperation::SetSchema { new_schema_name } => {
616                write!(f, "SET SCHEMA {new_schema_name}")
617            }
618            AlterConnectionOperation::ChangeOwner { new_owner_name } => {
619                write!(f, "OWNER TO {new_owner_name}")
620            }
621        }
622    }
623}
624
625impl fmt::Display for AlterSecretOperation {
626    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
627        match self {
628            AlterSecretOperation::ChangeCredential { new_credential } => {
629                write!(f, "AS {new_credential}")
630            }
631        }
632    }
633}
634
635/// An `ALTER COLUMN` (`Statement::AlterTable`) operation
636#[derive(Debug, Clone, PartialEq, Eq, Hash)]
637#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
638pub enum AlterColumnOperation {
639    /// `SET NOT NULL`
640    SetNotNull,
641    /// `DROP NOT NULL`
642    DropNotNull,
643    /// `SET DEFAULT <expr>`
644    SetDefault { value: Expr },
645    /// `DROP DEFAULT`
646    DropDefault,
647    /// `[SET DATA] TYPE <data_type> [USING <expr>]`
648    SetDataType {
649        data_type: DataType,
650        /// PostgreSQL specific
651        using: Option<Expr>,
652    },
653}
654
655impl fmt::Display for AlterColumnOperation {
656    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
657        match self {
658            AlterColumnOperation::SetNotNull => write!(f, "SET NOT NULL",),
659            AlterColumnOperation::DropNotNull => write!(f, "DROP NOT NULL",),
660            AlterColumnOperation::SetDefault { value } => {
661                write!(f, "SET DEFAULT {}", value)
662            }
663            AlterColumnOperation::DropDefault => {
664                write!(f, "DROP DEFAULT")
665            }
666            AlterColumnOperation::SetDataType { data_type, using } => {
667                if let Some(expr) = using {
668                    write!(f, "SET DATA TYPE {} USING {}", data_type, expr)
669                } else {
670                    write!(f, "SET DATA TYPE {}", data_type)
671                }
672            }
673        }
674    }
675}
676
677impl fmt::Display for AlterFragmentOperation {
678    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
679        match self {
680            AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
681                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
682            }
683            AlterFragmentOperation::SetParallelism { parallelism } => {
684                write!(f, "SET PARALLELISM TO {}", parallelism)
685            }
686        }
687    }
688}
689
690/// The watermark on source.
691/// `WATERMARK FOR <column> AS (<expr>)`
692#[derive(Debug, Clone, PartialEq, Eq, Hash)]
693#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
694pub struct SourceWatermark {
695    pub column: Ident,
696    pub expr: Expr,
697}
698
699impl fmt::Display for SourceWatermark {
700    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
701        write!(f, "WATERMARK FOR {} AS {}", self.column, self.expr,)
702    }
703}
704
705/// A table-level constraint, specified in a `CREATE TABLE` or an
706/// `ALTER TABLE ADD <constraint>` statement.
707#[derive(Debug, Clone, PartialEq, Eq, Hash)]
708#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
709pub enum TableConstraint {
710    /// `[ CONSTRAINT <name> ] { PRIMARY KEY | UNIQUE } (<columns>)`
711    Unique {
712        name: Option<Ident>,
713        columns: Vec<Ident>,
714        /// Whether this is a `PRIMARY KEY` or just a `UNIQUE` constraint
715        is_primary: bool,
716    },
717    /// A referential integrity constraint (`[ CONSTRAINT <name> ] FOREIGN KEY (<columns>)
718    /// REFERENCES <foreign_table> (<referred_columns>)
719    /// { [ON DELETE <referential_action>] [ON UPDATE <referential_action>] |
720    ///   [ON UPDATE <referential_action>] [ON DELETE <referential_action>]
721    /// }`).
722    ForeignKey {
723        name: Option<Ident>,
724        columns: Vec<Ident>,
725        foreign_table: ObjectName,
726        referred_columns: Vec<Ident>,
727        on_delete: Option<ReferentialAction>,
728        on_update: Option<ReferentialAction>,
729    },
730    /// `[ CONSTRAINT <name> ] CHECK (<expr>)`
731    Check {
732        name: Option<Ident>,
733        expr: Box<Expr>,
734    },
735}
736
737impl fmt::Display for TableConstraint {
738    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
739        match self {
740            TableConstraint::Unique {
741                name,
742                columns,
743                is_primary,
744            } => write!(
745                f,
746                "{}{} ({})",
747                display_constraint_name(name),
748                if *is_primary { "PRIMARY KEY" } else { "UNIQUE" },
749                display_comma_separated(columns)
750            ),
751            TableConstraint::ForeignKey {
752                name,
753                columns,
754                foreign_table,
755                referred_columns,
756                on_delete,
757                on_update,
758            } => {
759                write!(
760                    f,
761                    "{}FOREIGN KEY ({}) REFERENCES {}({})",
762                    display_constraint_name(name),
763                    display_comma_separated(columns),
764                    foreign_table,
765                    display_comma_separated(referred_columns),
766                )?;
767                if let Some(action) = on_delete {
768                    write!(f, " ON DELETE {}", action)?;
769                }
770                if let Some(action) = on_update {
771                    write!(f, " ON UPDATE {}", action)?;
772                }
773                Ok(())
774            }
775            TableConstraint::Check { name, expr } => {
776                write!(f, "{}CHECK ({})", display_constraint_name(name), expr)
777            }
778        }
779    }
780}
781
782/// SQL column definition
783#[derive(Debug, Clone, PartialEq, Eq, Hash)]
784#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
785pub struct ColumnDef {
786    pub name: Ident,
787    pub data_type: Option<DataType>,
788    pub collation: Option<ObjectName>,
789    pub options: Vec<ColumnOptionDef>,
790}
791
792impl ColumnDef {
793    pub fn new(
794        name: Ident,
795        data_type: DataType,
796        collation: Option<ObjectName>,
797        options: Vec<ColumnOptionDef>,
798    ) -> Self {
799        ColumnDef {
800            name,
801            data_type: Some(data_type),
802            collation,
803            options,
804        }
805    }
806
807    pub fn is_generated(&self) -> bool {
808        self.options
809            .iter()
810            .any(|option| matches!(option.option, ColumnOption::GeneratedColumns(_)))
811    }
812}
813
814impl fmt::Display for ColumnDef {
815    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
816        write!(
817            f,
818            "{} {}",
819            self.name,
820            if let Some(data_type) = &self.data_type {
821                data_type.to_string()
822            } else {
823                "None".to_owned()
824            }
825        )?;
826        for option in &self.options {
827            write!(f, " {}", option)?;
828        }
829        Ok(())
830    }
831}
832
833/// An optionally-named `ColumnOption`: `[ CONSTRAINT <name> ] <column-option>`.
834///
835/// Note that implementations are substantially more permissive than the ANSI
836/// specification on what order column options can be presented in, and whether
837/// they are allowed to be named. The specification distinguishes between
838/// constraints (NOT NULL, UNIQUE, PRIMARY KEY, and CHECK), which can be named
839/// and can appear in any order, and other options (DEFAULT, GENERATED), which
840/// cannot be named and must appear in a fixed order. PostgreSQL, however,
841/// allows preceding any option with `CONSTRAINT <name>`, even those that are
842/// not really constraints, like NULL and DEFAULT. MSSQL is less permissive,
843/// allowing DEFAULT, UNIQUE, PRIMARY KEY and CHECK to be named, but not NULL or
844/// NOT NULL constraints (the last of which is in violation of the spec).
845///
846/// For maximum flexibility, we don't distinguish between constraint and
847/// non-constraint options, lumping them all together under the umbrella of
848/// "column options," and we allow any column option to be named.
849#[derive(Debug, Clone, PartialEq, Eq, Hash)]
850#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
851pub struct ColumnOptionDef {
852    pub name: Option<Ident>,
853    pub option: ColumnOption,
854}
855
856impl fmt::Display for ColumnOptionDef {
857    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
858        write!(f, "{}{}", display_constraint_name(&self.name), self.option)
859    }
860}
861
862/// `ColumnOption`s are modifiers that follow a column definition in a `CREATE
863/// TABLE` statement.
864#[derive(Debug, Clone, PartialEq, Eq, Hash)]
865#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
866pub enum ColumnOption {
867    /// `NULL`
868    Null,
869    /// `NOT NULL`
870    NotNull,
871    /// `DEFAULT <restricted-expr>`
872    DefaultValue(Expr),
873    /// Default value from previous bound `DefaultColumnDesc`. Used internally
874    /// for schema change and should not be specified by users.
875    DefaultValueInternal {
876        /// Protobuf encoded `DefaultColumnDesc`.
877        persisted: Box<[u8]>,
878        /// Optional AST for unparsing. If `None`, the default value will be
879        /// shown as `DEFAULT INTERNAL` which is for demonstrating and should
880        /// not be specified by users.
881        expr: Option<Expr>,
882    },
883    /// `{ PRIMARY KEY | UNIQUE }`
884    Unique { is_primary: bool },
885    /// A referential integrity constraint (`[FOREIGN KEY REFERENCES
886    /// <foreign_table> (<referred_columns>)
887    /// { [ON DELETE <referential_action>] [ON UPDATE <referential_action>] |
888    ///   [ON UPDATE <referential_action>] [ON DELETE <referential_action>]
889    /// }`).
890    ForeignKey {
891        foreign_table: ObjectName,
892        referred_columns: Vec<Ident>,
893        on_delete: Option<ReferentialAction>,
894        on_update: Option<ReferentialAction>,
895    },
896    /// `CHECK (<expr>)`
897    Check(Expr),
898    /// Dialect-specific options, such as:
899    /// - MySQL's `AUTO_INCREMENT` or SQLite's `AUTOINCREMENT`
900    /// - ...
901    DialectSpecific(Vec<Token>),
902    /// AS ( <generation_expr> )`
903    GeneratedColumns(Expr),
904}
905
906impl fmt::Display for ColumnOption {
907    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
908        use ColumnOption::*;
909        match self {
910            Null => write!(f, "NULL"),
911            NotNull => write!(f, "NOT NULL"),
912            DefaultValue(expr) => write!(f, "DEFAULT {}", expr),
913            DefaultValueInternal { persisted: _, expr } => {
914                if let Some(expr) = expr {
915                    write!(f, "DEFAULT {}", expr)
916                } else {
917                    write!(f, "DEFAULT INTERNAL")
918                }
919            }
920            Unique { is_primary } => {
921                write!(f, "{}", if *is_primary { "PRIMARY KEY" } else { "UNIQUE" })
922            }
923            ForeignKey {
924                foreign_table,
925                referred_columns,
926                on_delete,
927                on_update,
928            } => {
929                write!(f, "REFERENCES {}", foreign_table)?;
930                if !referred_columns.is_empty() {
931                    write!(f, " ({})", display_comma_separated(referred_columns))?;
932                }
933                if let Some(action) = on_delete {
934                    write!(f, " ON DELETE {}", action)?;
935                }
936                if let Some(action) = on_update {
937                    write!(f, " ON UPDATE {}", action)?;
938                }
939                Ok(())
940            }
941            Check(expr) => write!(f, "CHECK ({})", expr),
942            DialectSpecific(val) => write!(f, "{}", display_separated(val, " ")),
943            GeneratedColumns(expr) => write!(f, "AS {}", expr),
944        }
945    }
946}
947
948fn display_constraint_name(name: &'_ Option<Ident>) -> impl fmt::Display + '_ {
949    struct ConstraintName<'a>(&'a Option<Ident>);
950    impl fmt::Display for ConstraintName<'_> {
951        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
952            if let Some(name) = self.0 {
953                write!(f, "CONSTRAINT {} ", name)?;
954            }
955            Ok(())
956        }
957    }
958    ConstraintName(name)
959}
960
961/// `<referential_action> =
962/// { RESTRICT | CASCADE | SET NULL | NO ACTION | SET DEFAULT }`
963///
964/// Used in foreign key constraints in `ON UPDATE` and `ON DELETE` options.
965#[derive(Debug, Clone, PartialEq, Eq, Hash)]
966#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
967pub enum ReferentialAction {
968    Restrict,
969    Cascade,
970    SetNull,
971    NoAction,
972    SetDefault,
973}
974
975impl fmt::Display for ReferentialAction {
976    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
977        f.write_str(match self {
978            ReferentialAction::Restrict => "RESTRICT",
979            ReferentialAction::Cascade => "CASCADE",
980            ReferentialAction::SetNull => "SET NULL",
981            ReferentialAction::NoAction => "NO ACTION",
982            ReferentialAction::SetDefault => "SET DEFAULT",
983        })
984    }
985}
986
987/// secure secret definition for webhook source
988#[derive(Debug, Clone, PartialEq, Eq, Hash)]
989#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
990pub struct WebhookSourceInfo {
991    pub secret_ref: Option<SecretRefValue>,
992    pub signature_expr: Expr,
993    pub wait_for_persistence: bool,
994    pub is_batched: bool,
995}