Skip to main content

risingwave_sqlparser/ast/
ddl.rs

1// Licensed under the Apache License, Version 2.0 (the "License");
2// you may not use this file except in compliance with the License.
3// You may obtain a copy of the License at
4//
5//     http://www.apache.org/licenses/LICENSE-2.0
6//
7// Unless required by applicable law or agreed to in writing, software
8// distributed under the License is distributed on an "AS IS" BASIS,
9// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10// See the License for the specific language governing permissions and
11// limitations under the License.
12
13//! AST types specific to CREATE/ALTER variants of [`crate::ast::Statement`]
14//! (commonly referred to as Data Definition Language, or DDL)
15
16use std::fmt;
17
18use super::{ConfigParam, FormatEncodeOptions, SqlOption};
19use crate::ast::{
20    DataType, Expr, Ident, ObjectName, Query, SecretRefValue, SetVariableValue, Value,
21    display_comma_separated, display_separated,
22};
23use crate::tokenizer::Token;
24
/// An `ALTER DATABASE` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterDatabaseOperation {
    /// `OWNER TO <new_owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
    /// `RENAME TO <database_name>`
    RenameDatabase {
        database_name: ObjectName,
    },
    /// `SET <param> TO <value>`
    SetParam(ConfigParam),
    /// `SET RESOURCE_GROUP TO 'RESOURCE GROUP' [ DEFERRED ]`
    /// `RESET RESOURCE_GROUP [ DEFERRED ]`
    ///
    /// `resource_group` is `Some` for the `SET` form and `None` for the `RESET` form.
    SetResourceGroup {
        resource_group: Option<SetVariableValue>,
        deferred: bool,
    },
}
41
/// An `ALTER SCHEMA` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterSchemaOperation {
    /// `OWNER TO <new_owner_name>`
    ChangeOwner { new_owner_name: Ident },
    /// `RENAME TO <schema_name>`
    RenameSchema { schema_name: ObjectName },
    /// `SWAP WITH <target_schema>`
    SwapRenameSchema { target_schema: ObjectName },
}
48
/// An `ALTER TABLE` (`Statement::AlterTable`) operation
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterTableOperation {
    /// `ADD <table_constraint>`
    AddConstraint(TableConstraint),
    /// `ADD [ COLUMN ] <column_def>`
    AddColumn {
        column_def: ColumnDef,
    },
    /// TODO: implement `DROP CONSTRAINT <name>`
    DropConstraint {
        name: Ident,
    },
    /// `DROP [ COLUMN ] [ IF EXISTS ] <column_name> [ CASCADE ]`
    DropColumn {
        column_name: Ident,
        if_exists: bool,
        cascade: bool,
    },
    /// `RENAME [ COLUMN ] <old_column_name> TO <new_column_name>`
    RenameColumn {
        old_column_name: Ident,
        new_column_name: Ident,
    },
    /// `RENAME TO <table_name>`
    RenameTable {
        table_name: ObjectName,
    },
    /// `CHANGE [ COLUMN ] <old_name> <new_name> <data_type> [ <options> ]`
    ChangeColumn {
        old_name: Ident,
        new_name: Ident,
        data_type: DataType,
        options: Vec<ColumnOption>,
    },
    /// `RENAME CONSTRAINT <old_constraint_name> TO <new_constraint_name>`
    ///
    /// Note: this is a PostgreSQL-specific operation.
    RenameConstraint {
        old_name: Ident,
        new_name: Ident,
    },
    /// `ALTER [ COLUMN ] <column_name> <op>`
    AlterColumn {
        column_name: Ident,
        op: AlterColumnOperation,
    },
    /// `ALTER WATERMARK FOR <column> AS <expr> [WITH TTL]`
    AlterWatermark {
        column_name: Ident,
        expr: Expr,
        with_ttl: bool,
    },
    /// `OWNER TO <owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
    /// `SET SCHEMA <schema_name>`
    SetSchema {
        new_schema_name: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET BACKFILL_PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetBackfillParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET CONFIG (key = value, ...)`
    SetConfig {
        entries: Vec<SqlOption>,
    },
    /// `RESET CONFIG (key, ...)`
    ResetConfig {
        keys: Vec<ObjectName>,
    },
    /// `REFRESH SCHEMA`
    RefreshSchema,
    /// `SET SOURCE_RATE_LIMIT TO <rate_limit>`
    SetSourceRateLimit {
        rate_limit: i32,
    },
    /// `SET BACKFILL_RATE_LIMIT TO <rate_limit>`
    SetBackfillRateLimit {
        rate_limit: i32,
    },
    /// `SET DML_RATE_LIMIT TO <rate_limit>`
    SetDmlRateLimit {
        rate_limit: i32,
    },
    /// `SWAP WITH <table_name>`
    SwapRenameTable {
        target_table: ObjectName,
    },
    /// `DROP CONNECTOR`
    DropConnector,

    /// `CONNECTOR WITH (<connector_props>)`
    AlterConnectorProps {
        alter_props: Vec<SqlOption>,
    },
}
153
/// An `ALTER INDEX` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterIndexOperation {
    /// `RENAME TO <index_name>`
    RenameIndex {
        index_name: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET BACKFILL_PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetBackfillParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET RESOURCE_GROUP TO 'RESOURCE GROUP' [ DEFERRED ]`
    /// `RESET RESOURCE_GROUP [ DEFERRED ]`
    ///
    /// `resource_group` is `Some` for the `SET` form and `None` for the `RESET` form.
    SetResourceGroup {
        resource_group: Option<SetVariableValue>,
        deferred: bool,
    },
    /// `SET CONFIG (key = value, ...)`
    SetConfig {
        entries: Vec<SqlOption>,
    },
    /// `RESET CONFIG (key, ...)`
    ResetConfig {
        keys: Vec<ObjectName>,
    },
}
184
/// An operation that alters a view.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterViewOperation {
    /// `RENAME TO <view_name>`
    RenameView {
        view_name: ObjectName,
    },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
    /// `SET SCHEMA <new_schema_name>`
    SetSchema {
        new_schema_name: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET BACKFILL_PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetBackfillParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET RESOURCE_GROUP TO 'RESOURCE GROUP' [ DEFERRED ]`
    /// `RESET RESOURCE_GROUP [ DEFERRED ]`
    ///
    /// `resource_group` is `Some` for the `SET` form and `None` for the `RESET` form.
    SetResourceGroup {
        resource_group: Option<SetVariableValue>,
        deferred: bool,
    },
    /// `SET BACKFILL_RATE_LIMIT TO <rate_limit>`
    SetBackfillRateLimit {
        rate_limit: i32,
    },
    /// `SWAP WITH <view_name>`
    SwapRenameView {
        target_view: ObjectName,
    },
    /// `SET STREAMING_ENABLE_UNALIGNED_JOIN TO <enable>`
    SetStreamingEnableUnalignedJoin {
        enable: bool,
    },
    /// `AS <query>`
    AsQuery {
        query: Box<Query>,
    },
    /// `SET CONFIG ( streaming.some_config_key = <some_config_value>, .. )`
    SetConfig {
        entries: Vec<SqlOption>,
    },
    /// `RESET CONFIG ( streaming.some_config_key, .. )`
    ResetConfig {
        keys: Vec<ObjectName>,
    },
}
236
/// An `ALTER SINK` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterSinkOperation {
    /// `RENAME TO <sink_name>`
    RenameSink {
        sink_name: ObjectName,
    },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
    /// `SET SCHEMA <new_schema_name>`
    SetSchema {
        new_schema_name: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET BACKFILL_PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetBackfillParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET RESOURCE_GROUP TO 'RESOURCE GROUP' [ DEFERRED ]`
    /// `RESET RESOURCE_GROUP [ DEFERRED ]`
    ///
    /// `resource_group` is `Some` for the `SET` form and `None` for the `RESET` form.
    SetResourceGroup {
        resource_group: Option<SetVariableValue>,
        deferred: bool,
    },
    /// `SET CONFIG (key = value, ...)`
    SetConfig {
        entries: Vec<SqlOption>,
    },
    /// `RESET CONFIG (key, ...)`
    ResetConfig {
        keys: Vec<ObjectName>,
    },
    /// `SWAP WITH <sink_name>`
    SwapRenameSink {
        target_sink: ObjectName,
    },
    /// `SET SINK_RATE_LIMIT TO <rate_limit>`
    SetSinkRateLimit {
        rate_limit: i32,
    },
    /// `SET BACKFILL_RATE_LIMIT TO <rate_limit>`
    SetBackfillRateLimit {
        rate_limit: i32,
    },
    /// `CONNECTOR WITH (<connector_props>)`
    AlterConnectorProps {
        alter_props: Vec<SqlOption>,
    },
    /// `SET STREAMING_ENABLE_UNALIGNED_JOIN TO <enable>`
    SetStreamingEnableUnalignedJoin {
        enable: bool,
    },
}
289
/// An `ALTER SUBSCRIPTION` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterSubscriptionOperation {
    /// `RENAME TO <subscription_name>`
    RenameSubscription { subscription_name: ObjectName },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner { new_owner_name: Ident },
    /// `SET SCHEMA <new_schema_name>`
    SetSchema { new_schema_name: ObjectName },
    /// `SET RETENTION TO <retention>`
    SetRetention { retention: Value },
    /// `SWAP WITH <target_subscription>`
    SwapRenameSubscription { target_subscription: ObjectName },
}
298
/// An `ALTER SOURCE` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterSourceOperation {
    /// `RENAME TO <source_name>`
    RenameSource {
        source_name: ObjectName,
    },
    /// `ADD COLUMN <column_def>`
    AddColumn {
        column_def: ColumnDef,
    },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
    /// `SET SCHEMA <new_schema_name>`
    SetSchema {
        new_schema_name: ObjectName,
    },
    /// Rendered as the `format_encode` options themselves, with no extra keyword.
    FormatEncode {
        format_encode: FormatEncodeOptions,
    },
    /// `REFRESH SCHEMA`
    RefreshSchema,
    /// `SET SOURCE_RATE_LIMIT TO <rate_limit>`
    SetSourceRateLimit {
        rate_limit: i32,
    },
    /// `SWAP WITH <target_source>`
    SwapRenameSource {
        target_source: ObjectName,
    },
    /// `SET PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET BACKFILL_PARALLELISM TO <parallelism> [ DEFERRED ]`
    SetBackfillParallelism {
        parallelism: SetVariableValue,
        deferred: bool,
    },
    /// `SET CONFIG (key = value, ...)`
    SetConfig {
        entries: Vec<SqlOption>,
    },
    /// `RESET CONFIG (key, ...)`
    ResetConfig {
        keys: Vec<ObjectName>,
    },
    /// `RESET` - Reset CDC source offset to latest
    ResetSource,
    /// `CONNECTOR WITH (<connector_props>)`
    AlterConnectorProps {
        alter_props: Vec<SqlOption>,
    },
}
347
/// An `ALTER FUNCTION` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterFunctionOperation {
    /// `SET SCHEMA <new_schema_name>`
    SetSchema { new_schema_name: ObjectName },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner { new_owner_name: Ident },
}
353
/// An `ALTER CONNECTION` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterConnectionOperation {
    /// `SET SCHEMA <new_schema_name>`
    SetSchema { new_schema_name: ObjectName },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner { new_owner_name: Ident },
    /// `CONNECTOR WITH (<connector_props>)`
    AlterConnectorProps { alter_props: Vec<SqlOption> },
}
360
/// An `ALTER SECRET` operation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterSecretOperation {
    /// `WITH (<with_options>) AS <new_credential>`
    ChangeCredential {
        with_options: Vec<SqlOption>,
        new_credential: Value,
    },
    /// `OWNER TO <new_owner_name>`
    ChangeOwner {
        new_owner_name: Ident,
    },
}
371
/// An operation that alters a fragment.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterFragmentOperation {
    /// `SET BACKFILL_RATE_LIMIT TO <rate_limit>`
    AlterBackfillRateLimit { rate_limit: i32 },
    /// `SET PARALLELISM TO <parallelism>`
    SetParallelism { parallelism: SetVariableValue },
}
377
/// An operation that alters a compaction group.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterCompactionGroupOperation {
    /// Sets the listed configuration parameters.
    Set { configs: Vec<ConfigParam> },
}
382
383impl fmt::Display for AlterDatabaseOperation {
384    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
385        match self {
386            AlterDatabaseOperation::ChangeOwner { new_owner_name } => {
387                write!(f, "OWNER TO {}", new_owner_name)
388            }
389            AlterDatabaseOperation::RenameDatabase { database_name } => {
390                write!(f, "RENAME TO {}", database_name)
391            }
392            AlterDatabaseOperation::SetParam(ConfigParam { param, value }) => {
393                write!(f, "SET {} TO {}", param, value)
394            }
395            AlterDatabaseOperation::SetResourceGroup {
396                resource_group,
397                deferred,
398            } => {
399                let deferred = if *deferred { " DEFERRED" } else { "" };
400
401                if let Some(resource_group) = resource_group {
402                    write!(f, "SET RESOURCE_GROUP TO {}{}", resource_group, deferred)
403                } else {
404                    write!(f, "RESET RESOURCE_GROUP{}", deferred)
405                }
406            }
407        }
408    }
409}
410
411impl fmt::Display for AlterSchemaOperation {
412    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
413        match self {
414            AlterSchemaOperation::ChangeOwner { new_owner_name } => {
415                write!(f, "OWNER TO {}", new_owner_name)
416            }
417            AlterSchemaOperation::RenameSchema { schema_name } => {
418                write!(f, "RENAME TO {}", schema_name)
419            }
420            AlterSchemaOperation::SwapRenameSchema { target_schema } => {
421                write!(f, "SWAP WITH {}", target_schema)
422            }
423        }
424    }
425}
426
427impl fmt::Display for AlterTableOperation {
428    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
429        match self {
430            AlterTableOperation::AddConstraint(c) => write!(f, "ADD {}", c),
431            AlterTableOperation::AddColumn { column_def } => {
432                write!(f, "ADD COLUMN {}", column_def)
433            }
434            AlterTableOperation::AlterColumn { column_name, op } => {
435                write!(f, "ALTER COLUMN {} {}", column_name, op)
436            }
437            AlterTableOperation::AlterWatermark {
438                column_name,
439                expr,
440                with_ttl,
441            } => {
442                write!(f, "ALTER WATERMARK FOR {} AS {}", column_name, expr)?;
443                if *with_ttl {
444                    write!(f, " WITH TTL")?;
445                }
446                Ok(())
447            }
448            AlterTableOperation::DropConstraint { name } => write!(f, "DROP CONSTRAINT {}", name),
449            AlterTableOperation::DropColumn {
450                column_name,
451                if_exists,
452                cascade,
453            } => write!(
454                f,
455                "DROP COLUMN {}{}{}",
456                if *if_exists { "IF EXISTS " } else { "" },
457                column_name,
458                if *cascade { " CASCADE" } else { "" }
459            ),
460            AlterTableOperation::RenameColumn {
461                old_column_name,
462                new_column_name,
463            } => write!(
464                f,
465                "RENAME COLUMN {} TO {}",
466                old_column_name, new_column_name
467            ),
468            AlterTableOperation::RenameTable { table_name } => {
469                write!(f, "RENAME TO {}", table_name)
470            }
471            AlterTableOperation::ChangeColumn {
472                old_name,
473                new_name,
474                data_type,
475                options,
476            } => {
477                write!(f, "CHANGE COLUMN {} {} {}", old_name, new_name, data_type)?;
478                if options.is_empty() {
479                    Ok(())
480                } else {
481                    write!(f, " {}", display_separated(options, " "))
482                }
483            }
484            AlterTableOperation::RenameConstraint { old_name, new_name } => {
485                write!(f, "RENAME CONSTRAINT {} TO {}", old_name, new_name)
486            }
487            AlterTableOperation::ChangeOwner { new_owner_name } => {
488                write!(f, "OWNER TO {}", new_owner_name)
489            }
490            AlterTableOperation::SetSchema { new_schema_name } => {
491                write!(f, "SET SCHEMA {}", new_schema_name)
492            }
493            AlterTableOperation::SetParallelism {
494                parallelism,
495                deferred,
496            } => {
497                write!(
498                    f,
499                    "SET PARALLELISM TO {}{}",
500                    parallelism,
501                    if *deferred { " DEFERRED" } else { "" }
502                )
503            }
504            AlterTableOperation::SetBackfillParallelism {
505                parallelism,
506                deferred,
507            } => {
508                write!(
509                    f,
510                    "SET BACKFILL_PARALLELISM TO {}{}",
511                    parallelism,
512                    if *deferred { " DEFERRED" } else { "" }
513                )
514            }
515            AlterTableOperation::SetConfig { entries } => {
516                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
517            }
518            AlterTableOperation::ResetConfig { keys } => {
519                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
520            }
521            AlterTableOperation::RefreshSchema => {
522                write!(f, "REFRESH SCHEMA")
523            }
524            AlterTableOperation::SetSourceRateLimit { rate_limit } => {
525                write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
526            }
527            AlterTableOperation::SetBackfillRateLimit { rate_limit } => {
528                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
529            }
530            AlterTableOperation::SetDmlRateLimit { rate_limit } => {
531                write!(f, "SET DML_RATE_LIMIT TO {}", rate_limit)
532            }
533            AlterTableOperation::SwapRenameTable { target_table } => {
534                write!(f, "SWAP WITH {}", target_table)
535            }
536            AlterTableOperation::DropConnector => {
537                write!(f, "DROP CONNECTOR")
538            }
539            AlterTableOperation::AlterConnectorProps { alter_props } => {
540                write!(
541                    f,
542                    "CONNECTOR WITH ({})",
543                    display_comma_separated(alter_props)
544                )
545            }
546        }
547    }
548}
549
550impl fmt::Display for AlterIndexOperation {
551    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
552        match self {
553            AlterIndexOperation::RenameIndex { index_name } => {
554                write!(f, "RENAME TO {index_name}")
555            }
556            AlterIndexOperation::SetParallelism {
557                parallelism,
558                deferred,
559            } => {
560                write!(
561                    f,
562                    "SET PARALLELISM TO {}{}",
563                    parallelism,
564                    if *deferred { " DEFERRED" } else { "" }
565                )
566            }
567            AlterIndexOperation::SetBackfillParallelism {
568                parallelism,
569                deferred,
570            } => {
571                write!(
572                    f,
573                    "SET BACKFILL_PARALLELISM TO {}{}",
574                    parallelism,
575                    if *deferred { " DEFERRED" } else { "" }
576                )
577            }
578            AlterIndexOperation::SetResourceGroup {
579                resource_group,
580                deferred,
581            } => {
582                let deferred = if *deferred { " DEFERRED" } else { "" };
583
584                if let Some(resource_group) = resource_group {
585                    write!(f, "SET RESOURCE_GROUP TO {}{}", resource_group, deferred)
586                } else {
587                    write!(f, "RESET RESOURCE_GROUP{}", deferred)
588                }
589            }
590            AlterIndexOperation::SetConfig { entries } => {
591                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
592            }
593            AlterIndexOperation::ResetConfig { keys } => {
594                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
595            }
596        }
597    }
598}
599
600impl fmt::Display for AlterViewOperation {
601    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
602        match self {
603            AlterViewOperation::RenameView { view_name } => {
604                write!(f, "RENAME TO {view_name}")
605            }
606            AlterViewOperation::ChangeOwner { new_owner_name } => {
607                write!(f, "OWNER TO {}", new_owner_name)
608            }
609            AlterViewOperation::SetSchema { new_schema_name } => {
610                write!(f, "SET SCHEMA {}", new_schema_name)
611            }
612            AlterViewOperation::SetParallelism {
613                parallelism,
614                deferred,
615            } => {
616                write!(
617                    f,
618                    "SET PARALLELISM TO {}{}",
619                    parallelism,
620                    if *deferred { " DEFERRED" } else { "" }
621                )
622            }
623            AlterViewOperation::SetBackfillParallelism {
624                parallelism,
625                deferred,
626            } => {
627                write!(
628                    f,
629                    "SET BACKFILL_PARALLELISM TO {}{}",
630                    parallelism,
631                    if *deferred { " DEFERRED" } else { "" }
632                )
633            }
634            AlterViewOperation::SetBackfillRateLimit { rate_limit } => {
635                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
636            }
637            AlterViewOperation::SwapRenameView { target_view } => {
638                write!(f, "SWAP WITH {}", target_view)
639            }
640            AlterViewOperation::SetResourceGroup {
641                resource_group,
642                deferred,
643            } => {
644                let deferred = if *deferred { " DEFERRED" } else { "" };
645
646                if let Some(resource_group) = resource_group {
647                    write!(f, "SET RESOURCE_GROUP TO {}{}", resource_group, deferred)
648                } else {
649                    write!(f, "RESET RESOURCE_GROUP{}", deferred)
650                }
651            }
652            AlterViewOperation::SetStreamingEnableUnalignedJoin { enable } => {
653                write!(f, "SET STREAMING_ENABLE_UNALIGNED_JOIN TO {}", enable)
654            }
655            AlterViewOperation::AsQuery { query } => {
656                write!(f, "AS {}", query)
657            }
658            AlterViewOperation::SetConfig { entries } => {
659                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
660            }
661            AlterViewOperation::ResetConfig { keys } => {
662                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
663            }
664        }
665    }
666}
667
668impl fmt::Display for AlterSinkOperation {
669    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
670        match self {
671            AlterSinkOperation::RenameSink { sink_name } => {
672                write!(f, "RENAME TO {sink_name}")
673            }
674            AlterSinkOperation::ChangeOwner { new_owner_name } => {
675                write!(f, "OWNER TO {}", new_owner_name)
676            }
677            AlterSinkOperation::SetSchema { new_schema_name } => {
678                write!(f, "SET SCHEMA {}", new_schema_name)
679            }
680            AlterSinkOperation::SetParallelism {
681                parallelism,
682                deferred,
683            } => {
684                write!(
685                    f,
686                    "SET PARALLELISM TO {}{}",
687                    parallelism,
688                    if *deferred { " DEFERRED" } else { "" }
689                )
690            }
691            AlterSinkOperation::SetBackfillParallelism {
692                parallelism,
693                deferred,
694            } => {
695                write!(
696                    f,
697                    "SET BACKFILL_PARALLELISM TO {}{}",
698                    parallelism,
699                    if *deferred { " DEFERRED" } else { "" }
700                )
701            }
702            AlterSinkOperation::SetResourceGroup {
703                resource_group,
704                deferred,
705            } => {
706                let deferred = if *deferred { " DEFERRED" } else { "" };
707
708                if let Some(resource_group) = resource_group {
709                    write!(f, "SET RESOURCE_GROUP TO {}{}", resource_group, deferred)
710                } else {
711                    write!(f, "RESET RESOURCE_GROUP{}", deferred)
712                }
713            }
714            AlterSinkOperation::SetConfig { entries } => {
715                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
716            }
717            AlterSinkOperation::ResetConfig { keys } => {
718                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
719            }
720            AlterSinkOperation::SwapRenameSink { target_sink } => {
721                write!(f, "SWAP WITH {}", target_sink)
722            }
723            AlterSinkOperation::SetSinkRateLimit { rate_limit } => {
724                write!(f, "SET SINK_RATE_LIMIT TO {}", rate_limit)
725            }
726            AlterSinkOperation::SetBackfillRateLimit { rate_limit } => {
727                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
728            }
729            AlterSinkOperation::AlterConnectorProps {
730                alter_props: changed_props,
731            } => {
732                write!(
733                    f,
734                    "CONNECTOR WITH ({})",
735                    display_comma_separated(changed_props)
736                )
737            }
738            AlterSinkOperation::SetStreamingEnableUnalignedJoin { enable } => {
739                write!(f, "SET STREAMING_ENABLE_UNALIGNED_JOIN TO {}", enable)
740            }
741        }
742    }
743}
744
745impl fmt::Display for AlterSubscriptionOperation {
746    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
747        match self {
748            AlterSubscriptionOperation::RenameSubscription { subscription_name } => {
749                write!(f, "RENAME TO {subscription_name}")
750            }
751            AlterSubscriptionOperation::ChangeOwner { new_owner_name } => {
752                write!(f, "OWNER TO {}", new_owner_name)
753            }
754            AlterSubscriptionOperation::SetSchema { new_schema_name } => {
755                write!(f, "SET SCHEMA {}", new_schema_name)
756            }
757            AlterSubscriptionOperation::SetRetention { retention } => {
758                write!(f, "SET RETENTION TO {}", retention)
759            }
760            AlterSubscriptionOperation::SwapRenameSubscription {
761                target_subscription,
762            } => {
763                write!(f, "SWAP WITH {}", target_subscription)
764            }
765        }
766    }
767}
768
769impl fmt::Display for AlterSourceOperation {
770    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
771        match self {
772            AlterSourceOperation::RenameSource { source_name } => {
773                write!(f, "RENAME TO {source_name}")
774            }
775            AlterSourceOperation::AddColumn { column_def } => {
776                write!(f, "ADD COLUMN {column_def}")
777            }
778            AlterSourceOperation::ChangeOwner { new_owner_name } => {
779                write!(f, "OWNER TO {}", new_owner_name)
780            }
781            AlterSourceOperation::SetSchema { new_schema_name } => {
782                write!(f, "SET SCHEMA {}", new_schema_name)
783            }
784            AlterSourceOperation::FormatEncode { format_encode } => {
785                write!(f, "{format_encode}")
786            }
787            AlterSourceOperation::RefreshSchema => {
788                write!(f, "REFRESH SCHEMA")
789            }
790            AlterSourceOperation::SetSourceRateLimit { rate_limit } => {
791                write!(f, "SET SOURCE_RATE_LIMIT TO {}", rate_limit)
792            }
793            AlterSourceOperation::SwapRenameSource { target_source } => {
794                write!(f, "SWAP WITH {}", target_source)
795            }
796            AlterSourceOperation::SetParallelism {
797                parallelism,
798                deferred,
799            } => {
800                write!(
801                    f,
802                    "SET PARALLELISM TO {}{}",
803                    parallelism,
804                    if *deferred { " DEFERRED" } else { "" }
805                )
806            }
807            AlterSourceOperation::SetBackfillParallelism {
808                parallelism,
809                deferred,
810            } => {
811                write!(
812                    f,
813                    "SET BACKFILL_PARALLELISM TO {}{}",
814                    parallelism,
815                    if *deferred { " DEFERRED" } else { "" }
816                )
817            }
818            AlterSourceOperation::SetConfig { entries } => {
819                write!(f, "SET CONFIG ({})", display_comma_separated(entries))
820            }
821            AlterSourceOperation::ResetConfig { keys } => {
822                write!(f, "RESET CONFIG ({})", display_comma_separated(keys))
823            }
824            AlterSourceOperation::ResetSource => {
825                write!(f, "RESET")
826            }
827            AlterSourceOperation::AlterConnectorProps { alter_props } => {
828                write!(
829                    f,
830                    "CONNECTOR WITH ({})",
831                    display_comma_separated(alter_props)
832                )
833            }
834        }
835    }
836}
837
838impl fmt::Display for AlterFunctionOperation {
839    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
840        match self {
841            AlterFunctionOperation::SetSchema { new_schema_name } => {
842                write!(f, "SET SCHEMA {new_schema_name}")
843            }
844            AlterFunctionOperation::ChangeOwner { new_owner_name } => {
845                write!(f, "OWNER TO {new_owner_name}")
846            }
847        }
848    }
849}
850
851impl fmt::Display for AlterConnectionOperation {
852    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
853        match self {
854            AlterConnectionOperation::SetSchema { new_schema_name } => {
855                write!(f, "SET SCHEMA {new_schema_name}")
856            }
857            AlterConnectionOperation::ChangeOwner { new_owner_name } => {
858                write!(f, "OWNER TO {new_owner_name}")
859            }
860            AlterConnectionOperation::AlterConnectorProps { alter_props } => {
861                write!(
862                    f,
863                    "CONNECTOR WITH ({})",
864                    display_comma_separated(alter_props)
865                )
866            }
867        }
868    }
869}
870
871impl fmt::Display for AlterSecretOperation {
872    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
873        match self {
874            AlterSecretOperation::ChangeCredential {
875                new_credential,
876                with_options,
877            } => {
878                write!(
879                    f,
880                    "WITH ({}) AS {}",
881                    display_comma_separated(with_options),
882                    new_credential
883                )
884            }
885            AlterSecretOperation::ChangeOwner { new_owner_name } => {
886                write!(f, "OWNER TO {new_owner_name}")
887            }
888        }
889    }
890}
891
/// An `ALTER COLUMN` (`Statement::AlterTable`) operation
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlterColumnOperation {
    /// `SET NOT NULL`
    SetNotNull,
    /// `DROP NOT NULL`
    DropNotNull,
    /// `SET DEFAULT <expr>`
    SetDefault { value: Expr },
    /// `DROP DEFAULT`
    DropDefault,
    /// `[SET DATA] TYPE <data_type> [USING <expr>]`
    ///
    /// Always displayed in the long `SET DATA TYPE` form.
    SetDataType {
        data_type: DataType,
        /// PostgreSQL specific
        using: Option<Expr>,
    },
}
910
911impl fmt::Display for AlterColumnOperation {
912    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
913        match self {
914            AlterColumnOperation::SetNotNull => write!(f, "SET NOT NULL",),
915            AlterColumnOperation::DropNotNull => write!(f, "DROP NOT NULL",),
916            AlterColumnOperation::SetDefault { value } => {
917                write!(f, "SET DEFAULT {}", value)
918            }
919            AlterColumnOperation::DropDefault => {
920                write!(f, "DROP DEFAULT")
921            }
922            AlterColumnOperation::SetDataType { data_type, using } => {
923                if let Some(expr) = using {
924                    write!(f, "SET DATA TYPE {} USING {}", data_type, expr)
925                } else {
926                    write!(f, "SET DATA TYPE {}", data_type)
927                }
928            }
929        }
930    }
931}
932
933impl fmt::Display for AlterFragmentOperation {
934    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
935        match self {
936            AlterFragmentOperation::AlterBackfillRateLimit { rate_limit } => {
937                write!(f, "SET BACKFILL_RATE_LIMIT TO {}", rate_limit)
938            }
939            AlterFragmentOperation::SetParallelism { parallelism } => {
940                write!(f, "SET PARALLELISM TO {}", parallelism)
941            }
942        }
943    }
944}
945
946impl fmt::Display for AlterCompactionGroupOperation {
947    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
948        match self {
949            AlterCompactionGroupOperation::Set { configs } => {
950                struct Assign<'a>(&'a ConfigParam);
951
952                impl fmt::Display for Assign<'_> {
953                    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
954                        write!(f, "{} = {}", self.0.param, self.0.value)
955                    }
956                }
957
958                let assigns: Vec<Assign<'_>> = configs.iter().map(Assign).collect();
959                write!(f, "SET {}", display_comma_separated(&assigns))
960            }
961        }
962    }
963}
964
965/// The watermark on source.
966/// `WATERMARK FOR <column> AS (<expr>)`
967#[derive(Debug, Clone, PartialEq, Eq, Hash)]
968pub struct SourceWatermark {
969    pub column: Ident,
970    pub expr: Expr,
971    /// Whether `WITH TTL` is specified.
972    pub with_ttl: bool,
973}
974
975impl fmt::Display for SourceWatermark {
976    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
977        write!(f, "WATERMARK FOR {} AS {}", self.column, self.expr,)?;
978        if self.with_ttl {
979            write!(f, " WITH TTL")?;
980        }
981        Ok(())
982    }
983}
984
985/// A table-level constraint, specified in a `CREATE TABLE` or an
986/// `ALTER TABLE ADD <constraint>` statement.
987#[derive(Debug, Clone, PartialEq, Eq, Hash)]
988pub enum TableConstraint {
989    /// `[ CONSTRAINT <name> ] { PRIMARY KEY | UNIQUE } (<columns>)`
990    Unique {
991        name: Option<Ident>,
992        columns: Vec<Ident>,
993        /// Whether this is a `PRIMARY KEY` or just a `UNIQUE` constraint
994        is_primary: bool,
995    },
996    /// A referential integrity constraint (`[ CONSTRAINT <name> ] FOREIGN KEY (<columns>)
997    /// REFERENCES <foreign_table> (<referred_columns>)
998    /// { [ON DELETE <referential_action>] [ON UPDATE <referential_action>] |
999    ///   [ON UPDATE <referential_action>] [ON DELETE <referential_action>]
1000    /// }`).
1001    ForeignKey {
1002        name: Option<Ident>,
1003        columns: Vec<Ident>,
1004        foreign_table: ObjectName,
1005        referred_columns: Vec<Ident>,
1006        on_delete: Option<ReferentialAction>,
1007        on_update: Option<ReferentialAction>,
1008    },
1009    /// `[ CONSTRAINT <name> ] CHECK (<expr>)`
1010    Check {
1011        name: Option<Ident>,
1012        expr: Box<Expr>,
1013    },
1014}
1015
1016impl fmt::Display for TableConstraint {
1017    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1018        match self {
1019            TableConstraint::Unique {
1020                name,
1021                columns,
1022                is_primary,
1023            } => write!(
1024                f,
1025                "{}{} ({})",
1026                display_constraint_name(name),
1027                if *is_primary { "PRIMARY KEY" } else { "UNIQUE" },
1028                display_comma_separated(columns)
1029            ),
1030            TableConstraint::ForeignKey {
1031                name,
1032                columns,
1033                foreign_table,
1034                referred_columns,
1035                on_delete,
1036                on_update,
1037            } => {
1038                write!(
1039                    f,
1040                    "{}FOREIGN KEY ({}) REFERENCES {}({})",
1041                    display_constraint_name(name),
1042                    display_comma_separated(columns),
1043                    foreign_table,
1044                    display_comma_separated(referred_columns),
1045                )?;
1046                if let Some(action) = on_delete {
1047                    write!(f, " ON DELETE {}", action)?;
1048                }
1049                if let Some(action) = on_update {
1050                    write!(f, " ON UPDATE {}", action)?;
1051                }
1052                Ok(())
1053            }
1054            TableConstraint::Check { name, expr } => {
1055                write!(f, "{}CHECK ({})", display_constraint_name(name), expr)
1056            }
1057        }
1058    }
1059}
1060
1061/// SQL column definition
1062#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1063pub struct ColumnDef {
1064    pub name: Ident,
1065    pub data_type: Option<DataType>,
1066    pub collation: Option<ObjectName>,
1067    pub options: Vec<ColumnOptionDef>,
1068}
1069
1070impl ColumnDef {
1071    pub fn new(
1072        name: Ident,
1073        data_type: DataType,
1074        collation: Option<ObjectName>,
1075        options: Vec<ColumnOptionDef>,
1076    ) -> Self {
1077        ColumnDef {
1078            name,
1079            data_type: Some(data_type),
1080            collation,
1081            options,
1082        }
1083    }
1084
1085    pub fn is_generated(&self) -> bool {
1086        self.options
1087            .iter()
1088            .any(|option| matches!(option.option, ColumnOption::GeneratedColumns(_)))
1089    }
1090}
1091
1092impl fmt::Display for ColumnDef {
1093    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1094        write!(
1095            f,
1096            "{} {}",
1097            self.name,
1098            if let Some(data_type) = &self.data_type {
1099                data_type.to_string()
1100            } else {
1101                "None".to_owned()
1102            }
1103        )?;
1104        for option in &self.options {
1105            write!(f, " {}", option)?;
1106        }
1107        Ok(())
1108    }
1109}
1110
1111/// An optionally-named `ColumnOption`: `[ CONSTRAINT <name> ] <column-option>`.
1112///
1113/// Note that implementations are substantially more permissive than the ANSI
1114/// specification on what order column options can be presented in, and whether
1115/// they are allowed to be named. The specification distinguishes between
1116/// constraints (NOT NULL, UNIQUE, PRIMARY KEY, and CHECK), which can be named
1117/// and can appear in any order, and other options (DEFAULT, GENERATED), which
1118/// cannot be named and must appear in a fixed order. PostgreSQL, however,
1119/// allows preceding any option with `CONSTRAINT <name>`, even those that are
1120/// not really constraints, like NULL and DEFAULT. MSSQL is less permissive,
1121/// allowing DEFAULT, UNIQUE, PRIMARY KEY and CHECK to be named, but not NULL or
1122/// NOT NULL constraints (the last of which is in violation of the spec).
1123///
1124/// For maximum flexibility, we don't distinguish between constraint and
1125/// non-constraint options, lumping them all together under the umbrella of
1126/// "column options," and we allow any column option to be named.
1127#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1128pub struct ColumnOptionDef {
1129    pub name: Option<Ident>,
1130    pub option: ColumnOption,
1131}
1132
1133impl fmt::Display for ColumnOptionDef {
1134    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1135        write!(f, "{}{}", display_constraint_name(&self.name), self.option)
1136    }
1137}
1138
1139/// `ColumnOption`s are modifiers that follow a column definition in a `CREATE
1140/// TABLE` statement.
1141#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1142pub enum ColumnOption {
1143    /// `NULL`
1144    Null,
1145    /// `NOT NULL`
1146    NotNull,
1147    /// `DEFAULT <restricted-expr>`
1148    DefaultValue(Expr),
1149    /// Default value from previous bound `DefaultColumnDesc`. Used internally
1150    /// for schema change and should not be specified by users.
1151    DefaultValueInternal {
1152        /// Protobuf encoded `DefaultColumnDesc`.
1153        persisted: Box<[u8]>,
1154        /// Optional AST for unparsing. If `None`, the default value will be
1155        /// shown as `DEFAULT INTERNAL` which is for demonstrating and should
1156        /// not be specified by users.
1157        expr: Option<Expr>,
1158    },
1159    /// `{ PRIMARY KEY | UNIQUE }`
1160    Unique { is_primary: bool },
1161    /// A referential integrity constraint (`[FOREIGN KEY REFERENCES
1162    /// <foreign_table> (<referred_columns>)
1163    /// { [ON DELETE <referential_action>] [ON UPDATE <referential_action>] |
1164    ///   [ON UPDATE <referential_action>] [ON DELETE <referential_action>]
1165    /// }`).
1166    ForeignKey {
1167        foreign_table: ObjectName,
1168        referred_columns: Vec<Ident>,
1169        on_delete: Option<ReferentialAction>,
1170        on_update: Option<ReferentialAction>,
1171    },
1172    /// `CHECK (<expr>)`
1173    Check(Expr),
1174    /// Dialect-specific options, such as:
1175    /// - MySQL's `AUTO_INCREMENT` or SQLite's `AUTOINCREMENT`
1176    /// - ...
1177    DialectSpecific(Vec<Token>),
1178    /// AS ( <generation_expr> )`
1179    GeneratedColumns(Expr),
1180}
1181
1182impl fmt::Display for ColumnOption {
1183    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1184        use ColumnOption::*;
1185        match self {
1186            Null => write!(f, "NULL"),
1187            NotNull => write!(f, "NOT NULL"),
1188            DefaultValue(expr) => write!(f, "DEFAULT {}", expr),
1189            DefaultValueInternal { persisted: _, expr } => {
1190                if let Some(expr) = expr {
1191                    write!(f, "DEFAULT {}", expr)
1192                } else {
1193                    write!(f, "DEFAULT INTERNAL")
1194                }
1195            }
1196            Unique { is_primary } => {
1197                write!(f, "{}", if *is_primary { "PRIMARY KEY" } else { "UNIQUE" })
1198            }
1199            ForeignKey {
1200                foreign_table,
1201                referred_columns,
1202                on_delete,
1203                on_update,
1204            } => {
1205                write!(f, "REFERENCES {}", foreign_table)?;
1206                if !referred_columns.is_empty() {
1207                    write!(f, " ({})", display_comma_separated(referred_columns))?;
1208                }
1209                if let Some(action) = on_delete {
1210                    write!(f, " ON DELETE {}", action)?;
1211                }
1212                if let Some(action) = on_update {
1213                    write!(f, " ON UPDATE {}", action)?;
1214                }
1215                Ok(())
1216            }
1217            Check(expr) => write!(f, "CHECK ({})", expr),
1218            DialectSpecific(val) => write!(f, "{}", display_separated(val, " ")),
1219            GeneratedColumns(expr) => write!(f, "AS {}", expr),
1220        }
1221    }
1222}
1223
1224fn display_constraint_name(name: &'_ Option<Ident>) -> impl fmt::Display + '_ {
1225    struct ConstraintName<'a>(&'a Option<Ident>);
1226    impl fmt::Display for ConstraintName<'_> {
1227        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1228            if let Some(name) = self.0 {
1229                write!(f, "CONSTRAINT {} ", name)?;
1230            }
1231            Ok(())
1232        }
1233    }
1234    ConstraintName(name)
1235}
1236
/// `<referential_action> =
/// { RESTRICT | CASCADE | SET NULL | NO ACTION | SET DEFAULT }`
///
/// The action attached to the `ON UPDATE` and `ON DELETE` options of a
/// foreign key constraint.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ReferentialAction {
    /// `RESTRICT`
    Restrict,
    /// `CASCADE`
    Cascade,
    /// `SET NULL`
    SetNull,
    /// `NO ACTION`
    NoAction,
    /// `SET DEFAULT`
    SetDefault,
}

/// Writes the SQL keyword(s) of the action.
impl fmt::Display for ReferentialAction {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keyword = match self {
            Self::Restrict => "RESTRICT",
            Self::Cascade => "CASCADE",
            Self::SetNull => "SET NULL",
            Self::NoAction => "NO ACTION",
            Self::SetDefault => "SET DEFAULT",
        };
        f.write_str(keyword)
    }
}
1261
/// Secure secret definition for a webhook source.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct WebhookSourceInfo {
    /// Optional reference to a secret.
    /// NOTE(review): presumably the secret used when validating incoming
    /// requests — confirm against the caller.
    pub secret_ref: Option<SecretRefValue>,
    /// Optional signature expression.
    /// NOTE(review): presumably evaluated to verify a request's signature —
    /// confirm against the caller.
    pub signature_expr: Option<Expr>,
    /// NOTE(review): by its name, whether a request waits until the data is
    /// persisted — confirm.
    pub wait_for_persistence: bool,
    /// NOTE(review): by its name, whether a request carries a batch of
    /// records — confirm.
    pub is_batched: bool,
}