risingwave_sqlparser/ast/
ddl.rs

1// Licensed under the Apache License, Version 2.0 (the "License");
2// you may not use this file except in compliance with the License.
3// You may obtain a copy of the License at
4//
5//     http://www.apache.org/licenses/LICENSE-2.0
6//
7// Unless required by applicable law or agreed to in writing, software
8// distributed under the License is distributed on an "AS IS" BASIS,
9// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10// See the License for the specific language governing permissions and
11// limitations under the License.
12
13//! AST types specific to CREATE/ALTER variants of [`crate::ast::Statement`]
14//! (commonly referred to as Data Definition Language, or DDL)
15
16use std::fmt;
17
18use super::{ConfigParam, FormatEncodeOptions, SqlOption};
19use crate::ast::{
20    DataType, Expr, Ident, ObjectName, Query, SecretRefValue, SetVariableValue, Value,
21    display_comma_separated, display_separated,
22};
23use crate::tokenizer::Token;
24
/// An `ALTER DATABASE` operation.
///
/// The SQL shown on each variant is the text produced by this type's
/// `Display` implementation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterDatabaseOperation {
    /// `OWNER TO <new_owner_name>`
    ChangeOwner { new_owner_name: Ident },
    /// `RENAME TO <database_name>`
    RenameDatabase { database_name: ObjectName },
    /// `SET <param> TO <value>`
    SetParam(ConfigParam),
}
31
/// An `ALTER SCHEMA` operation.
///
/// The SQL shown on each variant is the text produced by this type's
/// `Display` implementation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterSchemaOperation {
    /// `OWNER TO <new_owner_name>`
    ChangeOwner { new_owner_name: Ident },
    /// `RENAME TO <schema_name>`
    RenameSchema { schema_name: ObjectName },
    /// `SWAP WITH <target_schema>`
    SwapRenameSchema { target_schema: ObjectName },
}
38
/// An `ALTER TABLE` (`Statement::AlterTable`) operation
///
/// Optional keywords shown in brackets describe the accepted syntax; the
/// `Display` implementation always renders one canonical spelling
/// (e.g. `ADD COLUMN`, `DROP COLUMN`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterTableOperation {
    /// `ADD <table_constraint>`
    AddConstraint(TableConstraint),
    /// `ADD [ COLUMN ] <column_def>`
    AddColumn {
        column_def: ColumnDef,
    },
    /// `DROP CONSTRAINT <name>`
    ///
    /// TODO: implement `DROP CONSTRAINT <name>`
    DropConstraint {
        name: Ident,
    },
    /// `DROP [ COLUMN ] [ IF EXISTS ] <column_name> [ CASCADE ]`
    DropColumn {
        column_name: Ident,
        if_exists: bool,
        cascade: bool,
    },
    /// `RENAME [ COLUMN ] <old_column_name> TO <new_column_name>`
    RenameColumn {
        old_column_name: Ident,
        new_column_name: Ident,
    },
    /// `RENAME TO <table_name>`
    RenameTable {
        table_name: ObjectName,
    },
    /// `CHANGE [ COLUMN ] <old_name> <new_name> <data_type> [ <options> ]`
    ///
    /// Options are rendered separated by single spaces.
    ChangeColumn {
        old_name: Ident,
        new_name: Ident,
        data_type: DataType,
        options: Vec<ColumnOption>,
    },
    /// `RENAME CONSTRAINT <old_constraint_name> TO <new_constraint_name>`
    ///
    /// Note: this is a PostgreSQL-specific operation.
    RenameConstraint {
        old_name: Ident,
        new_name: Ident,
    },
    /// `ALTER [ COLUMN ] <column_name> <op>`, where `<op>` is an
    /// [`AlterColumnOperation`]
    AlterColumn {
        column_name: Ident,
        op: AlterColumnOperation,
    },
    /// `ALTER WATERMARK FOR <column> AS <expr> [WITH TTL]`
    AlterWatermark {
        column_name: Ident,
        expr: Expr,
        with_ttl: bool,
    },
    /// `OWNER TO <owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
    /// `SET SCHEMA <schema_name>`
    SetSchema {
        new_schema_name: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET BACKFILL_PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetBackfillParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET CONFIG (key = value, ...)`
    SetConfig {
        entries: Vec<SqlOption>,
    },
    /// `RESET CONFIG (key, ...)`
    ResetConfig {
        keys: Vec<ObjectName>,
    },
    /// `REFRESH SCHEMA`
    RefreshSchema,
    /// `SET SOURCE_RATE_LIMIT TO <rate_limit>`
    SetSourceRateLimit {
        rate_limit: i32,
    },
    /// `SET BACKFILL_RATE_LIMIT TO <rate_limit>`
    SetBackfillRateLimit {
        rate_limit: i32,
    },
    /// `SET DML_RATE_LIMIT TO <rate_limit>`
    SetDmlRateLimit {
        rate_limit: i32,
    },
    /// `SWAP WITH <table_name>`
    SwapRenameTable {
        target_table: ObjectName,
    },
    /// `DROP CONNECTOR`
    DropConnector,

    /// Rendered as `CONNECTOR WITH (<connector_props>)`
    AlterConnectorProps {
        alter_props: Vec<SqlOption>,
    },
}
143
/// An `ALTER INDEX` operation.
///
/// The SQL shown on each variant is the text produced by this type's
/// `Display` implementation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterIndexOperation {
    /// `RENAME TO <index_name>`
    RenameIndex {
        index_name: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET BACKFILL_PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetBackfillParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET CONFIG (key = value, ...)`
    SetConfig {
        entries: Vec<SqlOption>,
    },
    /// `RESET CONFIG (key, ...)`
    ResetConfig {
        keys: Vec<ObjectName>,
    },
}
168
/// An `ALTER VIEW` operation.
///
/// NOTE(review): presumably shared by plain and materialized views, since
/// several variants configure streaming behaviour — confirm against the parser.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterViewOperation {
    /// `RENAME TO <view_name>`
    RenameView {
        view_name: ObjectName,
    },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
    /// `SET SCHEMA <new_schema_name>`
    SetSchema {
        new_schema_name: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET BACKFILL_PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetBackfillParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET RESOURCE_GROUP TO 'RESOURCE GROUP' [ DEFERRED ]`
    /// `RESET RESOURCE_GROUP [ DEFERRED ]`
    ///
    /// `resource_group` is `Some` for the `SET` form and `None` for the
    /// `RESET` form.
    SetResourceGroup {
        resource_group: Option<SetVariableValue>,
        deferred: bool,
    },
    /// `SET BACKFILL_RATE_LIMIT TO <rate_limit>`
    SetBackfillRateLimit {
        rate_limit: i32,
    },
    /// `SWAP WITH <view_name>`
    SwapRenameView {
        target_view: ObjectName,
    },
    /// `SET STREAMING_ENABLE_UNALIGNED_JOIN TO <enable>`
    SetStreamingEnableUnalignedJoin {
        enable: bool,
    },
    /// `AS <query>`
    AsQuery {
        query: Box<Query>,
    },
    /// `SET CONFIG ( streaming.some_config_key = <some_config_value>, .. )`
    SetConfig {
        entries: Vec<SqlOption>,
    },
    /// `RESET CONFIG ( streaming.some_config_key, .. )`
    ResetConfig {
        keys: Vec<ObjectName>,
    },
}
220
/// An `ALTER SINK` operation.
///
/// The SQL shown on each variant is the text produced by this type's
/// `Display` implementation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterSinkOperation {
    /// `RENAME TO <sink_name>`
    RenameSink {
        sink_name: ObjectName,
    },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
    /// `SET SCHEMA <new_schema_name>`
    SetSchema {
        new_schema_name: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET BACKFILL_PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetBackfillParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET CONFIG (key = value, ...)`
    SetConfig {
        entries: Vec<SqlOption>,
    },
    /// `RESET CONFIG (key, ...)`
    ResetConfig {
        keys: Vec<ObjectName>,
    },
    /// `SWAP WITH <sink_name>`
    SwapRenameSink {
        target_sink: ObjectName,
    },
    /// `SET SINK_RATE_LIMIT TO <rate_limit>`
    SetSinkRateLimit {
        rate_limit: i32,
    },
    /// `SET BACKFILL_RATE_LIMIT TO <rate_limit>`
    SetBackfillRateLimit {
        rate_limit: i32,
    },
    /// `CONNECTOR WITH (<connector_props>)`
    AlterConnectorProps {
        alter_props: Vec<SqlOption>,
    },
    /// `SET STREAMING_ENABLE_UNALIGNED_JOIN TO <enable>`
    SetStreamingEnableUnalignedJoin {
        enable: bool,
    },
}
267
/// An `ALTER SUBSCRIPTION` operation.
///
/// The SQL shown on each variant is the text produced by this type's
/// `Display` implementation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterSubscriptionOperation {
    /// `RENAME TO <subscription_name>`
    RenameSubscription { subscription_name: ObjectName },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner { new_owner_name: Ident },
    /// `SET SCHEMA <new_schema_name>`
    SetSchema { new_schema_name: ObjectName },
    /// `SET RETENTION TO <retention>`
    SetRetention { retention: Value },
    /// `SWAP WITH <target_subscription>`
    SwapRenameSubscription { target_subscription: ObjectName },
}
276
/// An `ALTER SOURCE` operation.
///
/// The SQL shown on each variant is the text produced by this type's
/// `Display` implementation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterSourceOperation {
    /// `RENAME TO <source_name>`
    RenameSource {
        source_name: ObjectName,
    },
    /// `ADD COLUMN <column_def>`
    AddColumn {
        column_def: ColumnDef,
    },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
    /// `SET SCHEMA <new_schema_name>`
    SetSchema {
        new_schema_name: ObjectName,
    },
    /// Rendered exactly as the `Display` output of the contained
    /// `FormatEncodeOptions`, with no extra keywords added here.
    FormatEncode {
        format_encode: FormatEncodeOptions,
    },
    /// `REFRESH SCHEMA`
    RefreshSchema,
    /// `SET SOURCE_RATE_LIMIT TO <rate_limit>`
    SetSourceRateLimit {
        rate_limit: i32,
    },
    /// `SWAP WITH <target_source>`
    SwapRenameSource {
        target_source: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET BACKFILL_PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetBackfillParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET CONFIG (key = value, ...)`
    SetConfig {
        entries: Vec<SqlOption>,
    },
    /// `RESET CONFIG (key, ...)`
    ResetConfig {
        keys: Vec<ObjectName>,
    },
    /// `RESET` - Reset CDC source offset to latest
    ResetSource,
    /// `CONNECTOR WITH (<connector_props>)`
    AlterConnectorProps {
        alter_props: Vec<SqlOption>,
    },
}
325
/// An `ALTER FUNCTION` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterFunctionOperation {
    /// `SET SCHEMA <new_schema_name>`
    SetSchema { new_schema_name: ObjectName },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner { new_owner_name: Ident },
}
331
/// An `ALTER CONNECTION` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterConnectionOperation {
    /// `SET SCHEMA <new_schema_name>`
    SetSchema { new_schema_name: ObjectName },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner { new_owner_name: Ident },
    /// `CONNECTOR WITH (<connector_props>)`
    AlterConnectorProps { alter_props: Vec<SqlOption> },
}
338
/// An `ALTER SECRET` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterSecretOperation {
    /// `WITH (<with_options>) AS <new_credential>`
    ///
    /// The `WITH (...)` list is always rendered, even when `with_options` is
    /// empty.
    ChangeCredential {
        with_options: Vec<SqlOption>,
        new_credential: Value,
    },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
}
349
/// An `ALTER FRAGMENT` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterFragmentOperation {
    /// `SET BACKFILL_RATE_LIMIT TO <rate_limit>`
    AlterBackfillRateLimit { rate_limit: i32 },
    /// `SET PARALLELISM TO <parallelism>`
    SetParallelism { parallelism: SetVariableValue },
}
355
/// An operation that alters a compaction group.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterCompactionGroupOperation {
    /// `SET <param> = <value> [, ...]`
    Set { configs: Vec<ConfigParam> },
}
360
361impl fmt::Display for AlterDatabaseOperation {
362    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
363        match self {
364            AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
365                write!(f, "OWNER TO {}", new_owner_name)
366            }
367            AlterDatabaseOperation::RenameDatabase { database_name } => {
368                write!(f, "RENAME TO {}", database_name)
369            }
370            AlterDatabaseOperation::SetParam(ConfigParam { param, value }) => {
371                write!(f, "SET {} TO {}", param, value)
372            }
373        }
374    }
375}
376
377impl fmt::Display for AlterSchemaOperation {
378    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
379        match self {
380            AlterSchemaOperation::ChangeOwner { new_owner_name } => {
381                write!(f, "OWNER TO {}", new_owner_name)
382            }
383            AlterSchemaOperation::RenameSchema { schema_name } => {
384                write!(f, "RENAME TO {}", schema_name)
385            }
386            AlterSchemaOperation::SwapRenameSchema { target_schema } => {
387                write!(f, "SWAP WITH {}", target_schema)
388            }
389        }
390    }
391}
392
393impl fmt::Display for AlterTableOperation {
394    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
395        match self {
396            AlterTableOperation::AddConstraint(c) => write!(f, "ADD {}", c),
397            AlterTableOperation::AddColumn { column_def } => {
398                write!(f, "ADD COLUMN {}", column_def)
399            }
400            AlterTableOperation::AlterColumn { column_name, op } => {
401                write!(f, "ALTER COLUMN {} {}", column_name, op)
402            }
403            AlterTableOperation::AlterWatermark {
404                column_name,
405                expr,
406                with_ttl,
407            } => {
408                write!(f, "ALTER WATERMARK FOR {} AS {}", column_name, expr)?;
409                if *with_ttl {
410                    write!(f, " WITH TTL")?;
411                }
412                Ok(())
413            }
414            AlterTableOperation::DropConstraint { name } => write!(f, "DROP CONSTRAINT {}", name),
415            AlterTableOperation::DropColumn {
416                column_name,
417                if_exists,
418                cascade,
419            } => write!(
420                f,
421                "DROP COLUMN {}{}{}",
422                if *if_exists { "IF EXISTS " } else { "" },
423                column_name,
424                if *cascade { " CASCADE" } else { "" }
425            ),
426            AlterTableOperation::RenameColumn {
427                old_column_name,
428                new_column_name,
429            } => write!(
430                f,
431                "RENAME COLUMN {} TO {}",
432                old_column_name, new_column_name
433            ),
434            AlterTableOperation::RenameTable { table_name } => {
435                write!(f, "RENAME TO {}", table_name)
436            }
437            AlterTableOperation::ChangeColumn {
438                old_name,
439                new_name,
440                data_type,
441                options,
442            } => {
443                write!(f, "CHANGE COLUMN {} {} {}", old_name, new_name, data_type)?;
444                if options.is_empty() {
445                    Ok(())
446                } else {
447                    write!(f, " {}", display_separated(options, " "))
448                }
449            }
450            AlterTableOperation::RenameConstraint { old_name, new_name } => {
451                write!(f, "RENAME CONSTRAINT {} TO {}", old_name, new_name)
452            }
453            AlterTableOperation::ChangeOwner { new_owner_name } => {
454                write!(f, "OWNER TO {}", new_owner_name)
455            }
456            AlterTableOperation::SetSchema { new_schema_name } => {
457                write!(f, "SET SCHEMA {}", new_schema_name)
458            }
459            AlterTableOperation::SetParallelism {
460                parallelism,
461                deferred,
462            } => {
463                write!(
464                    f,
465                    "SET PARALLELISM TO {}{}",
466                    parallelism,
467                    if *deferred { " DEFERRED" } else { "" }
468                )
469            }
470            AlterTableOperation::SetBackfillParallelism {
471                parallelism,
472                deferred,
473            } => {
474                write!(
475                    f,
476                    "SET BACKFILL_PARALLELISM TO {}{}",
477                    parallelism,
478                    if *deferred { " DEFERRED" } else { "" }
479                )
480            }
481            AlterTableOperation::SetConfig { entries } => {
482                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
483            }
484            AlterTableOperation::ResetConfig { keys } => {
485                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
486            }
487            AlterTableOperation::RefreshSchema => {
488                write!(f, "REFRESH SCHEMA")
489            }
490            AlterTableOperation::SetSourceRateLimit { rate_limit } => {
491                write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
492            }
493            AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
494                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
495            }
496            AlterTableOperation::SetDmlRateLimit { rate_limit } => {
497                write!(f, "SET DML_RATE_LIMIT TO {}", rate_limit)
498            }
499            AlterTableOperation::SwapRenameTable { target_table } => {
500                write!(f, "SWAP WITH {}", target_table)
501            }
502            AlterTableOperation::DropConnector => {
503                write!(f, "DROP CONNECTOR")
504            }
505            AlterTableOperation::AlterConnectorProps { alter_props } => {
506                write!(
507                    f,
508                    "CONNECTOR WITH ({})",
509                    display_comma_separated(alter_props)
510                )
511            }
512        }
513    }
514}
515
516impl fmt::Display for AlterIndexOperation {
517    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
518        match self {
519            AlterIndexOperation::RenameIndex { index_name } => {
520                write!(f, "RENAME TO {index_name}")
521            }
522            AlterIndexOperation::SetParallelism {
523                parallelism,
524                deferred,
525            } => {
526                write!(
527                    f,
528                    "SET PARALLELISM TO {}{}",
529                    parallelism,
530                    if *deferred { " DEFERRED" } else { "" }
531                )
532            }
533            AlterIndexOperation::SetBackfillParallelism {
534                parallelism,
535                deferred,
536            } => {
537                write!(
538                    f,
539                    "SET BACKFILL_PARALLELISM TO {}{}",
540                    parallelism,
541                    if *deferred { " DEFERRED" } else { "" }
542                )
543            }
544            AlterIndexOperation::SetConfig { entries } => {
545                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
546            }
547            AlterIndexOperation::ResetConfig { keys } => {
548                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
549            }
550        }
551    }
552}
553
554impl fmt::Display for AlterViewOperation {
555    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
556        match self {
557            AlterViewOperation::RenameView { view_name } => {
558                write!(f, "RENAME TO {view_name}")
559            }
560            AlterViewOperation::ChangeOwner { new_owner_name } => {
561                write!(f, "OWNER TO {}", new_owner_name)
562            }
563            AlterViewOperation::SetSchema { new_schema_name } => {
564                write!(f, "SET SCHEMA {}", new_schema_name)
565            }
566            AlterViewOperation::SetParallelism {
567                parallelism,
568                deferred,
569            } => {
570                write!(
571                    f,
572                    "SET PARALLELISM TO {}{}",
573                    parallelism,
574                    if *deferred { " DEFERRED" } else { "" }
575                )
576            }
577            AlterViewOperation::SetBackfillParallelism {
578                parallelism,
579                deferred,
580            } => {
581                write!(
582                    f,
583                    "SET BACKFILL_PARALLELISM TO {}{}",
584                    parallelism,
585                    if *deferred { " DEFERRED" } else { "" }
586                )
587            }
588            AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
589                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
590            }
591            AlterViewOperation::SwapRenameView { target_view } => {
592                write!(f, "SWAP WITH {}", target_view)
593            }
594            AlterViewOperation::SetResourceGroup {
595                resource_group,
596                deferred,
597            } => {
598                let deferred = if *deferred { " DEFERRED" } else { "" };
599
600                if let Some(resource_group) = resource_group {
601                    write!(f, "SET RESOURCE_GROUP TO {} {}", resource_group, deferred)
602                } else {
603                    write!(f, "RESET RESOURCE_GROUP {}", deferred)
604                }
605            }
606            AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
607                write!(f, "SET STREAMING_ENABLE_UNALIGNED_JOIN TO {}", enable)
608            }
609            AlterViewOperation::AsQuery { query } => {
610                write!(f, "AS {}", query)
611            }
612            AlterViewOperation::SetConfig { entries } => {
613                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
614            }
615            AlterViewOperation::ResetConfig { keys } => {
616                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
617            }
618        }
619    }
620}
621
622impl fmt::Display for AlterSinkOperation {
623    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
624        match self {
625            AlterSinkOperation::RenameSink { sink_name } => {
626                write!(f, "RENAME TO {sink_name}")
627            }
628            AlterSinkOperation::ChangeOwner { new_owner_name } => {
629                write!(f, "OWNER TO {}", new_owner_name)
630            }
631            AlterSinkOperation::SetSchema { new_schema_name } => {
632                write!(f, "SET SCHEMA {}", new_schema_name)
633            }
634            AlterSinkOperation::SetParallelism {
635                parallelism,
636                deferred,
637            } => {
638                write!(
639                    f,
640                    "SET PARALLELISM TO {}{}",
641                    parallelism,
642                    if *deferred { " DEFERRED" } else { "" }
643                )
644            }
645            AlterSinkOperation::SetBackfillParallelism {
646                parallelism,
647                deferred,
648            } => {
649                write!(
650                    f,
651                    "SET BACKFILL_PARALLELISM TO {}{}",
652                    parallelism,
653                    if *deferred { " DEFERRED" } else { "" }
654                )
655            }
656            AlterSinkOperation::SetConfig { entries } => {
657                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
658            }
659            AlterSinkOperation::ResetConfig { keys } => {
660                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
661            }
662            AlterSinkOperation::SwapRenameSink { target_sink } => {
663                write!(f, "SWAP WITH {}", target_sink)
664            }
665            AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
666                write!(f, "SET SINK_RATE_LIMIT TO {}", rate_limit)
667            }
668            AlterSinkOperation::SetBackfillRateLimit { rate_limit } => {
669                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
670            }
671            AlterSinkOperation::AlterConnectorProps {
672                alter_props: changed_props,
673            } => {
674                write!(
675                    f,
676                    "CONNECTOR WITH ({})",
677                    display_comma_separated(changed_props)
678                )
679            }
680            AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
681                write!(f, "SET STREAMING_ENABLE_UNALIGNED_JOIN TO {}", enable)
682            }
683        }
684    }
685}
686
687impl fmt::Display for AlterSubscriptionOperation {
688    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
689        match self {
690            AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
691                write!(f, "RENAME TO {subscription_name}")
692            }
693            AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
694                write!(f, "OWNER TO {}", new_owner_name)
695            }
696            AlterSubscriptionOperation::SetSchema { new_schema_name } => {
697                write!(f, "SET SCHEMA {}", new_schema_name)
698            }
699            AlterSubscriptionOperation::SetRetention { retention } => {
700                write!(f, "SET RETENTION TO {}", retention)
701            }
702            AlterSubscriptionOperation::SwapRenameSubscription {
703                target_subscription,
704            } => {
705                write!(f, "SWAP WITH {}", target_subscription)
706            }
707        }
708    }
709}
710
711impl fmt::Display for AlterSourceOperation {
712    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
713        match self {
714            AlterSourceOperation::RenameSource { source_name } => {
715                write!(f, "RENAME TO {source_name}")
716            }
717            AlterSourceOperation::AddColumn { column_def } => {
718                write!(f, "ADD COLUMN {column_def}")
719            }
720            AlterSourceOperation::ChangeOwner { new_owner_name } => {
721                write!(f, "OWNER TO {}", new_owner_name)
722            }
723            AlterSourceOperation::SetSchema { new_schema_name } => {
724                write!(f, "SET SCHEMA {}", new_schema_name)
725            }
726            AlterSourceOperation::FormatEncode { format_encode } => {
727                write!(f, "{format_encode}")
728            }
729            AlterSourceOperation::RefreshSchema => {
730                write!(f, "REFRESH SCHEMA")
731            }
732            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
733                write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
734            }
735            AlterSourceOperation::SwapRenameSource { target_source } => {
736                write!(f, "SWAP WITH {}", target_source)
737            }
738            AlterSourceOperation::SetParallelism {
739                parallelism,
740                deferred,
741            } => {
742                write!(
743                    f,
744                    "SET PARALLELISM TO {}{}",
745                    parallelism,
746                    if *deferred { " DEFERRED" } else { "" }
747                )
748            }
749            AlterSourceOperation::SetBackfillParallelism {
750                parallelism,
751                deferred,
752            } => {
753                write!(
754                    f,
755                    "SET BACKFILL_PARALLELISM TO {}{}",
756                    parallelism,
757                    if *deferred { " DEFERRED" } else { "" }
758                )
759            }
760            AlterSourceOperation::SetConfig { entries } => {
761                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
762            }
763            AlterSourceOperation::ResetConfig { keys } => {
764                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
765            }
766            AlterSourceOperation::ResetSource => {
767                write!(f, "RESET")
768            }
769            AlterSourceOperation::AlterConnectorProps { alter_props } => {
770                write!(
771                    f,
772                    "CONNECTOR WITH ({})",
773                    display_comma_separated(alter_props)
774                )
775            }
776        }
777    }
778}
779
780impl fmt::Display for AlterFunctionOperation {
781    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
782        match self {
783            AlterFunctionOperation::SetSchema { new_schema_name } => {
784                write!(f, "SET SCHEMA {new_schema_name}")
785            }
786            AlterFunctionOperation::ChangeOwner { new_owner_name } => {
787                write!(f, "OWNER TO {new_owner_name}")
788            }
789        }
790    }
791}
792
793impl fmt::Display for AlterConnectionOperation {
794    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
795        match self {
796            AlterConnectionOperation::SetSchema { new_schema_name } => {
797                write!(f, "SET SCHEMA {new_schema_name}")
798            }
799            AlterConnectionOperation::ChangeOwner { new_owner_name } => {
800                write!(f, "OWNER TO {new_owner_name}")
801            }
802            AlterConnectionOperation::AlterConnectorProps { alter_props } => {
803                write!(
804                    f,
805                    "CONNECTOR WITH ({})",
806                    display_comma_separated(alter_props)
807                )
808            }
809        }
810    }
811}
812
813impl fmt::Display for AlterSecretOperation {
814    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
815        match self {
816            AlterSecretOperation::ChangeCredential {
817                new_credential,
818                with_options,
819            } => {
820                write!(
821                    f,
822                    "WITH ({}) AS {}",
823                    display_comma_separated(with_options),
824                    new_credential
825                )
826            }
827            AlterSecretOperation::ChangeOwner { new_owner_name } => {
828                write!(f, "OWNER TO {new_owner_name}")
829            }
830        }
831    }
832}
833
/// An `ALTER COLUMN` (`Statement::AlterTable`) operation
///
/// This is the `<op>` part carried by `AlterTableOperation::AlterColumn`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterColumnOperation {
    /// `SET NOT NULL`
    SetNotNull,
    /// `DROP NOT NULL`
    DropNotNull,
    /// `SET DEFAULT <expr>`
    SetDefault { value: Expr },
    /// `DROP DEFAULT`
    DropDefault,
    /// `[SET DATA] TYPE <data_type> [USING <expr>]`
    ///
    /// Always rendered with the full `SET DATA TYPE` spelling.
    SetDataType {
        /// The new column type.
        data_type: DataType,
        /// PostgreSQL specific
        using: Option<Expr>,
    },
}
852
853impl fmt::Display for AlterColumnOperation {
854    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
855        match self {
856            AlterColumnOperation::SetNotNull => write!(f, "SET NOT NULL",),
857            AlterColumnOperation::DropNotNull => write!(f, "DROP NOT NULL",),
858            AlterColumnOperation::SetDefault { value } => {
859                write!(f, "SET DEFAULT {}", value)
860            }
861            AlterColumnOperation::DropDefault => {
862                write!(f, "DROP DEFAULT")
863            }
864            AlterColumnOperation::SetDataType { data_type, using } => {
865                if let Some(expr) = using {
866                    write!(f, "SET DATA TYPE {} USING {}", data_type, expr)
867                } else {
868                    write!(f, "SET DATA TYPE {}", data_type)
869                }
870            }
871        }
872    }
873}
874
875impl fmt::Display for AlterFragmentOperation {
876    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
877        match self {
878            AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
879                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
880            }
881            AlterFragmentOperation::SetParallelism { parallelism } => {
882                write!(f, "SET PARALLELISM TO {}", parallelism)
883            }
884        }
885    }
886}
887
888impl fmt::Display for AlterCompactionGroupOperation {
889    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
890        match self {
891            AlterCompactionGroupOperation::Set { configs } => {
892                struct Assign<'a>(&'a ConfigParam);
893
894                impl fmt::Display for Assign<'_> {
895                    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
896                        write!(f, "{} = {}", self.0.param, self.0.value)
897                    }
898                }
899
900                let assigns: Vec<Assign<'_>> = configs.iter().map(Assign).collect();
901                write!(f, "SET {}", display_comma_separated(&assigns))
902            }
903        }
904    }
905}
906
/// The watermark on source.
///
/// Displayed as `WATERMARK FOR <column> AS <expr> [WITH TTL]`; the expression
/// is printed as-is, without parentheses being added around it.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SourceWatermark {
    /// Column the watermark is declared `FOR`.
    pub column: Ident,
    /// Expression that follows `AS` in the watermark clause.
    pub expr: Expr,
    /// Whether `WITH TTL` is specified.
    pub with_ttl: bool,
}
916
917impl fmt::Display for SourceWatermark {
918    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
919        write!(f, "WATERMARK FOR {} AS {}", self.column, self.expr,)?;
920        if self.with_ttl {
921            write!(f, " WITH TTL")?;
922        }
923        Ok(())
924    }
925}
926
/// A table-level constraint, specified in a `CREATE TABLE` or an
/// `ALTER TABLE ADD <constraint>` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum TableConstraint {
    /// `[ CONSTRAINT <name> ] { PRIMARY KEY | UNIQUE } (<columns>)`
    Unique {
        /// Optional constraint name (the `CONSTRAINT <name>` prefix).
        name: Option<Ident>,
        /// Columns listed inside the parentheses.
        columns: Vec<Ident>,
        /// Whether this is a `PRIMARY KEY` or just a `UNIQUE` constraint
        is_primary: bool,
    },
    /// A referential integrity constraint (`[ CONSTRAINT <name> ] FOREIGN KEY (<columns>)
    /// REFERENCES <foreign_table> (<referred_columns>)
    /// { [ON DELETE <referential_action>] [ON UPDATE <referential_action>] |
    ///   [ON UPDATE <referential_action>] [ON DELETE <referential_action>]
    /// }`).
    ///
    /// When displayed, `ON DELETE` is always printed before `ON UPDATE`.
    ForeignKey {
        /// Optional constraint name (the `CONSTRAINT <name>` prefix).
        name: Option<Ident>,
        /// Referencing columns, listed after `FOREIGN KEY`.
        columns: Vec<Ident>,
        /// Table named after `REFERENCES`.
        foreign_table: ObjectName,
        /// Columns of `foreign_table` listed in the trailing parentheses.
        referred_columns: Vec<Ident>,
        /// Action of the `ON DELETE` clause, if present.
        on_delete: Option<ReferentialAction>,
        /// Action of the `ON UPDATE` clause, if present.
        on_update: Option<ReferentialAction>,
    },
    /// `[ CONSTRAINT <name> ] CHECK (<expr>)`
    Check {
        /// Optional constraint name (the `CONSTRAINT <name>` prefix).
        name: Option<Ident>,
        /// Expression printed inside `CHECK (...)`.
        expr: Box<Expr>,
    },
}
957
958impl fmt::Display for TableConstraint {
959    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
960        match self {
961            TableConstraint::Unique {
962                name,
963                columns,
964                is_primary,
965            } => write!(
966                f,
967                "{}{} ({})",
968                display_constraint_name(name),
969                if *is_primary { "PRIMARY KEY" } else { "UNIQUE" },
970                display_comma_separated(columns)
971            ),
972            TableConstraint::ForeignKey {
973                name,
974                columns,
975                foreign_table,
976                referred_columns,
977                on_delete,
978                on_update,
979            } => {
980                write!(
981                    f,
982                    "{}FOREIGN KEY ({}) REFERENCES {}({})",
983                    display_constraint_name(name),
984                    display_comma_separated(columns),
985                    foreign_table,
986                    display_comma_separated(referred_columns),
987                )?;
988                if let Some(action) = on_delete {
989                    write!(f, " ON DELETE {}", action)?;
990                }
991                if let Some(action) = on_update {
992                    write!(f, " ON UPDATE {}", action)?;
993                }
994                Ok(())
995            }
996            TableConstraint::Check { name, expr } => {
997                write!(f, "{}CHECK ({})", display_constraint_name(name), expr)
998            }
999        }
1000    }
1001}
1002
/// SQL column definition
///
/// Displayed as `<name> <data_type>` followed by each option, separated by
/// single spaces.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnDef {
    /// Column name.
    pub name: Ident,
    /// Column data type. When this is `None`, the `Display` output contains
    /// the literal text `None` in its place.
    pub data_type: Option<DataType>,
    /// Optional collation. Note that the `Display` impl does not print it.
    pub collation: Option<ObjectName>,
    /// Column options, printed in order after the data type.
    pub options: Vec<ColumnOptionDef>,
}
1011
1012impl ColumnDef {
1013    pub fn new(
1014        name: Ident,
1015        data_type: DataType,
1016        collation: Option<ObjectName>,
1017        options: Vec<ColumnOptionDef>,
1018    ) -> Self {
1019        ColumnDef {
1020            name,
1021            data_type: Some(data_type),
1022            collation,
1023            options,
1024        }
1025    }
1026
1027    pub fn is_generated(&self) -> bool {
1028        self.options
1029            .iter()
1030            .any(|option| matches!(option.option, ColumnOption::GeneratedColumns(_)))
1031    }
1032}
1033
1034impl fmt::Display for ColumnDef {
1035    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1036        write!(
1037            f,
1038            "{} {}",
1039            self.name,
1040            if let Some(data_type) = &self.data_type {
1041                data_type.to_string()
1042            } else {
1043                "None".to_owned()
1044            }
1045        )?;
1046        for option in &self.options {
1047            write!(f, " {}", option)?;
1048        }
1049        Ok(())
1050    }
1051}
1052
/// An optionally-named `ColumnOption`: `[ CONSTRAINT <name> ] <column-option>`.
///
/// Note that implementations are substantially more permissive than the ANSI
/// specification on what order column options can be presented in, and whether
/// they are allowed to be named. The specification distinguishes between
/// constraints (NOT NULL, UNIQUE, PRIMARY KEY, and CHECK), which can be named
/// and can appear in any order, and other options (DEFAULT, GENERATED), which
/// cannot be named and must appear in a fixed order. PostgreSQL, however,
/// allows preceding any option with `CONSTRAINT <name>`, even those that are
/// not really constraints, like NULL and DEFAULT. MSSQL is less permissive,
/// allowing DEFAULT, UNIQUE, PRIMARY KEY and CHECK to be named, but not NULL or
/// NOT NULL constraints (the last of which is in violation of the spec).
///
/// For maximum flexibility, we don't distinguish between constraint and
/// non-constraint options, lumping them all together under the umbrella of
/// "column options," and we allow any column option to be named.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnOptionDef {
    /// Optional name; when present it is displayed as a `CONSTRAINT <name> `
    /// prefix in front of the option.
    pub name: Option<Ident>,
    /// The column option itself.
    pub option: ColumnOption,
}
1074
1075impl fmt::Display for ColumnOptionDef {
1076    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1077        write!(f, "{}{}", display_constraint_name(&self.name), self.option)
1078    }
1079}
1080
/// `ColumnOption`s are modifiers that follow a column definition in a `CREATE
/// TABLE` statement.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ColumnOption {
    /// `NULL`
    Null,
    /// `NOT NULL`
    NotNull,
    /// `DEFAULT <restricted-expr>`
    DefaultValue(Expr),
    /// Default value from previous bound `DefaultColumnDesc`. Used internally
    /// for schema change and should not be specified by users.
    DefaultValueInternal {
        /// Protobuf encoded `DefaultColumnDesc`.
        persisted: Box<[u8]>,
        /// Optional AST for unparsing. If `None`, the default value will be
        /// shown as `DEFAULT INTERNAL` which is for demonstrating and should
        /// not be specified by users.
        expr: Option<Expr>,
    },
    /// `{ PRIMARY KEY | UNIQUE }`
    Unique {
        /// `true` for `PRIMARY KEY`, `false` for `UNIQUE`.
        is_primary: bool,
    },
    /// A referential integrity constraint (`REFERENCES
    /// <foreign_table> [ (<referred_columns>) ]
    /// { [ON DELETE <referential_action>] [ON UPDATE <referential_action>] |
    ///   [ON UPDATE <referential_action>] [ON DELETE <referential_action>]
    /// }`).
    ///
    /// When displayed, the column list is omitted if `referred_columns` is
    /// empty, and `ON DELETE` is always printed before `ON UPDATE`.
    ForeignKey {
        /// Table named after `REFERENCES`.
        foreign_table: ObjectName,
        /// Referenced columns; may be empty.
        referred_columns: Vec<Ident>,
        /// Action of the `ON DELETE` clause, if present.
        on_delete: Option<ReferentialAction>,
        /// Action of the `ON UPDATE` clause, if present.
        on_update: Option<ReferentialAction>,
    },
    /// `CHECK (<expr>)`
    Check(Expr),
    /// Dialect-specific options, such as:
    /// - MySQL's `AUTO_INCREMENT` or SQLite's `AUTOINCREMENT`
    /// - ...
    ///
    /// Displayed as the tokens joined by single spaces.
    DialectSpecific(Vec<Token>),
    /// `AS <generation_expr>`
    ///
    /// The expression is displayed as-is; no parentheses are added around it.
    GeneratedColumns(Expr),
}
1123
1124impl fmt::Display for ColumnOption {
1125    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1126        use ColumnOption::*;
1127        match self {
1128            Null => write!(f, "NULL"),
1129            NotNull => write!(f, "NOT NULL"),
1130            DefaultValue(expr) => write!(f, "DEFAULT {}", expr),
1131            DefaultValueInternal { persisted: _, expr } => {
1132                if let Some(expr) = expr {
1133                    write!(f, "DEFAULT {}", expr)
1134                } else {
1135                    write!(f, "DEFAULT INTERNAL")
1136                }
1137            }
1138            Unique { is_primary } => {
1139                write!(f, "{}", if *is_primary { "PRIMARY KEY" } else { "UNIQUE" })
1140            }
1141            ForeignKey {
1142                foreign_table,
1143                referred_columns,
1144                on_delete,
1145                on_update,
1146            } => {
1147                write!(f, "REFERENCES {}", foreign_table)?;
1148                if !referred_columns.is_empty() {
1149                    write!(f, " ({})", display_comma_separated(referred_columns))?;
1150                }
1151                if let Some(action) = on_delete {
1152                    write!(f, " ON DELETE {}", action)?;
1153                }
1154                if let Some(action) = on_update {
1155                    write!(f, " ON UPDATE {}", action)?;
1156                }
1157                Ok(())
1158            }
1159            Check(expr) => write!(f, "CHECK ({})", expr),
1160            DialectSpecific(val) => write!(f, "{}", display_separated(val, " ")),
1161            GeneratedColumns(expr) => write!(f, "AS {}", expr),
1162        }
1163    }
1164}
1165
1166fn display_constraint_name(name: &'_ Option<Ident>) -> impl fmt::Display + '_ {
1167    struct ConstraintName<'a>(&'a Option<Ident>);
1168    impl fmt::Display for ConstraintName<'_> {
1169        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1170            if let Some(name) = self.0 {
1171                write!(f, "CONSTRAINT {} ", name)?;
1172            }
1173            Ok(())
1174        }
1175    }
1176    ConstraintName(name)
1177}
1178
/// `<referential_action> =
/// { RESTRICT | CASCADE | SET NULL | NO ACTION | SET DEFAULT }`
///
/// Used in foreign key constraints in `ON UPDATE` and `ON DELETE` options.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ReferentialAction {
    /// `RESTRICT`
    Restrict,
    /// `CASCADE`
    Cascade,
    /// `SET NULL`
    SetNull,
    /// `NO ACTION`
    NoAction,
    /// `SET DEFAULT`
    SetDefault,
}

/// Renders the action as its SQL keyword(s).
impl fmt::Display for ReferentialAction {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keyword = match self {
            Self::Restrict => "RESTRICT",
            Self::Cascade => "CASCADE",
            Self::SetNull => "SET NULL",
            Self::NoAction => "NO ACTION",
            Self::SetDefault => "SET DEFAULT",
        };
        f.write_str(keyword)
    }
}
1203
/// Secure secret definition for a webhook source.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct WebhookSourceInfo {
    /// Optional secret reference.
    /// NOTE(review): presumably the secret used when validating incoming
    /// requests — confirm against the parser/binder.
    pub secret_ref: Option<SecretRefValue>,
    /// Optional signature expression.
    /// NOTE(review): presumably evaluated to verify a request's signature —
    /// confirm against the caller.
    pub signature_expr: Option<Expr>,
    /// NOTE(review): the name suggests the webhook waits until data is
    /// persisted before responding — confirm.
    pub wait_for_persistence: bool,
    /// NOTE(review): the name suggests a request may carry a batch of rows —
    /// confirm.
    pub is_batched: bool,
}