risingwave_sqlparser/ast/
ddl.rs

1// Licensed under the Apache License, Version 2.0 (the "License");
2// you may not use this file except in compliance with the License.
3// You may obtain a copy of the License at
4//
5//     http://www.apache.org/licenses/LICENSE-2.0
6//
7// Unless required by applicable law or agreed to in writing, software
8// distributed under the License is distributed on an "AS IS" BASIS,
9// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10// See the License for the specific language governing permissions and
11// limitations under the License.
12
13//! AST types specific to CREATE/ALTER variants of [`crate::ast::Statement`]
14//! (commonly referred to as Data Definition Language, or DDL)
15
16use std::fmt;
17
18use super::{ConfigParam, FormatEncodeOptions, SqlOption};
19use crate::ast::{
20    DataType, Expr, Ident, ObjectName, Query, SecretRefValue, SetVariableValue, Value,
21    display_comma_separated, display_separated,
22};
23use crate::tokenizer::Token;
24
25#[derive(Debug, Clone, PartialEq, Eq, Hash)]
26pub enum AlterDatabaseOperation {
27    ChangeOwner { new_owner_name: Ident },
28    RenameDatabase { database_name: ObjectName },
29    SetParam(ConfigParam),
30}
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub enum AlterSchemaOperation {
34    ChangeOwner { new_owner_name: Ident },
35    RenameSchema { schema_name: ObjectName },
36    SwapRenameSchema { target_schema: ObjectName },
37}
38
39/// An `ALTER TABLE` (`Statement::AlterTable`) operation
40#[derive(Debug, Clone, PartialEq, Eq, Hash)]
41pub enum AlterTableOperation {
42    /// `ADD <table_constraint>`
43    AddConstraint(TableConstraint),
44    /// `ADD [ COLUMN ] <column_def>`
45    AddColumn {
46        column_def: ColumnDef,
47    },
48    /// TODO: implement `DROP CONSTRAINT <name>`
49    DropConstraint {
50        name: Ident,
51    },
52    /// `DROP [ COLUMN ] [ IF EXISTS ] <column_name> [ CASCADE ]`
53    DropColumn {
54        column_name: Ident,
55        if_exists: bool,
56        cascade: bool,
57    },
58    /// `RENAME [ COLUMN ] <old_column_name> TO <new_column_name>`
59    RenameColumn {
60        old_column_name: Ident,
61        new_column_name: Ident,
62    },
63    /// `RENAME TO <table_name>`
64    RenameTable {
65        table_name: ObjectName,
66    },
67    // CHANGE [ COLUMN ] <old_name> <new_name> <data_type> [ <options> ]
68    ChangeColumn {
69        old_name: Ident,
70        new_name: Ident,
71        data_type: DataType,
72        options: Vec<ColumnOption>,
73    },
74    /// `RENAME CONSTRAINT <old_constraint_name> TO <new_constraint_name>`
75    ///
76    /// Note: this is a PostgreSQL-specific operation.
77    RenameConstraint {
78        old_name: Ident,
79        new_name: Ident,
80    },
81    /// `ALTER [ COLUMN ]`
82    AlterColumn {
83        column_name: Ident,
84        op: AlterColumnOperation,
85    },
86    /// `OWNER TO <owner_name>`
87    ChangeOwner {
88        new_owner_name: Ident,
89    },
90    /// `SET SCHEMA <schema_name>`
91    SetSchema {
92        new_schema_name: ObjectName,
93    },
94    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
95    SetParallelism {
96        parallelism: SetVariableValue,
97        deferred: bool,
98    },
99    /// `SET CONFIG (key = value, ...)`
100    SetConfig {
101        entries: Vec<SqlOption>,
102    },
103    /// `RESET CONFIG (key, ...)`
104    ResetConfig {
105        keys: Vec<ObjectName>,
106    },
107    RefreshSchema,
108    /// `SET SOURCE_RATE_LIMIT TO <rate_limit>`
109    SetSourceRateLimit {
110        rate_limit: i32,
111    },
112    /// SET BACKFILL_RATE_LIMIT TO <rate_limit>
113    SetBackfillRateLimit {
114        rate_limit: i32,
115    },
116    /// `SET DML_RATE_LIMIT TO <rate_limit>`
117    SetDmlRateLimit {
118        rate_limit: i32,
119    },
120    /// `SWAP WITH <table_name>`
121    SwapRenameTable {
122        target_table: ObjectName,
123    },
124    /// `DROP CONNECTOR`
125    DropConnector,
126
127    /// `ALTER CONNECTOR WITH (<connector_props>)`
128    AlterConnectorProps {
129        alter_props: Vec<SqlOption>,
130    },
131}
132
133#[derive(Debug, Clone, PartialEq, Eq, Hash)]
134pub enum AlterIndexOperation {
135    RenameIndex {
136        index_name: ObjectName,
137    },
138    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
139    SetParallelism {
140        parallelism: SetVariableValue,
141        deferred: bool,
142    },
143    /// `SET CONFIG (key = value, ...)`
144    SetConfig {
145        entries: Vec<SqlOption>,
146    },
147    /// `RESET CONFIG (key, ...)`
148    ResetConfig {
149        keys: Vec<ObjectName>,
150    },
151}
152
153#[derive(Debug, Clone, PartialEq, Eq, Hash)]
154pub enum AlterViewOperation {
155    RenameView {
156        view_name: ObjectName,
157    },
158    ChangeOwner {
159        new_owner_name: Ident,
160    },
161    SetSchema {
162        new_schema_name: ObjectName,
163    },
164    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
165    SetParallelism {
166        parallelism: SetVariableValue,
167        deferred: bool,
168    },
169    /// `SET RESOURCE_GROUP TO 'RESOURCE GROUP' [ DEFERRED ]`
170    /// `RESET RESOURCE_GROUP [ DEFERRED ]`
171    SetResourceGroup {
172        resource_group: Option<SetVariableValue>,
173        deferred: bool,
174    },
175    /// `SET BACKFILL_RATE_LIMIT TO <rate_limit>`
176    SetBackfillRateLimit {
177        rate_limit: i32,
178    },
179    /// `SWAP WITH <view_name>`
180    SwapRenameView {
181        target_view: ObjectName,
182    },
183    SetStreamingEnableUnalignedJoin {
184        enable: bool,
185    },
186    /// `AS <query>`
187    AsQuery {
188        query: Box<Query>,
189    },
190    /// `SET CONFIG ( streaming.some_config_key = <some_config_value>, .. )`
191    SetConfig {
192        entries: Vec<SqlOption>,
193    },
194    /// `RESET CONFIG ( streaming.some_config_key, .. )`
195    ResetConfig {
196        keys: Vec<ObjectName>,
197    },
198}
199
200#[derive(Debug, Clone, PartialEq, Eq, Hash)]
201pub enum AlterSinkOperation {
202    RenameSink {
203        sink_name: ObjectName,
204    },
205    ChangeOwner {
206        new_owner_name: Ident,
207    },
208    SetSchema {
209        new_schema_name: ObjectName,
210    },
211    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
212    SetParallelism {
213        parallelism: SetVariableValue,
214        deferred: bool,
215    },
216    /// `SET CONFIG (key = value, ...)`
217    SetConfig {
218        entries: Vec<SqlOption>,
219    },
220    /// `RESET CONFIG (key, ...)`
221    ResetConfig {
222        keys: Vec<ObjectName>,
223    },
224    /// `SWAP WITH <sink_name>`
225    SwapRenameSink {
226        target_sink: ObjectName,
227    },
228    SetSinkRateLimit {
229        rate_limit: i32,
230    },
231    AlterConnectorProps {
232        alter_props: Vec<SqlOption>,
233    },
234    SetStreamingEnableUnalignedJoin {
235        enable: bool,
236    },
237}
238
239#[derive(Debug, Clone, PartialEq, Eq, Hash)]
240pub enum AlterSubscriptionOperation {
241    RenameSubscription { subscription_name: ObjectName },
242    ChangeOwner { new_owner_name: Ident },
243    SetSchema { new_schema_name: ObjectName },
244    SwapRenameSubscription { target_subscription: ObjectName },
245}
246
247#[derive(Debug, Clone, PartialEq, Eq, Hash)]
248pub enum AlterSourceOperation {
249    RenameSource {
250        source_name: ObjectName,
251    },
252    AddColumn {
253        column_def: ColumnDef,
254    },
255    ChangeOwner {
256        new_owner_name: Ident,
257    },
258    SetSchema {
259        new_schema_name: ObjectName,
260    },
261    FormatEncode {
262        format_encode: FormatEncodeOptions,
263    },
264    RefreshSchema,
265    SetSourceRateLimit {
266        rate_limit: i32,
267    },
268    SwapRenameSource {
269        target_source: ObjectName,
270    },
271    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
272    SetParallelism {
273        parallelism: SetVariableValue,
274        deferred: bool,
275    },
276    /// `SET CONFIG (key = value, ...)`
277    SetConfig {
278        entries: Vec<SqlOption>,
279    },
280    /// `RESET CONFIG (key, ...)`
281    ResetConfig {
282        keys: Vec<ObjectName>,
283    },
284    AlterConnectorProps {
285        alter_props: Vec<SqlOption>,
286    },
287}
288
289#[derive(Debug, Clone, PartialEq, Eq, Hash)]
290pub enum AlterFunctionOperation {
291    SetSchema { new_schema_name: ObjectName },
292}
293
294#[derive(Debug, Clone, PartialEq, Eq, Hash)]
295pub enum AlterConnectionOperation {
296    SetSchema { new_schema_name: ObjectName },
297    ChangeOwner { new_owner_name: Ident },
298}
299
300#[derive(Debug, Clone, PartialEq, Eq, Hash)]
301pub enum AlterSecretOperation {
302    ChangeCredential { new_credential: Value },
303}
304
305#[derive(Debug, Clone, PartialEq, Eq, Hash)]
306pub enum AlterFragmentOperation {
307    AlterBackfillRateLimit { rate_limit: i32 },
308    SetParallelism { parallelism: SetVariableValue },
309}
310
311impl fmt::Display for AlterDatabaseOperation {
312    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
313        match self {
314            AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
315                write!(f, "OWNER TO {}", new_owner_name)
316            }
317            AlterDatabaseOperation::RenameDatabase { database_name } => {
318                write!(f, "RENAME TO {}", database_name)
319            }
320            AlterDatabaseOperation::SetParam(ConfigParam { param, value }) => {
321                write!(f, "SET {} TO {}", param, value)
322            }
323        }
324    }
325}
326
327impl fmt::Display for AlterSchemaOperation {
328    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
329        match self {
330            AlterSchemaOperation::ChangeOwner { new_owner_name } => {
331                write!(f, "OWNER TO {}", new_owner_name)
332            }
333            AlterSchemaOperation::RenameSchema { schema_name } => {
334                write!(f, "RENAME TO {}", schema_name)
335            }
336            AlterSchemaOperation::SwapRenameSchema { target_schema } => {
337                write!(f, "SWAP WITH {}", target_schema)
338            }
339        }
340    }
341}
342
343impl fmt::Display for AlterTableOperation {
344    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
345        match self {
346            AlterTableOperation::AddConstraint(c) => write!(f, "ADD {}", c),
347            AlterTableOperation::AddColumn { column_def } => {
348                write!(f, "ADD COLUMN {}", column_def)
349            }
350            AlterTableOperation::AlterColumn { column_name, op } => {
351                write!(f, "ALTER COLUMN {} {}", column_name, op)
352            }
353            AlterTableOperation::DropConstraint { name } => write!(f, "DROP CONSTRAINT {}", name),
354            AlterTableOperation::DropColumn {
355                column_name,
356                if_exists,
357                cascade,
358            } => write!(
359                f,
360                "DROP COLUMN {}{}{}",
361                if *if_exists { "IF EXISTS " } else { "" },
362                column_name,
363                if *cascade { " CASCADE" } else { "" }
364            ),
365            AlterTableOperation::RenameColumn {
366                old_column_name,
367                new_column_name,
368            } => write!(
369                f,
370                "RENAME COLUMN {} TO {}",
371                old_column_name, new_column_name
372            ),
373            AlterTableOperation::RenameTable { table_name } => {
374                write!(f, "RENAME TO {}", table_name)
375            }
376            AlterTableOperation::ChangeColumn {
377                old_name,
378                new_name,
379                data_type,
380                options,
381            } => {
382                write!(f, "CHANGE COLUMN {} {} {}", old_name, new_name, data_type)?;
383                if options.is_empty() {
384                    Ok(())
385                } else {
386                    write!(f, " {}", display_separated(options, " "))
387                }
388            }
389            AlterTableOperation::RenameConstraint { old_name, new_name } => {
390                write!(f, "RENAME CONSTRAINT {} TO {}", old_name, new_name)
391            }
392            AlterTableOperation::ChangeOwner { new_owner_name } => {
393                write!(f, "OWNER TO {}", new_owner_name)
394            }
395            AlterTableOperation::SetSchema { new_schema_name } => {
396                write!(f, "SET SCHEMA {}", new_schema_name)
397            }
398            AlterTableOperation::SetParallelism {
399                parallelism,
400                deferred,
401            } => {
402                write!(
403                    f,
404                    "SET PARALLELISM TO {}{}",
405                    parallelism,
406                    if *deferred { " DEFERRED" } else { "" }
407                )
408            }
409            AlterTableOperation::SetConfig { entries } => {
410                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
411            }
412            AlterTableOperation::ResetConfig { keys } => {
413                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
414            }
415            AlterTableOperation::RefreshSchema => {
416                write!(f, "REFRESH SCHEMA")
417            }
418            AlterTableOperation::SetSourceRateLimit { rate_limit } => {
419                write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
420            }
421            AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
422                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
423            }
424            AlterTableOperation::SetDmlRateLimit { rate_limit } => {
425                write!(f, "SET DML_RATE_LIMIT TO {}", rate_limit)
426            }
427            AlterTableOperation::SwapRenameTable { target_table } => {
428                write!(f, "SWAP WITH {}", target_table)
429            }
430            AlterTableOperation::DropConnector => {
431                write!(f, "DROP CONNECTOR")
432            }
433            AlterTableOperation::AlterConnectorProps { alter_props } => {
434                write!(
435                    f,
436                    "CONNECTOR WITH ({})",
437                    display_comma_separated(alter_props)
438                )
439            }
440        }
441    }
442}
443
444impl fmt::Display for AlterIndexOperation {
445    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
446        match self {
447            AlterIndexOperation::RenameIndex { index_name } => {
448                write!(f, "RENAME TO {index_name}")
449            }
450            AlterIndexOperation::SetParallelism {
451                parallelism,
452                deferred,
453            } => {
454                write!(
455                    f,
456                    "SET PARALLELISM TO {}{}",
457                    parallelism,
458                    if *deferred { " DEFERRED" } else { "" }
459                )
460            }
461            AlterIndexOperation::SetConfig { entries } => {
462                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
463            }
464            AlterIndexOperation::ResetConfig { keys } => {
465                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
466            }
467        }
468    }
469}
470
471impl fmt::Display for AlterViewOperation {
472    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
473        match self {
474            AlterViewOperation::RenameView { view_name } => {
475                write!(f, "RENAME TO {view_name}")
476            }
477            AlterViewOperation::ChangeOwner { new_owner_name } => {
478                write!(f, "OWNER TO {}", new_owner_name)
479            }
480            AlterViewOperation::SetSchema { new_schema_name } => {
481                write!(f, "SET SCHEMA {}", new_schema_name)
482            }
483            AlterViewOperation::SetParallelism {
484                parallelism,
485                deferred,
486            } => {
487                write!(
488                    f,
489                    "SET PARALLELISM TO {}{}",
490                    parallelism,
491                    if *deferred { " DEFERRED" } else { "" }
492                )
493            }
494            AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
495                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
496            }
497            AlterViewOperation::SwapRenameView { target_view } => {
498                write!(f, "SWAP WITH {}", target_view)
499            }
500            AlterViewOperation::SetResourceGroup {
501                resource_group,
502                deferred,
503            } => {
504                let deferred = if *deferred { " DEFERRED" } else { "" };
505
506                if let Some(resource_group) = resource_group {
507                    write!(f, "SET RESOURCE_GROUP TO {} {}", resource_group, deferred)
508                } else {
509                    write!(f, "RESET RESOURCE_GROUP {}", deferred)
510                }
511            }
512            AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
513                write!(f, "SET STREAMING_ENABLE_UNALIGNED_JOIN TO {}", enable)
514            }
515            AlterViewOperation::AsQuery { query } => {
516                write!(f, "AS {}", query)
517            }
518            AlterViewOperation::SetConfig { entries } => {
519                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
520            }
521            AlterViewOperation::ResetConfig { keys } => {
522                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
523            }
524        }
525    }
526}
527
528impl fmt::Display for AlterSinkOperation {
529    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
530        match self {
531            AlterSinkOperation::RenameSink { sink_name } => {
532                write!(f, "RENAME TO {sink_name}")
533            }
534            AlterSinkOperation::ChangeOwner { new_owner_name } => {
535                write!(f, "OWNER TO {}", new_owner_name)
536            }
537            AlterSinkOperation::SetSchema { new_schema_name } => {
538                write!(f, "SET SCHEMA {}", new_schema_name)
539            }
540            AlterSinkOperation::SetParallelism {
541                parallelism,
542                deferred,
543            } => {
544                write!(
545                    f,
546                    "SET PARALLELISM TO {}{}",
547                    parallelism,
548                    if *deferred { " DEFERRED" } else { "" }
549                )
550            }
551            AlterSinkOperation::SetConfig { entries } => {
552                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
553            }
554            AlterSinkOperation::ResetConfig { keys } => {
555                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
556            }
557            AlterSinkOperation::SwapRenameSink { target_sink } => {
558                write!(f, "SWAP WITH {}", target_sink)
559            }
560            AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
561                write!(f, "SET SINK_RATE_LIMIT TO {}", rate_limit)
562            }
563            AlterSinkOperation::AlterConnectorProps {
564                alter_props: changed_props,
565            } => {
566                write!(
567                    f,
568                    "CONNECTOR WITH ({})",
569                    display_comma_separated(changed_props)
570                )
571            }
572            AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
573                write!(f, "SET STREAMING_ENABLE_UNALIGNED_JOIN TO {}", enable)
574            }
575        }
576    }
577}
578
579impl fmt::Display for AlterSubscriptionOperation {
580    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
581        match self {
582            AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
583                write!(f, "RENAME TO {subscription_name}")
584            }
585            AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
586                write!(f, "OWNER TO {}", new_owner_name)
587            }
588            AlterSubscriptionOperation::SetSchema { new_schema_name } => {
589                write!(f, "SET SCHEMA {}", new_schema_name)
590            }
591            AlterSubscriptionOperation::SwapRenameSubscription {
592                target_subscription,
593            } => {
594                write!(f, "SWAP WITH {}", target_subscription)
595            }
596        }
597    }
598}
599
600impl fmt::Display for AlterSourceOperation {
601    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
602        match self {
603            AlterSourceOperation::RenameSource { source_name } => {
604                write!(f, "RENAME TO {source_name}")
605            }
606            AlterSourceOperation::AddColumn { column_def } => {
607                write!(f, "ADD COLUMN {column_def}")
608            }
609            AlterSourceOperation::ChangeOwner { new_owner_name } => {
610                write!(f, "OWNER TO {}", new_owner_name)
611            }
612            AlterSourceOperation::SetSchema { new_schema_name } => {
613                write!(f, "SET SCHEMA {}", new_schema_name)
614            }
615            AlterSourceOperation::FormatEncode { format_encode } => {
616                write!(f, "{format_encode}")
617            }
618            AlterSourceOperation::RefreshSchema => {
619                write!(f, "REFRESH SCHEMA")
620            }
621            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
622                write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
623            }
624            AlterSourceOperation::SwapRenameSource { target_source } => {
625                write!(f, "SWAP WITH {}", target_source)
626            }
627            AlterSourceOperation::SetParallelism {
628                parallelism,
629                deferred,
630            } => {
631                write!(
632                    f,
633                    "SET PARALLELISM TO {}{}",
634                    parallelism,
635                    if *deferred { " DEFERRED" } else { "" }
636                )
637            }
638            AlterSourceOperation::SetConfig { entries } => {
639                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
640            }
641            AlterSourceOperation::ResetConfig { keys } => {
642                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
643            }
644            AlterSourceOperation::AlterConnectorProps { alter_props } => {
645                write!(
646                    f,
647                    "CONNECTOR WITH ({})",
648                    display_comma_separated(alter_props)
649                )
650            }
651        }
652    }
653}
654
655impl fmt::Display for AlterFunctionOperation {
656    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
657        match self {
658            AlterFunctionOperation::SetSchema { new_schema_name } => {
659                write!(f, "SET SCHEMA {new_schema_name}")
660            }
661        }
662    }
663}
664
665impl fmt::Display for AlterConnectionOperation {
666    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
667        match self {
668            AlterConnectionOperation::SetSchema { new_schema_name } => {
669                write!(f, "SET SCHEMA {new_schema_name}")
670            }
671            AlterConnectionOperation::ChangeOwner { new_owner_name } => {
672                write!(f, "OWNER TO {new_owner_name}")
673            }
674        }
675    }
676}
677
678impl fmt::Display for AlterSecretOperation {
679    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
680        match self {
681            AlterSecretOperation::ChangeCredential { new_credential } => {
682                write!(f, "AS {new_credential}")
683            }
684        }
685    }
686}
687
688/// An `ALTER COLUMN` (`Statement::AlterTable`) operation
689#[derive(Debug, Clone, PartialEq, Eq, Hash)]
690pub enum AlterColumnOperation {
691    /// `SET NOT NULL`
692    SetNotNull,
693    /// `DROP NOT NULL`
694    DropNotNull,
695    /// `SET DEFAULT <expr>`
696    SetDefault { value: Expr },
697    /// `DROP DEFAULT`
698    DropDefault,
699    /// `[SET DATA] TYPE <data_type> [USING <expr>]`
700    SetDataType {
701        data_type: DataType,
702        /// PostgreSQL specific
703        using: Option<Expr>,
704    },
705}
706
707impl fmt::Display for AlterColumnOperation {
708    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
709        match self {
710            AlterColumnOperation::SetNotNull => write!(f, "SET NOT NULL",),
711            AlterColumnOperation::DropNotNull => write!(f, "DROP NOT NULL",),
712            AlterColumnOperation::SetDefault { value } => {
713                write!(f, "SET DEFAULT {}", value)
714            }
715            AlterColumnOperation::DropDefault => {
716                write!(f, "DROP DEFAULT")
717            }
718            AlterColumnOperation::SetDataType { data_type, using } => {
719                if let Some(expr) = using {
720                    write!(f, "SET DATA TYPE {} USING {}", data_type, expr)
721                } else {
722                    write!(f, "SET DATA TYPE {}", data_type)
723                }
724            }
725        }
726    }
727}
728
729impl fmt::Display for AlterFragmentOperation {
730    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
731        match self {
732            AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
733                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
734            }
735            AlterFragmentOperation::SetParallelism { parallelism } => {
736                write!(f, "SET PARALLELISM TO {}", parallelism)
737            }
738        }
739    }
740}
741
742/// The watermark on source.
743/// `WATERMARK FOR <column> AS (<expr>)`
744#[derive(Debug, Clone, PartialEq, Eq, Hash)]
745pub struct SourceWatermark {
746    pub column: Ident,
747    pub expr: Expr,
748}
749
750impl fmt::Display for SourceWatermark {
751    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
752        write!(f, "WATERMARK FOR {} AS {}", self.column, self.expr,)
753    }
754}
755
756/// A table-level constraint, specified in a `CREATE TABLE` or an
757/// `ALTER TABLE ADD <constraint>` statement.
758#[derive(Debug, Clone, PartialEq, Eq, Hash)]
759pub enum TableConstraint {
760    /// `[ CONSTRAINT <name> ] { PRIMARY KEY | UNIQUE } (<columns>)`
761    Unique {
762        name: Option<Ident>,
763        columns: Vec<Ident>,
764        /// Whether this is a `PRIMARY KEY` or just a `UNIQUE` constraint
765        is_primary: bool,
766    },
767    /// A referential integrity constraint (`[ CONSTRAINT <name> ] FOREIGN KEY (<columns>)
768    /// REFERENCES <foreign_table> (<referred_columns>)
769    /// { [ON DELETE <referential_action>] [ON UPDATE <referential_action>] |
770    ///   [ON UPDATE <referential_action>] [ON DELETE <referential_action>]
771    /// }`).
772    ForeignKey {
773        name: Option<Ident>,
774        columns: Vec<Ident>,
775        foreign_table: ObjectName,
776        referred_columns: Vec<Ident>,
777        on_delete: Option<ReferentialAction>,
778        on_update: Option<ReferentialAction>,
779    },
780    /// `[ CONSTRAINT <name> ] CHECK (<expr>)`
781    Check {
782        name: Option<Ident>,
783        expr: Box<Expr>,
784    },
785}
786
787impl fmt::Display for TableConstraint {
788    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
789        match self {
790            TableConstraint::Unique {
791                name,
792                columns,
793                is_primary,
794            } => write!(
795                f,
796                "{}{} ({})",
797                display_constraint_name(name),
798                if *is_primary { "PRIMARY KEY" } else { "UNIQUE" },
799                display_comma_separated(columns)
800            ),
801            TableConstraint::ForeignKey {
802                name,
803                columns,
804                foreign_table,
805                referred_columns,
806                on_delete,
807                on_update,
808            } => {
809                write!(
810                    f,
811                    "{}FOREIGN KEY ({}) REFERENCES {}({})",
812                    display_constraint_name(name),
813                    display_comma_separated(columns),
814                    foreign_table,
815                    display_comma_separated(referred_columns),
816                )?;
817                if let Some(action) = on_delete {
818                    write!(f, " ON DELETE {}", action)?;
819                }
820                if let Some(action) = on_update {
821                    write!(f, " ON UPDATE {}", action)?;
822                }
823                Ok(())
824            }
825            TableConstraint::Check { name, expr } => {
826                write!(f, "{}CHECK ({})", display_constraint_name(name), expr)
827            }
828        }
829    }
830}
831
832/// SQL column definition
833#[derive(Debug, Clone, PartialEq, Eq, Hash)]
834pub struct ColumnDef {
835    pub name: Ident,
836    pub data_type: Option<DataType>,
837    pub collation: Option<ObjectName>,
838    pub options: Vec<ColumnOptionDef>,
839}
840
841impl ColumnDef {
842    pub fn new(
843        name: Ident,
844        data_type: DataType,
845        collation: Option<ObjectName>,
846        options: Vec<ColumnOptionDef>,
847    ) -> Self {
848        ColumnDef {
849            name,
850            data_type: Some(data_type),
851            collation,
852            options,
853        }
854    }
855
856    pub fn is_generated(&self) -> bool {
857        self.options
858            .iter()
859            .any(|option| matches!(option.option, ColumnOption::GeneratedColumns(_)))
860    }
861}
862
863impl fmt::Display for ColumnDef {
864    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
865        write!(
866            f,
867            "{} {}",
868            self.name,
869            if let Some(data_type) = &self.data_type {
870                data_type.to_string()
871            } else {
872                "None".to_owned()
873            }
874        )?;
875        for option in &self.options {
876            write!(f, " {}", option)?;
877        }
878        Ok(())
879    }
880}
881
882/// An optionally-named `ColumnOption`: `[ CONSTRAINT <name> ] <column-option>`.
883///
884/// Note that implementations are substantially more permissive than the ANSI
885/// specification on what order column options can be presented in, and whether
886/// they are allowed to be named. The specification distinguishes between
887/// constraints (NOT NULL, UNIQUE, PRIMARY KEY, and CHECK), which can be named
888/// and can appear in any order, and other options (DEFAULT, GENERATED), which
889/// cannot be named and must appear in a fixed order. PostgreSQL, however,
890/// allows preceding any option with `CONSTRAINT <name>`, even those that are
891/// not really constraints, like NULL and DEFAULT. MSSQL is less permissive,
892/// allowing DEFAULT, UNIQUE, PRIMARY KEY and CHECK to be named, but not NULL or
893/// NOT NULL constraints (the last of which is in violation of the spec).
894///
895/// For maximum flexibility, we don't distinguish between constraint and
896/// non-constraint options, lumping them all together under the umbrella of
897/// "column options," and we allow any column option to be named.
898#[derive(Debug, Clone, PartialEq, Eq, Hash)]
899pub struct ColumnOptionDef {
900    pub name: Option<Ident>,
901    pub option: ColumnOption,
902}
903
904impl fmt::Display for ColumnOptionDef {
905    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
906        write!(f, "{}{}", display_constraint_name(&self.name), self.option)
907    }
908}
909
910/// `ColumnOption`s are modifiers that follow a column definition in a `CREATE
911/// TABLE` statement.
912#[derive(Debug, Clone, PartialEq, Eq, Hash)]
913pub enum ColumnOption {
914    /// `NULL`
915    Null,
916    /// `NOT NULL`
917    NotNull,
918    /// `DEFAULT <restricted-expr>`
919    DefaultValue(Expr),
920    /// Default value from previous bound `DefaultColumnDesc`. Used internally
921    /// for schema change and should not be specified by users.
922    DefaultValueInternal {
923        /// Protobuf encoded `DefaultColumnDesc`.
924        persisted: Box<[u8]>,
925        /// Optional AST for unparsing. If `None`, the default value will be
926        /// shown as `DEFAULT INTERNAL` which is for demonstrating and should
927        /// not be specified by users.
928        expr: Option<Expr>,
929    },
930    /// `{ PRIMARY KEY | UNIQUE }`
931    Unique { is_primary: bool },
932    /// A referential integrity constraint (`[FOREIGN KEY REFERENCES
933    /// <foreign_table> (<referred_columns>)
934    /// { [ON DELETE <referential_action>] [ON UPDATE <referential_action>] |
935    ///   [ON UPDATE <referential_action>] [ON DELETE <referential_action>]
936    /// }`).
937    ForeignKey {
938        foreign_table: ObjectName,
939        referred_columns: Vec<Ident>,
940        on_delete: Option<ReferentialAction>,
941        on_update: Option<ReferentialAction>,
942    },
943    /// `CHECK (<expr>)`
944    Check(Expr),
945    /// Dialect-specific options, such as:
946    /// - MySQL's `AUTO_INCREMENT` or SQLite's `AUTOINCREMENT`
947    /// - ...
948    DialectSpecific(Vec<Token>),
949    /// AS ( <generation_expr> )`
950    GeneratedColumns(Expr),
951}
952
953impl fmt::Display for ColumnOption {
954    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
955        use ColumnOption::*;
956        match self {
957            Null => write!(f, "NULL"),
958            NotNull => write!(f, "NOT NULL"),
959            DefaultValue(expr) => write!(f, "DEFAULT {}", expr),
960            DefaultValueInternal { persisted: _, expr } => {
961                if let Some(expr) = expr {
962                    write!(f, "DEFAULT {}", expr)
963                } else {
964                    write!(f, "DEFAULT INTERNAL")
965                }
966            }
967            Unique { is_primary } => {
968                write!(f, "{}", if *is_primary { "PRIMARY KEY" } else { "UNIQUE" })
969            }
970            ForeignKey {
971                foreign_table,
972                referred_columns,
973                on_delete,
974                on_update,
975            } => {
976                write!(f, "REFERENCES {}", foreign_table)?;
977                if !referred_columns.is_empty() {
978                    write!(f, " ({})", display_comma_separated(referred_columns))?;
979                }
980                if let Some(action) = on_delete {
981                    write!(f, " ON DELETE {}", action)?;
982                }
983                if let Some(action) = on_update {
984                    write!(f, " ON UPDATE {}", action)?;
985                }
986                Ok(())
987            }
988            Check(expr) => write!(f, "CHECK ({})", expr),
989            DialectSpecific(val) => write!(f, "{}", display_separated(val, " ")),
990            GeneratedColumns(expr) => write!(f, "AS {}", expr),
991        }
992    }
993}
994
995fn display_constraint_name(name: &'_ Option<Ident>) -> impl fmt::Display + '_ {
996    struct ConstraintName<'a>(&'a Option<Ident>);
997    impl fmt::Display for ConstraintName<'_> {
998        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
999            if let Some(name) = self.0 {
1000                write!(f, "CONSTRAINT {} ", name)?;
1001            }
1002            Ok(())
1003        }
1004    }
1005    ConstraintName(name)
1006}
1007
1008/// `<referential_action> =
1009/// { RESTRICT | CASCADE | SET NULL | NO ACTION | SET DEFAULT }`
1010///
1011/// Used in foreign key constraints in `ON UPDATE` and `ON DELETE` options.
1012#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1013pub enum ReferentialAction {
1014    Restrict,
1015    Cascade,
1016    SetNull,
1017    NoAction,
1018    SetDefault,
1019}
1020
1021impl fmt::Display for ReferentialAction {
1022    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1023        f.write_str(match self {
1024            ReferentialAction::Restrict => "RESTRICT",
1025            ReferentialAction::Cascade => "CASCADE",
1026            ReferentialAction::SetNull => "SET NULL",
1027            ReferentialAction::NoAction => "NO ACTION",
1028            ReferentialAction::SetDefault => "SET DEFAULT",
1029        })
1030    }
1031}
1032
1033/// secure secret definition for webhook source
1034#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1035pub struct WebhookSourceInfo {
1036    pub secret_ref: Option<SecretRefValue>,
1037    pub signature_expr: Expr,
1038    pub wait_for_persistence: bool,
1039    pub is_batched: bool,
1040}